Skip to content

Commit b0f1108

Browse files
committed
优化group和reverse等查询聚合
1 parent b1da6ee commit b0f1108

File tree

4 files changed

+23
-27
lines changed

4 files changed

+23
-27
lines changed

src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AppendOrderSequenceEnumerableShardingMerger.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,26 @@ public AppendOrderSequenceEnumerableShardingMerger(StreamMergeContext streamMerg
1414
streamMergeContext, async)
1515
{
1616
}
17-
1817
public override IStreamMergeAsyncEnumerator<TEntity> StreamMerge(
1918
List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
2019
{
2120
if (GetStreamMergeContext().HasGroupQuery())
21+
{
2222
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(),
2323
parallelResults);
24+
}
25+
2426
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults);
2527
}
28+
29+
protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
30+
{
31+
//如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥
32+
if (GetStreamMergeContext().GroupQueryMemoryMerge())
33+
{
34+
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults);
35+
}
36+
return StreamMerge(parallelResults);
37+
}
2638
}
2739
}

src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,5 @@ public LastOrDefaultEnumerableShardingMerger(StreamMergeContext streamMergeConte
1010
streamMergeContext, async)
1111
{
1212
}
13-
14-
protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
15-
{
16-
if (GetStreamMergeContext().IsPaginationQuery())
17-
return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目
18-
19-
return base.StreamInMemoryMerge(parallelResults);
20-
}
2113
}
2214
}

src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,10 @@ public ReverseEnumerableShardingMerger(StreamMergeContext streamMergeContext, bo
1111
{
1212
}
1313

14-
// protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(
15-
// List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
16-
// {
17-
// // if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery())
18-
// // {
19-
// // var multiAggregateOrderStreamMergeAsyncEnumerator =
20-
// // new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(),
21-
// // parallelResults);
22-
// // return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(),
23-
// // new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0,
24-
// // GetStreamMergeContext().GetPaginationReWriteTake());
25-
// // }
26-
//
27-
// if (GetStreamMergeContext().IsPaginationQuery())
28-
// return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0,
29-
// GetStreamMergeContext().GetPaginationReWriteTake());
30-
// return base.StreamInMemoryMerge(parallelResults);
31-
// }
14+
public override IStreamMergeAsyncEnumerator<TEntity> StreamMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
15+
{
16+
var streamMergeAsyncEnumerator = base.StreamMerge(parallelResults);
17+
return new InMemoryReverseStreamMergeAsyncEnumerator<TEntity>(streamMergeAsyncEnumerator);
18+
}
3219
}
3320
}

src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ public override IStreamMergeAsyncEnumerator<TEntity> StreamMerge(
2525

2626
protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
2727
{
28+
//如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥
29+
if (GetStreamMergeContext().GroupQueryMemoryMerge())
30+
{
31+
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults);
32+
}
2833
return StreamMerge(parallelResults);
2934
}
3035
}

0 commit comments

Comments
 (0)