Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions KafkaFlow.Retry.sln
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{9557F908-472F-4872-BCF8-8EC028EFDA9B}"
ProjectSection(SolutionItems) = preProject
Directory.Build.props = Directory.Build.props
README.md = README.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Retry.SchemaRegistry.Sample", "samples\KafkaFlow.Retry.SchemaRegistry.Sample\KafkaFlow.Retry.SchemaRegistry.Sample.csproj", "{510D65E8-B62C-402C-9CE3-47C7055A29FF}"
Expand All @@ -47,9 +48,6 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{0192C262-63AF-4918-B142-EC07DBB9E501}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{18F1AB11-9DC0-442F-9B6D-7098A93727B8}"
ProjectSection(SolutionItems) = preProject
CodeCoverage.runsettings = tests\CodeCoverage.runsettings
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -106,21 +104,21 @@ Global
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{5AE4E956-15A8-4117-9A9D-97B53060FE4C} = {0192C262-63AF-4918-B142-EC07DBB9E501}
{1DF1AB0D-37CB-4AFC-B701-8F0F2B260E54} = {0192C262-63AF-4918-B142-EC07DBB9E501}
{90B43DD5-C3CE-4BF8-B63E-3A34B962DC96} = {0192C262-63AF-4918-B142-EC07DBB9E501}
{9DC50EFE-C511-4DEC-A8EA-199795CEFE01} = {1DF1AB0D-37CB-4AFC-B701-8F0F2B260E54}
{745CB854-1FFE-4C60-AD15-69A3A784CC9F} = {B1D2A20A-0742-4D8E-A773-66EB95560152}
{C61CCF7F-E7C8-4FEF-9E7E-22AB3ECD148D} = {90B43DD5-C3CE-4BF8-B63E-3A34B962DC96}
{F06DD63E-8965-4C43-BEEE-4ABBB5914FF8} = {90B43DD5-C3CE-4BF8-B63E-3A34B962DC96}
{D3664EBB-D77B-42C2-AF90-7B2F3E354C3F} = {5AE4E956-15A8-4117-9A9D-97B53060FE4C}
{F27309CD-D796-425B-B5D6-780B7B57E9C7} = {B1D2A20A-0742-4D8E-A773-66EB95560152}
{9E3B34BA-E309-4DA4-93D4-C0DD72D4711D} = {18F1AB11-9DC0-442F-9B6D-7098A93727B8}
{A25A5E30-8D5A-40DB-BA21-7A5B4FB44DE0} = {18F1AB11-9DC0-442F-9B6D-7098A93727B8}
{510D65E8-B62C-402C-9CE3-47C7055A29FF} = {B1D2A20A-0742-4D8E-A773-66EB95560152}
{B14C5859-85C5-4E2F-80C7-D8B29E36481A} = {B1D2A20A-0742-4D8E-A773-66EB95560152}
{2A0BC610-E0FE-4BC3-B232-A8B918BE7381} = {9557F908-472F-4872-BCF8-8EC028EFDA9B}
{B7E4C23D-48DC-4056-8658-19D54AF7008A} = {90B43DD5-C3CE-4BF8-B63E-3A34B962DC96}
{A25A5E30-8D5A-40DB-BA21-7A5B4FB44DE0} = {18F1AB11-9DC0-442F-9B6D-7098A93727B8}
{9E3B34BA-E309-4DA4-93D4-C0DD72D4711D} = {18F1AB11-9DC0-442F-9B6D-7098A93727B8}
{90B43DD5-C3CE-4BF8-B63E-3A34B962DC96} = {0192C262-63AF-4918-B142-EC07DBB9E501}
{1DF1AB0D-37CB-4AFC-B701-8F0F2B260E54} = {0192C262-63AF-4918-B142-EC07DBB9E501}
{5AE4E956-15A8-4117-9A9D-97B53060FE4C} = {0192C262-63AF-4918-B142-EC07DBB9E501}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A953E534-FBCA-4F30-9CA5-96F67C1A49D8}
Expand Down
5 changes: 0 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,9 @@ Read our [contributing guidelines](CONTRIBUTING.md) to learn about our developme

## Maintainers

- [Bruno Gomes](https://github.com/brunohfgomes)
- [Carlos Miranda](https://github.com/carlosgoias)
- [Fernando Marins](https://github.com/fernando-a-marins)
- [Leandro Magalhães](https://github.com/spookylsm)
- [Luís Garcês](https://github.com/luispfgarces)
- [Martinho Novais](https://github.com/martinhonovais)
- [Rodrigo Belo](https://github.com/rodrigobelo)
- [Sérgio Ribeiro](https://github.com/sergioamribeiro)

## Get in touch

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ internal static IClusterConfigurationBuilder SetupRetryDurableMongoDb(
.WithTimeToLiveInDays(60)
.Enabled(true)
)
.WithRetryDurableActiveQueuesCountPollingConfiguration(
configure => configure
.Enabled(true)
.WithCronExpression("0 0/1 * 1/1 * ? *")
.Do((numberOfActiveQueues) =>
{
Console.Write($"Number of mongodb active queues {numberOfActiveQueues}");
})
)
))
.AddTypedHandlers(
handlers => handlers
Expand Down Expand Up @@ -174,7 +183,18 @@ internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer(
.WithCleanupPollingConfiguration(
configure => configure
.Enabled(false)
.WithCronExpression("0 0/1 * 1/1 * ? *")
)
.WithRetryDurableActiveQueuesCountPollingConfiguration(
configure => configure
.Enabled(true)
.WithCronExpression("0 0/1 * 1/1 * ? *")
.Do((numberOfActiveQueues) =>
{
Console.Write($"Number of sql server active queues {numberOfActiveQueues}");
})
)

))
.AddTypedHandlers(
handlers => handlers
Expand Down
197 changes: 104 additions & 93 deletions samples/KafkaFlow.Retry.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,19 @@ private static async Task Main()
var mongoDbDatabaseName = "kafka_flow_retry_durable_sample";
var mongoDbRetryQueueCollectionName = "RetryQueues";
var mongoDbRetryQueueItemCollectionName = "RetryQueueItems";
var sqlServerConnectionString =
"Server=localhost;Trusted_Connection=True; Pooling=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Sample";
var sqlServerConnectionString = string.Join(
string.Empty,
"Server=localhost;",
"Trusted_Connection=false;",
"TrustServerCertificate=true;",
"Integrated Security=false;",
"Pooling=true;",
"Min Pool Size=1;",
"Max Pool Size=100;",
"MultipleActiveResultSets=true;",
"Application Name=KafkaFlow Retry Tests;",
"Encrypt=false;"
);
var sqlServerDatabaseName = "kafka_flow_retry_durable_sample";
var topics = new[]
{
Expand Down Expand Up @@ -76,106 +87,106 @@ private static async Task Main()
switch (input)
{
case "retry-durable-mongodb":
{
Console.Write("Number of the distinct messages to produce: ");
int.TryParse(Console.ReadLine(), out var numOfMessages);
Console.Write("Number of messages with same partition key: ");
int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey);
var messages = Enumerable
.Range(0, numOfMessages)
.SelectMany(
x =>
{
var partitionKey = Guid.NewGuid().ToString();
return Enumerable
.Range(0, numOfMessagesWithSamePartitionkey)
.Select(y => new BatchProduceItem(
"sample-kafka-flow-retry-durable-mongodb-topic",
partitionKey,
new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" },
null))
.ToList();
}
)
.ToList();

await producers["kafka-flow-retry-durable-mongodb-producer"]
.BatchProduceAsync(messages)
.ConfigureAwait(false);
Console.WriteLine("Published");
}
{
Console.Write("Number of the distinct messages to produce: ");
int.TryParse(Console.ReadLine(), out var numOfMessages);
Console.Write("Number of messages with same partition key: ");
int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey);
var messages = Enumerable
.Range(0, numOfMessages)
.SelectMany(
x =>
{
var partitionKey = Guid.NewGuid().ToString();
return Enumerable
.Range(0, numOfMessagesWithSamePartitionkey)
.Select(y => new BatchProduceItem(
"sample-kafka-flow-retry-durable-mongodb-topic",
partitionKey,
new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" },
null))
.ToList();
}
)
.ToList();

await producers["kafka-flow-retry-durable-mongodb-producer"]
.BatchProduceAsync(messages)
.ConfigureAwait(false);
Console.WriteLine("Published");
}
break;

case "retry-durable-sqlserver":
{
Console.Write("Number of the distinct messages to produce: ");
int.TryParse(Console.ReadLine(), out var numOfMessages);
Console.Write("Number of messages with same partition key: ");
int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey);

var messages = Enumerable
.Range(0, numOfMessages)
.SelectMany(
x =>
{
var partitionKey = Guid.NewGuid().ToString();
return Enumerable
.Range(0, numOfMessagesWithSamePartitionkey)
.Select(y => new BatchProduceItem(
"sample-kafka-flow-retry-durable-sqlserver-topic",
partitionKey,
new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" },
null))
.ToList();
}
)
.ToList();

await producers["kafka-flow-retry-durable-sqlserver-producer"]
.BatchProduceAsync(messages)
.ConfigureAwait(false);
Console.WriteLine("Published");
}
{
Console.Write("Number of the distinct messages to produce: ");
int.TryParse(Console.ReadLine(), out var numOfMessages);
Console.Write("Number of messages with same partition key: ");
int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey);

var messages = Enumerable
.Range(0, numOfMessages)
.SelectMany(
x =>
{
var partitionKey = Guid.NewGuid().ToString();
return Enumerable
.Range(0, numOfMessagesWithSamePartitionkey)
.Select(y => new BatchProduceItem(
"sample-kafka-flow-retry-durable-sqlserver-topic",
partitionKey,
new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" },
null))
.ToList();
}
)
.ToList();

await producers["kafka-flow-retry-durable-sqlserver-producer"]
.BatchProduceAsync(messages)
.ConfigureAwait(false);
Console.WriteLine("Published");
}
break;

case "retry-forever":
{
Console.Write("Number of messages to produce: ");
int.TryParse(Console.ReadLine(), out var numOfMessages);
await producers["kafka-flow-retry-forever-producer"]
.BatchProduceAsync(
Enumerable
.Range(0, numOfMessages)
.Select(
x => new BatchProduceItem(
"sample-kafka-flow-retry-forever-topic",
"partition-key",
new RetryForeverTestMessage { Text = $"Message({x}): {Guid.NewGuid()}" },
null))
.ToList())
.ConfigureAwait(false);
Console.WriteLine("Published");
}
{
Console.Write("Number of messages to produce: ");
int.TryParse(Console.ReadLine(), out var numOfMessages);
await producers["kafka-flow-retry-forever-producer"]
.BatchProduceAsync(
Enumerable
.Range(0, numOfMessages)
.Select(
x => new BatchProduceItem(
"sample-kafka-flow-retry-forever-topic",
"partition-key",
new RetryForeverTestMessage { Text = $"Message({x}): {Guid.NewGuid()}" },
null))
.ToList())
.ConfigureAwait(false);
Console.WriteLine("Published");
}
break;

case "retry-simple":
{
Console.Write("Number of messages to produce:");
int.TryParse(Console.ReadLine(), out var numOfMessages);
await producers["kafka-flow-retry-simple-producer"]
.BatchProduceAsync(
Enumerable
.Range(0, numOfMessages)
.Select(
x => new BatchProduceItem(
"sample-kafka-flow-retry-simple-topic",
"partition-key",
new RetrySimpleTestMessage { Text = $"Message({x}): {Guid.NewGuid()}" },
null))
.ToList())
.ConfigureAwait(false);
Console.WriteLine("Published");
}
{
Console.Write("Number of messages to produce:");
int.TryParse(Console.ReadLine(), out var numOfMessages);
await producers["kafka-flow-retry-simple-producer"]
.BatchProduceAsync(
Enumerable
.Range(0, numOfMessages)
.Select(
x => new BatchProduceItem(
"sample-kafka-flow-retry-simple-topic",
"partition-key",
new RetrySimpleTestMessage { Text = $"Message({x}): {Guid.NewGuid()}" },
null))
.ToList())
.ConfigureAwait(false);
Console.WriteLine("Published");
}
break;

case "exit":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace KafkaFlow.Retry.MongoDb.Repositories;

internal interface IRetryQueueRepository
{
Task<long> CountQueuesAsync(string searchGroupKey, RetryQueueStatus status);

Task<DeleteQueuesResult> DeleteQueuesAsync(IEnumerable<Guid> queueIds);

Task<RetryQueueDbo> GetQueueAsync(string queueGroupKey);
Expand Down
10 changes: 10 additions & 0 deletions src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ public RetryQueueRepository(DbContext dbContext)
_dbContext = dbContext;
}

/// <summary>
/// Counts the retry queues that match the given search group key and status.
/// </summary>
/// <param name="searchGroupKey">Group key used to scope the count.</param>
/// <param name="status">Queue status the counted documents must have.</param>
/// <returns>The number of matching queue documents.</returns>
public async Task<long> CountQueuesAsync(string searchGroupKey, RetryQueueStatus status)
{
    var filterBuilder = _dbContext.RetryQueues.GetFilters();

    // Both conditions must hold: same search group AND same status.
    var countFilter =
        filterBuilder.Eq(queue => queue.SearchGroupKey, searchGroupKey)
        & filterBuilder.Eq(queue => queue.Status, status);

    return await _dbContext.RetryQueues
        .CountDocumentsAsync(countFilter)
        .ConfigureAwait(false);
}

public async Task<DeleteQueuesResult> DeleteQueuesAsync(IEnumerable<Guid> queueIds)
{
var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters();
Expand Down
11 changes: 11 additions & 0 deletions src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
return new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems);
}

/// <summary>
/// Counts active retry queues for the criteria carried by <paramref name="input"/>,
/// delegating the query to the underlying repository.
/// </summary>
/// <param name="input">Search group key and status to count by; must not be null.</param>
/// <returns>The number of queues matching the input criteria.</returns>
public async Task<long> CountQueuesAsync(CountQueuesInput input)
{
    Guard.Argument(input, nameof(input)).NotNull();

    var count = await _retryQueueRepository
        .CountQueuesAsync(input.SearchGroupKey, input.Status)
        .ConfigureAwait(false);

    return count;
}

public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)
{
Guard.Argument(input, nameof(input)).NotNull();
Expand Down
5 changes: 5 additions & 0 deletions src/KafkaFlow.Retry.Postgres/RetryQueueDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
}
}

/// <summary>
/// Counts active retry queues for the given criteria.
/// </summary>
/// <param name="input">Search criteria (group key and status).</param>
/// <returns>Never returns; this operation is not yet implemented for Postgres.</returns>
/// <exception cref="NotImplementedException">
/// Always thrown — the MongoDB provider implements this; the Postgres provider does not yet.
/// </exception>
public Task<long> CountQueuesAsync(CountQueuesInput input)
{
    // Explicit message so callers hitting this at runtime know which provider/feature is missing,
    // instead of a bare NotImplementedException with no context.
    throw new NotImplementedException(
        "Counting retry queues is not yet supported by the Postgres data provider.");
}

public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)
{
Guard.Argument(input, nameof(input)).NotNull();
Expand Down
Loading
Loading