Skip to content

Commit 4ea3d5e

Browse files
refactor: use two, linked buffer blocks instead of a single buffer block
1 parent e9eeece commit 4ea3d5e

File tree

1 file changed

+16
-5
lines changed

1 file changed

+16
-5
lines changed

src/EntityDb.Common/Transactions/Subscribers/ProcessorQueues/BufferBlockTransactionQueue.cs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,16 @@ namespace EntityDb.Common.Transactions.Subscribers.ProcessorQueues;
1111
internal class BufferBlockTransactionQueue<TTransactionProcessor> : BackgroundService, ITransactionProcessorQueue<TTransactionProcessor>
1212
where TTransactionProcessor : ITransactionProcessor
1313
{
14-
private readonly BufferBlock<ITransaction> _transactionQueue = new();
14+
private readonly BufferBlock<ITransaction> _foregroundQueue = new();
15+
private readonly BufferBlock<ITransaction> _backgroundQueue = new();
16+
private readonly IDisposable _link;
1517
private readonly ILogger<BufferBlockTransactionQueue<TTransactionProcessor>> _logger;
1618
private readonly TTransactionProcessor _transactionProcessor;
1719

1820
public BufferBlockTransactionQueue(ILogger<BufferBlockTransactionQueue<TTransactionProcessor>> logger, TTransactionProcessor transactionProcessor)
1921
{
22+
_link = _foregroundQueue.LinkTo(_backgroundQueue);
23+
2024
_logger = logger;
2125
_transactionProcessor = transactionProcessor;
2226
}
@@ -25,18 +29,18 @@ public void Enqueue(ITransaction transaction)
2529
{
2630
_logger.LogInformation("Enqueueing Transaction {TransactionId} to Transaction Queue.", transaction.Id.Value);
2731

28-
var enqueued = _transactionQueue.Post(transaction);
29-
32+
var enqueued = _foregroundQueue.Post(transaction);
33+
3034
_logger.LogInformation("{Enqueued} Transaction {TransactionId} to Transaction Queue.", enqueued, transaction.Id.Value);
3135
}
3236

3337
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
3438
{
35-
while (await _transactionQueue.OutputAvailableAsync(stoppingToken))
39+
while (await _backgroundQueue.OutputAvailableAsync(stoppingToken))
3640
{
3741
try
3842
{
39-
var transaction = await _transactionQueue.ReceiveAsync(stoppingToken);
43+
var transaction = await _backgroundQueue.ReceiveAsync(stoppingToken);
4044

4145
await _transactionProcessor.ProcessTransaction(transaction, stoppingToken);
4246
}
@@ -46,4 +50,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
4650
}
4751
}
4852
}
53+
54+
public override void Dispose()
55+
{
56+
base.Dispose();
57+
58+
_link.Dispose();
59+
}
4960
}

0 commit comments

Comments
 (0)