Skip to content

Commit 4e22e38

Browse files
feat: add polling to get the number of active queues
1 parent 25f7a06 commit 4e22e38

24 files changed

+314
-11
lines changed

samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,18 @@ internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer(
174174
.WithCleanupPollingConfiguration(
175175
configure => configure
176176
.Enabled(false)
177+
.WithCronExpression("0 0/1 * 1/1 * ? *")
178+
)
179+
.WithRetryDurableActiveQueuesCountPollingConfiguration(
180+
configure => configure
181+
.Enabled(true)
182+
.WithCronExpression("0 0/1 * 1/1 * ? *")
183+
.Do((numberOfActiveQueues) =>
184+
{
185+
Console.Write($"Number of active queues {numberOfActiveQueues}");
186+
})
177187
)
188+
178189
))
179190
.AddTypedHandlers(
180191
handlers => handlers

samples/KafkaFlow.Retry.Sample/Program.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,16 @@ private static async Task Main()
2121
var mongoDbDatabaseName = "kafka_flow_retry_durable_sample";
2222
var mongoDbRetryQueueCollectionName = "RetryQueues";
2323
var mongoDbRetryQueueItemCollectionName = "RetryQueueItems";
24-
var sqlServerConnectionString =
25-
"Server=localhost;Trusted_Connection=True; Pooling=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Sample";
24+
var sqlServerConnectionString = string.Join(
25+
string.Empty,
26+
"Server=localhost;",
27+
"Trusted_Connection=True;",
28+
"Pooling=true;",
29+
"Min Pool Size=1;",
30+
"Max Pool Size=100;",
31+
"MultipleActiveResultSets=true;",
32+
"Application Name=KafkaFlow Retry Tests;"
33+
);
2634
var sqlServerDatabaseName = "kafka_flow_retry_durable_sample";
2735
var topics = new[]
2836
{

src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
105105
return new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems);
106106
}
107107

108+
public Task<long> CountQueuesAsync(CountQueuesInput input)
109+
{
110+
throw new NotImplementedException();
111+
}
112+
108113
public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)
109114
{
110115
Guard.Argument(input, nameof(input)).NotNull();

src/KafkaFlow.Retry.Postgres/RetryQueueDataProvider.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
107107
}
108108
}
109109

110+
public Task<long> CountQueuesAsync(CountQueuesInput input)
111+
{
112+
throw new NotImplementedException();
113+
}
114+
110115
public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)
111116
{
112117
Guard.Argument(input, nameof(input)).NotNull();

src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ internal interface IRetryQueueRepository
1717

1818
Task<RetryQueueDbo> GetQueueAsync(IDbConnection dbConnection, string queueGroupKey);
1919

20+
Task<long> CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus);
21+
2022
Task<IList<RetryQueueDbo>> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top);
2123

2224
Task<int> UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution);

src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,24 @@ public async Task<long> AddAsync(IDbConnection dbConnection, RetryQueueDbo retry
3434
}
3535
}
3636

37+
public async Task<long> CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus)
38+
{
39+
using (var command = dbConnection.CreateCommand())
40+
{
41+
command.CommandType = CommandType.Text;
42+
command.CommandText =
43+
$@"SELECT COUNT(1)
44+
FROM [{dbConnection.Schema}].[RetryQueues]
45+
WHERE SearchGroupKey = @SearchGroupKey
46+
AND IdStatus = @IdStatus";
47+
48+
command.Parameters.AddWithValue("SearchGroupKey", searchGroupKey);
49+
command.Parameters.AddWithValue("IdStatus", (byte)retryQueueStatus);
50+
51+
return (Int32) await command.ExecuteScalarAsync().ConfigureAwait(false);
52+
}
53+
}
54+
3755
public async Task<int> DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey,
3856
RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete)
3957
{
@@ -83,9 +101,9 @@ public async Task<RetryQueueDbo> GetQueueAsync(IDbConnection dbConnection, strin
83101
command.CommandType = CommandType.Text;
84102
command.CommandText =
85103
$@"SELECT Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution
86-
FROM [{dbConnection.Schema}].[RetryQueues]
87-
WHERE QueueGroupKey = @QueueGroupKey
88-
ORDER BY Id";
104+
FROM [{dbConnection.Schema}].[RetryQueues]
105+
WHERE QueueGroupKey = @QueueGroupKey
106+
ORDER BY Id";
89107

90108
command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey);
91109

src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,20 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
107107
}
108108
}
109109

110+
public async Task<long> CountQueuesAsync(CountQueuesInput input)
111+
{
112+
Guard.Argument(input, nameof(input)).NotNull();
113+
114+
using (var dbConnection = _connectionProvider.Create(_sqlServerDbSettings))
115+
{
116+
return await _retryQueueRepository.CountQueueAsync(
117+
dbConnection,
118+
input.SearchGroupKey,
119+
input.Status)
120+
.ConfigureAwait(false);
121+
}
122+
}
123+
110124
public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)
111125
{
112126
Guard.Argument(input, nameof(input)).NotNull();

src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionsAggregatorBuilder.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using Dawn;
5+
using KafkaFlow.Retry.Durable.Definitions.Builders.Polling;
56
using KafkaFlow.Retry.Durable.Definitions.Polling;
67

78
namespace KafkaFlow.Retry;
@@ -11,12 +12,14 @@ public class PollingDefinitionsAggregatorBuilder
1112
private readonly CleanupPollingDefinitionBuilder _cleanupPollingDefinitionBuilder;
1213
private readonly List<PollingDefinition> _pollingDefinitions;
1314
private readonly RetryDurablePollingDefinitionBuilder _retryDurablePollingDefinitionBuilder;
15+
private readonly RetryDurableActiveQueuesCountPollingDefinitionBuilder _retryDurableActiveQueuesCountPollingDefinitionBuilder;
1416
private string _schedulerId;
1517

1618
public PollingDefinitionsAggregatorBuilder()
1719
{
1820
_cleanupPollingDefinitionBuilder = new CleanupPollingDefinitionBuilder();
1921
_retryDurablePollingDefinitionBuilder = new RetryDurablePollingDefinitionBuilder();
22+
_retryDurableActiveQueuesCountPollingDefinitionBuilder = new RetryDurableActiveQueuesCountPollingDefinitionBuilder();
2023

2124
_pollingDefinitions = new List<PollingDefinition>();
2225
}
@@ -47,6 +50,19 @@ public PollingDefinitionsAggregatorBuilder WithRetryDurablePollingConfiguration(
4750
return this;
4851
}
4952

53+
public PollingDefinitionsAggregatorBuilder WithRetryDurableActiveQueuesCountPollingConfiguration(
54+
Action<RetryDurableActiveQueuesCountPollingDefinitionBuilder> configure)
55+
{
56+
Guard.Argument(configure, nameof(configure)).NotNull();
57+
58+
configure(_retryDurableActiveQueuesCountPollingDefinitionBuilder);
59+
var etryDurableActiveQueuesCountPollingDefinition = _retryDurableActiveQueuesCountPollingDefinitionBuilder.Build();
60+
61+
_pollingDefinitions.Add(etryDurableActiveQueuesCountPollingDefinition);
62+
63+
return this;
64+
}
65+
5066
public PollingDefinitionsAggregatorBuilder WithSchedulerId(string schedulerId)
5167
{
5268
_schedulerId = schedulerId;
@@ -65,6 +81,11 @@ internal PollingDefinitionsAggregator Build()
6581
ValidateRequiredPollingDefinition(PollingJobType.Cleanup);
6682
}
6783

84+
if (_retryDurableActiveQueuesCountPollingDefinitionBuilder.Required)
85+
{
86+
ValidateRequiredPollingDefinition(PollingJobType.RetryDurableActiveQueuesCount);
87+
}
88+
6889
return new PollingDefinitionsAggregator(_schedulerId, _pollingDefinitions);
6990
}
7091

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System;
2+
using Dawn;
3+
using KafkaFlow.Retry.Durable.Definitions.Polling;
4+
5+
namespace KafkaFlow.Retry.Durable.Definitions.Builders.Polling;
6+
7+
public class RetryDurableActiveQueuesCountPollingDefinitionBuilder
8+
: PollingDefinitionBuilder<RetryDurableActiveQueuesCountPollingDefinitionBuilder>
9+
{
10+
protected Action<long> ActionToPerform;
11+
12+
internal override bool Required => false;
13+
14+
public RetryDurableActiveQueuesCountPollingDefinitionBuilder Do(Action<long> actionToPerform)
15+
{
16+
Guard.Argument(actionToPerform, nameof(actionToPerform)).NotNull();
17+
18+
ActionToPerform = actionToPerform;
19+
return this;
20+
}
21+
22+
internal RetryDurableActiveQueuesCountPollingDefinition Build()
23+
{
24+
return new RetryDurableActiveQueuesCountPollingDefinition(
25+
IsEnabled,
26+
CronExpression,
27+
ActionToPerform
28+
);
29+
}
30+
}

src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingJobType.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ internal enum PollingJobType
55
Unknown = 0,
66

77
RetryDurable = 1,
8-
Cleanup = 2
8+
Cleanup = 2,
9+
RetryDurableActiveQueuesCount = 3
910
}

0 commit comments

Comments
 (0)