Skip to content

Commit ab31ffd

Browse files
committed
添加并发线程控制查询结构合并
1 parent 38efd9e commit ab31ffd

File tree

9 files changed

+56
-91
lines changed

9 files changed

+56
-91
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<h1 align="center"> ShardingCore </h1>
22

33

4-
`ShardingCore` 易用、简单、高性能、普适性,是一款扩展针对efcore生态下的分表分库的扩展解决方案,支持efcore2+的所有版本,支持efcore2+的所有数据库、支持自定义路由、动态路由、高性能分页、读写分离的一款组件,如果你喜欢这组件或者这个组件对你有帮助请[点我star github 地址](https://github.com/xuejmnet/sharding-core)
4+
`ShardingCore` 易用、简单、高性能、普适性,是一款扩展针对efcore生态下的分表分库的扩展解决方案,支持efcore2+的所有版本,支持efcore2+的所有数据库、支持自定义路由、动态路由、高性能分页、读写分离的一款组件,如果你喜欢这组件或者这个组件对你有帮助请点击下发star让更多的.neter可以看到使用
55

66
---
77

Lines changed: 7 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
1-
using System;
2-
using System.Collections.Generic;
1+
using ShardingCore.Exceptions;
2+
using ShardingCore.Extensions;
3+
using ShardingCore.Sharding.Abstractions;
4+
using ShardingCore.Sharding.MergeEngines.ParallelControl;
5+
using System;
36
using System.Linq;
47
using System.Linq.Expressions;
5-
using System.Text;
68
using System.Threading;
79
using System.Threading.Tasks;
8-
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
9-
using ShardingCore.Exceptions;
10-
using ShardingCore.Extensions;
11-
using ShardingCore.Sharding.Abstractions;
12-
using ShardingCore.Sharding.Enumerators;
13-
using ShardingCore.Sharding.MergeEngines.ParallelControl;
14-
using ShardingCore.Sharding.StreamMergeEngines;
1510

1611
namespace ShardingCore.Sharding.MergeEngines.Abstractions
1712
{
@@ -22,7 +17,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
2217
* @Ver: 1.0
2318
2419
*/
25-
public abstract class AbstractBaseMergeEngine<TEntity>
20+
public abstract class AbstractBaseMergeEngine<TEntity>: IAsyncParallelLimit
2621
{
2722

2823
private readonly SemaphoreSlim _semaphore;
@@ -44,51 +39,13 @@ public AbstractBaseMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
4439
_parallelQueryTimeOut = streamMergeContext.GetParallelQueryTimeOut();
4540
}
4641
/// <summary>
47-
/// 执行异步并发
48-
/// </summary>
49-
/// <param name="queryable"></param>
50-
/// <param name="dataSourceName"></param>
51-
/// <param name="routeResult"></param>
52-
/// <param name="efQuery"></param>
53-
/// <param name="cancellationToken"></param>
54-
/// <returns></returns>
55-
public Task<RouteQueryResult<TResult>> AsyncParallelResultExecuteAsync<TResult>(IQueryable queryable, string dataSourceName, TableRouteResult routeResult, Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
56-
{
57-
return AsyncParallelControlExecuteAsync(async () =>
58-
{
59-
var queryResult =
60-
await AsyncParallelResultExecuteAsync0<TResult>(queryable, efQuery, cancellationToken);
61-
62-
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
63-
}, cancellationToken);
64-
}
65-
/// <summary>
66-
/// 异步执行并发的实际方法
67-
/// </summary>
68-
/// <param name="queryable"></param>
69-
/// <param name="efQuery"></param>
70-
/// <param name="cancellationToken"></param>
71-
/// <returns></returns>
72-
public abstract Task<TResult> AsyncParallelResultExecuteAsync0<TResult>(IQueryable queryable, Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken());
73-
/// <summary>
74-
/// 执行异步并发
75-
/// </summary>
76-
/// <param name="queryable"></param>
77-
/// <param name="async"></param>
78-
/// <param name="cancellationToken"></param>
79-
/// <returns></returns>
80-
public Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumeratorExecuteAsync(IQueryable<TEntity> queryable, bool async, CancellationToken cancellationToken = new CancellationToken())
81-
{
82-
return AsyncParallelControlExecuteAsync(async () => await AsyncParallelEnumeratorExecuteAsync0(queryable, async, cancellationToken), cancellationToken);
83-
}
84-
/// <summary>
8542
/// 异步多线程控制并发
8643
/// </summary>
8744
/// <typeparam name="TResult"></typeparam>
8845
/// <param name="executeAsync"></param>
8946
/// <param name="cancellationToken"></param>
9047
/// <returns></returns>
91-
public Task<TResult> AsyncParallelControlExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,CancellationToken cancellationToken=new CancellationToken())
48+
public Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,CancellationToken cancellationToken=new CancellationToken())
9249
{
9350
var parallelTimeOut = _parallelQueryTimeOut.TotalMilliseconds;
9451
var acquired = this._semaphore.Wait((int)parallelTimeOut);
@@ -120,14 +77,5 @@ public Task<TResult> AsyncParallelControlExecuteAsync<TResult>(Func<Task<TResult
12077
throw new ShardingCoreParallelQueryTimeOutException(_executeExpression.ShardingPrint());
12178
}
12279
}
123-
/// <summary>
124-
/// 异步执行并发的实际方法
125-
/// </summary>
126-
/// <param name="queryable"></param>
127-
/// <param name="async"></param>
128-
/// <param name="cancellationToken"></param>
129-
/// <returns></returns>
130-
public abstract Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumeratorExecuteAsync0(IQueryable<TEntity> queryable, bool async,
131-
CancellationToken cancellationToken = new CancellationToken());
13280
}
13381
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace ShardingCore.Sharding.MergeEngines.Abstractions
8+
{
9+
/*
10+
* @Author: xjm
11+
* @Description:
12+
* @Date: 2021/10/4 6:25:02
13+
* @Ver: 1.0
14+
15+
*/
16+
public interface IAsyncParallelLimit
17+
{
18+
Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,
19+
CancellationToken cancellationToken = new CancellationToken());
20+
}
21+
}

src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractInMemoryAsyncMergeEngine.cs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ private IQueryable CreateAsyncExecuteQueryable<TResult>(string dsname, TableRout
8787
{
8888
var asyncExecuteQueryable = CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
8989

90-
return AsyncParallelResultExecuteAsync(asyncExecuteQueryable,dataSourceName,routeResult,efQuery, cancellationToken);
90+
return AsyncParallelResultExecute(asyncExecuteQueryable,dataSourceName,routeResult,efQuery, cancellationToken);
9191

9292
});
9393
}).ToArray();
@@ -98,25 +98,24 @@ private IQueryable CreateAsyncExecuteQueryable<TResult>(string dsname, TableRout
9898
/// <summary>
9999
/// 异步并发查询
100100
/// </summary>
101+
/// <typeparam name="TResult"></typeparam>
101102
/// <param name="queryable"></param>
103+
/// <param name="dataSourceName"></param>
104+
/// <param name="routeResult"></param>
102105
/// <param name="efQuery"></param>
103106
/// <param name="cancellationToken"></param>
104-
/// <typeparam name="TResult"></typeparam>
105107
/// <returns></returns>
106-
public override async Task<TResult> AsyncParallelResultExecuteAsync0<TResult>(IQueryable queryable, Func<IQueryable, Task<TResult>> efQuery,
108+
public Task<RouteQueryResult<TResult>> AsyncParallelResultExecute<TResult>(IQueryable queryable,string dataSourceName,TableRouteResult routeResult, Func<IQueryable, Task<TResult>> efQuery,
107109
CancellationToken cancellationToken = new CancellationToken())
108110
{
109-
var queryResult = await efQuery(queryable);
110-
return queryResult;
111-
}
111+
return AsyncParallelLimitExecuteAsync(async () =>
112+
{
113+
var queryResult = await efQuery(queryable);
112114

113-
public override Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumeratorExecuteAsync0(IQueryable<TEntity> queryable, bool async,
114-
CancellationToken cancellationToken = new CancellationToken())
115-
{
116-
throw new NotImplementedException();
115+
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
116+
},cancellationToken);
117117
}
118118

119-
120119
public virtual IQueryable DoCombineQueryable<TResult>(IQueryable<TEntity> queryable)
121120
{
122121
return queryable;

src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -100,25 +100,22 @@ private IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(bool
100100
/// <param name="async"></param>
101101
/// <param name="cancellationToken"></param>
102102
/// <returns></returns>
103-
public override async Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumeratorExecuteAsync0(IQueryable<TEntity> queryable, bool async,
103+
public Task<StreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,
104104
CancellationToken cancellationToken = new CancellationToken())
105105
{
106-
if (async)
106+
return AsyncParallelLimitExecuteAsync(async () =>
107107
{
108-
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
109-
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
110-
}
111-
else
112-
{
113-
var enumerator = GetEnumerator0(queryable);
114-
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
115-
}
116-
}
117-
118-
public override Task<TResult> AsyncParallelResultExecuteAsync0<TResult>(IQueryable queryable, Func<IQueryable, Task<TResult>> efQuery,
119-
CancellationToken cancellationToken = new CancellationToken())
120-
{
121-
throw new NotImplementedException();
108+
if (async)
109+
{
110+
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
111+
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
112+
}
113+
else
114+
{
115+
var enumerator = GetEnumerator0(queryable);
116+
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
117+
}
118+
}, cancellationToken);
122119
}
123120

124121
/// <summary>

src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeA
105105
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
106106
{
107107
var newQueryable = CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult, reSetOrders);
108-
return AsyncParallelEnumeratorExecuteAsync(newQueryable, async, cancellationToken);
108+
return AsyncParallelEnumerator(newQueryable, async, cancellationToken);
109109
}).ToArray();
110110

111111
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();

src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public DefaultShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntit
3535
return tableRouteResults.Select(routeResult =>
3636
{
3737
var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, routeResult);
38-
return AsyncParallelEnumeratorExecuteAsync(newQueryable, async,cancellationToken);
38+
return AsyncParallelEnumerator(newQueryable, async,cancellationToken);
3939
});
4040

4141
}).ToArray();

src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeA
5151
return StreamMergeContext.TableRouteResults.Select(routeResult =>
5252
{
5353
var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, reverseOrderQueryable, routeResult);
54-
return AsyncParallelEnumeratorExecuteAsync(newQueryable, async,cancellationToken);
54+
return AsyncParallelEnumerator(newQueryable, async,cancellationToken);
5555
});
5656
}).ToArray();;
5757

src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeA
9292
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
9393
{
9494
var newQueryable = CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult);
95-
return AsyncParallelEnumeratorExecuteAsync(newQueryable, async,cancellationToken);
95+
return AsyncParallelEnumerator(newQueryable, async,cancellationToken);
9696
}).ToArray();
9797

9898
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();

0 commit comments

Comments
 (0)