Skip to content

Commit b1da6ee

Browse files
committed
优化group
1 parent a1cc753 commit b1da6ee

File tree

8 files changed

+87
-47
lines changed

8 files changed

+87
-47
lines changed

src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryGroupByOrderStreamMergeAsyncEnumerator.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ private async Task<IEnumerator<T>> GetAllRowsAsync(IStreamMergeAsyncEnumerator<T
4444
list.Add(streamMergeAsyncEnumerator.GetCurrent());
4545
_inMemoryReallyCount++;
4646
}
47-
return list.AsQueryable().OrderWithExpression(_streamMergeContext.Orders).GetEnumerator();
47+
return list.AsQueryable().OrderWithExpression(_streamMergeContext.GroupByContext.PropertyOrders).GetEnumerator();
4848
}
4949
private IEnumerator<T> GetAllRows(IStreamMergeAsyncEnumerator<T> streamMergeAsyncEnumerator)
5050
{
@@ -60,7 +60,7 @@ private IEnumerator<T> GetAllRows(IStreamMergeAsyncEnumerator<T> streamMergeAsyn
6060
_inMemoryReallyCount++;
6161
}
6262

63-
return list.AsQueryable().OrderWithExpression(_streamMergeContext.Orders).GetEnumerator();
63+
return list.AsQueryable().OrderWithExpression(_streamMergeContext.GroupByContext.PropertyOrders).GetEnumerator();
6464
}
6565

6666
public bool SkipFirst()

src/ShardingCore/Sharding/MergeContexts/GroupByContext.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Collections.Generic;
12
using System.Linq.Expressions;
23

34
namespace ShardingCore.Sharding.MergeContexts
@@ -18,6 +19,11 @@ public class GroupByContext
1819
/// 是否内存聚合
1920
/// </summary>
2021
public bool GroupMemoryMerge { get; set; }
22+
public List<PropertyOrder> PropertyOrders { get; } = new List<PropertyOrder>();
23+
public string GetOrderExpression()
24+
{
25+
return string.Join(",", PropertyOrders);
26+
}
2127

2228
}
2329
}

src/ShardingCore/Sharding/MergeContexts/QueryableRewriteEngine.cs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -104,27 +104,39 @@ public IRewriteResult GetRewriteQueryable(IMergeQueryCompilerContext mergeQueryC
104104
}
105105
else
106106
{
107-
var groupKeys = selectGroupKeyProperties.Select(o=>o.PropertyName).ToHashSet();
107+
var groupKeys = selectGroupKeyProperties.Select(o => o.PropertyName).ToHashSet();
108108
bool groupMemoryMerge = false;
109109
foreach (var propertyOrder in orders)
110110
{
111-
if (groupKeys.IsEmpty())
111+
groupByContext.PropertyOrders.Add(propertyOrder);
112+
if (!groupMemoryMerge && !groupKeys.IsEmpty())
112113
{
113-
break;
114-
}
115-
if (!groupKeys.Contains(propertyOrder.PropertyExpression))
116-
{
117-
groupMemoryMerge = true;
118-
break;
114+
if (!groupKeys.Contains(propertyOrder.PropertyExpression))
115+
{
116+
groupMemoryMerge = true;
117+
}
118+
119+
groupKeys.Remove(propertyOrder.PropertyExpression);
119120
}
120-
groupKeys.Remove(propertyOrder.PropertyExpression);
121121
}
122+
122123
//判断是否优先group key排序如果不是就是要内存聚合
123124
groupByContext.GroupMemoryMerge = groupMemoryMerge;
124-
125-
126-
var sort = string.Join(",", selectGroupKeyProperties.Select(o => $"{o.PropertyName} asc"));
127-
reWriteQueryable = reWriteQueryable.RemoveAnyOrderBy().OrderWithExpression(sort, null);
125+
if (groupByContext.GroupMemoryMerge)
126+
{
127+
if (groupByContext.GroupMemoryMerge)
128+
{
129+
var sort = string.Join(",", selectGroupKeyProperties.Select(o => $"{o.PropertyName} asc"));
130+
reWriteQueryable = reWriteQueryable.RemoveAnyOrderBy().OrderWithExpression(sort, null);
131+
}
132+
133+
orders.Clear();
134+
foreach (var orderProperty in selectGroupKeyProperties)
135+
{
136+
orders.AddLast(new PropertyOrder(orderProperty.PropertyName, true,
137+
orderProperty.OwnerType));
138+
}
139+
}
128140
}
129141

130142
// else if (!mergeQueryCompilerContext.UseUnionAllMerge())
@@ -188,11 +200,13 @@ o is SelectSumProperty selectSumProperty &&
188200
//}
189201
}
190202

191-
if (mergeQueryCompilerContext.UseUnionAllMerge() &
192-
!mergeQueryCompilerContext.GetShardingDbContext().SupportUnionAllMerge())
203+
if (mergeQueryCompilerContext.UseUnionAllMerge())
193204
{
194-
throw new ShardingCoreException(
195-
$"if use {nameof(EntityFrameworkShardingQueryableExtension.UseUnionAllMerge)} plz rewrite {nameof(IQuerySqlGeneratorFactory)} with {nameof(IUnionAllMergeQuerySqlGeneratorFactory)} and {nameof(IQueryCompiler)} with {nameof(IUnionAllMergeQueryCompiler)}");
205+
if (!mergeQueryCompilerContext.GetShardingDbContext().SupportUnionAllMerge())
206+
{
207+
throw new ShardingCoreException(
208+
$"if use {nameof(EntityFrameworkShardingQueryableExtension.UseUnionAllMerge)} plz rewrite {nameof(IQuerySqlGeneratorFactory)} with {nameof(IUnionAllMergeQuerySqlGeneratorFactory)} and {nameof(IQueryCompiler)} with {nameof(IUnionAllMergeQueryCompiler)}");
209+
}
196210
}
197211

198212
return new RewriteResult(combineQueryable, reWriteQueryable);

src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AbstractEnumerableShardingMerger.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,20 @@ public AbstractEnumerableShardingMerger(StreamMergeContext streamMergeContext, b
2323
}
2424
public virtual IStreamMergeAsyncEnumerator<TEntity> StreamMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
2525
{
26+
//如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥
27+
if (GetStreamMergeContext().GroupQueryMemoryMerge())
28+
{
29+
var multiAggregateOrderStreamMergeAsyncEnumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(_streamMergeContext, parallelResults);
30+
//内存按key聚合好之后需要进行重排序按order
31+
var inMemoryGroupByOrderStreamMergeAsyncEnumerator = new InMemoryGroupByOrderStreamMergeAsyncEnumerator<TEntity>(_streamMergeContext,multiAggregateOrderStreamMergeAsyncEnumerator, _async);
32+
if (_streamMergeContext.IsPaginationQuery())
33+
{
34+
//分页的前提下还需要进行内存分页
35+
return new PaginationStreamMergeAsyncEnumerator<TEntity>(_streamMergeContext,new[]{inMemoryGroupByOrderStreamMergeAsyncEnumerator});
36+
}
37+
38+
return inMemoryGroupByOrderStreamMergeAsyncEnumerator;
39+
}
2640
if (_streamMergeContext.IsPaginationQuery())
2741
return new PaginationStreamMergeAsyncEnumerator<TEntity>(_streamMergeContext, parallelResults);
2842
if (_streamMergeContext.HasGroupQuery())
@@ -32,6 +46,15 @@ public virtual IStreamMergeAsyncEnumerator<TEntity> StreamMerge(List<IStreamMerg
3246

3347
protected virtual IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
3448
{
49+
//如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥
50+
if (GetStreamMergeContext().GroupQueryMemoryMerge())
51+
{
52+
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(_streamMergeContext, parallelResults);
53+
}
54+
if (GetStreamMergeContext().IsPaginationQuery())
55+
{
56+
return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目
57+
}
3558
return StreamMerge(parallelResults);
3659
}
3760

src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/DefaultEnumerableShardingMerger.cs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,5 @@ internal class DefaultEnumerableShardingMerger<TEntity>:AbstractEnumerableShardi
99
public DefaultEnumerableShardingMerger(StreamMergeContext streamMergeContext,bool async) : base(streamMergeContext,async)
1010
{
1111
}
12-
13-
protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
14-
{
15-
if (GetStreamMergeContext().IsPaginationQuery())
16-
return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目
17-
18-
return base.StreamInMemoryMerge(parallelResults);
19-
}
2012
}
21-
}
13+
}

src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,23 @@ public ReverseEnumerableShardingMerger(StreamMergeContext streamMergeContext, bo
1111
{
1212
}
1313

14-
protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(
15-
List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
16-
{
17-
if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery())
18-
{
19-
var multiAggregateOrderStreamMergeAsyncEnumerator =
20-
new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(),
21-
parallelResults);
22-
return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(),
23-
new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0,
24-
GetStreamMergeContext().GetPaginationReWriteTake());
25-
}
26-
27-
if (GetStreamMergeContext().IsPaginationQuery())
28-
return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0,
29-
GetStreamMergeContext().GetPaginationReWriteTake());
30-
return base.StreamInMemoryMerge(parallelResults);
31-
}
14+
// protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(
15+
// List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
16+
// {
17+
// // if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery())
18+
// // {
19+
// // var multiAggregateOrderStreamMergeAsyncEnumerator =
20+
// // new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(),
21+
// // parallelResults);
22+
// // return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(),
23+
// // new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0,
24+
// // GetStreamMergeContext().GetPaginationReWriteTake());
25+
// // }
26+
//
27+
// if (GetStreamMergeContext().IsPaginationQuery())
28+
// return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0,
29+
// GetStreamMergeContext().GetPaginationReWriteTake());
30+
// return base.StreamInMemoryMerge(parallelResults);
31+
// }
3232
}
3333
}

src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,10 @@ public override IStreamMergeAsyncEnumerator<TEntity> StreamMerge(
2222

2323
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults);
2424
}
25+
26+
protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
27+
{
28+
return StreamMerge(parallelResults);
29+
}
2530
}
2631
}

src/ShardingCore/Sharding/StreamMergeContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public bool HasGroupQuery()
198198
/// <returns></returns>
199199
public bool GroupQueryMemoryMerge()
200200
{
201-
return this.GroupByContext.GroupMemoryMerge;
201+
return HasGroupQuery()&&this.GroupByContext.GroupMemoryMerge;
202202
}
203203

204204
public bool IsMergeQuery()

0 commit comments

Comments
 (0)