Skip to content

Commit bf21faf

Browse files
feat: add Postgres dev
1 parent 8c72a9b commit bf21faf

File tree

5 files changed

+35
-5
lines changed

5 files changed

+35
-5
lines changed

src/KafkaFlow.Retry.Postgres/Repositories/IRetryQueueRepository.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ namespace KafkaFlow.Retry.Postgres.Repositories;
1010
internal interface IRetryQueueRepository
1111
{
1212
Task<long> AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo);
13-
13+
14+
Task<long> CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus);
15+
1416
Task<int> DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete);
1517

1618
Task<bool> ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey);

src/KafkaFlow.Retry.Postgres/Repositories/RetryQueueRepository.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,24 @@ public async Task<long> AddAsync(IDbConnection dbConnection, RetryQueueDbo retry
3333
}
3434
}
3535

36+
public async Task<long> CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus)
37+
{
38+
using (var command = dbConnection.CreateCommand())
39+
{
40+
command.CommandType = CommandType.Text;
41+
command.CommandText =
42+
$@"SELECT COUNT(1)
43+
FROM retry_queues
44+
WHERE SearchGroupKey = @SearchGroupKey
45+
AND IdStatus = @IdStatus";
46+
47+
command.Parameters.AddWithValue("SearchGroupKey", searchGroupKey);
48+
command.Parameters.AddWithValue("IdStatus", (byte)retryQueueStatus);
49+
50+
return Convert.ToInt64(await command.ExecuteScalarAsync().ConfigureAwait(false));
51+
}
52+
}
53+
3654
public async Task<int> DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey,
3755
RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete)
3856
{

src/KafkaFlow.Retry.Postgres/RetryQueueDataProvider.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,18 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
107107
}
108108
}
109109

110-
public Task<long> CountQueuesAsync(CountQueuesInput input)
110+
public async Task<long> CountQueuesAsync(CountQueuesInput input)
111111
{
112-
throw new NotImplementedException();
112+
Guard.Argument(input, nameof(input)).NotNull();
113+
114+
using (var dbConnection = _connectionProvider.Create(_postgresDbSettings))
115+
{
116+
return await _retryQueueRepository.CountQueueAsync(
117+
dbConnection,
118+
input.SearchGroupKey,
119+
input.Status)
120+
.ConfigureAwait(false);
121+
}
113122
}
114123

115124
public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)

src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ internal interface IRetryQueueRepository
1111
{
1212
Task<long> AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo);
1313

14+
Task<long> CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus);
15+
1416
Task<int> DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete);
1517

1618
Task<bool> ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey);
1719

1820
Task<RetryQueueDbo> GetQueueAsync(IDbConnection dbConnection, string queueGroupKey);
1921

20-
Task<long> CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus);
21-
2222
Task<IList<RetryQueueDbo>> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top);
2323

2424
Task<int> UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution);

tests/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CountQueuesTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public CountQueuesTests(BootstrapperRepositoryFixture bootstrapperRepositoryFixt
1818
[Theory]
1919
[InlineData(RepositoryType.MongoDb)]
2020
[InlineData(RepositoryType.SqlServer)]
21+
[InlineData(RepositoryType.Postgres)]
2122
public async Task f(RepositoryType repositoryType)
2223
{
2324
// Arrange

0 commit comments

Comments
 (0)