66using EntityDb . Common . Entities ;
77using EntityDb . Common . Projections ;
88using EntityDb . Common . Transactions . Builders ;
9- using EntityDb . Common . Transactions . Processors ;
109using EntityDb . Common . Transactions . Subscribers ;
10+ using EntityDb . Common . Transactions . Subscribers . ProcessorQueues ;
11+ using EntityDb . Common . Transactions . Subscribers . Processors ;
1112using EntityDb . Common . TypeResolvers ;
1213using Microsoft . Extensions . DependencyInjection ;
1314using System ;
1415using System . Diagnostics . CodeAnalysis ;
1516using System . Reflection ;
17+ using System . Threading . Tasks . Dataflow ;
1618
1719namespace EntityDb . Common . Extensions ;
1820
@@ -21,34 +23,24 @@ namespace EntityDb.Common.Extensions;
2123/// </summary>
2224public static class ServiceCollectionExtensions
2325{
24- [ ExcludeFromCodeCoverage ( Justification = "Don't need coverage for non-test mode." ) ]
25- private static void AddSnapshotTransactionProcessor < TTransactionProcessor > (
26- this IServiceCollection serviceCollection , bool testMode ,
27- Func < IServiceProvider , TTransactionProcessor > transactionProcessorFactory )
26+ private static void AddTestModeTransactionProcessorQueue < TTransactionProcessor > ( this IServiceCollection serviceCollection )
2827 where TTransactionProcessor : ITransactionProcessor
2928 {
30- serviceCollection . AddSingleton ( serviceProvider =>
31- {
32- var transactionProcessor = transactionProcessorFactory . Invoke ( serviceProvider ) ;
29+ serviceCollection . AddSingleton < ITransactionProcessorQueue < TTransactionProcessor > , TestModeTransactionQueue < TTransactionProcessor > > ( ) ;
30+ }
3331
34- return TransactionProcessorSubscriber < TTransactionProcessor > . Create ( serviceProvider , transactionProcessor ,
35- testMode ) ;
36- } ) ;
32+ [ ExcludeFromCodeCoverage ( Justification = "Tests are only meant to run in test mode." ) ]
33+ private static void AddBufferBlockTransactionProcessorQueue < TTransactionProcessor > ( this IServiceCollection serviceCollection )
34+ where TTransactionProcessor : ITransactionProcessor
35+ {
36+ serviceCollection . AddSingleton < BufferBlockTransactionQueue < TTransactionProcessor > > ( ) ;
3737
38- serviceCollection . AddSingleton < ITransactionSubscriber > (
39- serviceProvider =>
40- serviceProvider . GetRequiredService < TransactionProcessorSubscriber < TTransactionProcessor > > ( )
41- ) ;
38+ serviceCollection . AddSingleton < ITransactionProcessorQueue < TTransactionProcessor > > ( serviceProvider => serviceProvider . GetRequiredService < BufferBlockTransactionQueue < TTransactionProcessor > > ( ) ) ;
4239
43- if ( testMode )
44- {
45- return ;
46- }
40+ serviceCollection . AddHostedService ( serviceProvider =>
41+ serviceProvider . GetRequiredService < BufferBlockTransactionQueue < TTransactionProcessor > > ( ) ) ;
4742
48- serviceCollection . AddHostedService (
49- serviceProvider =>
50- serviceProvider . GetRequiredService < TransactionProcessorSubscriber < TTransactionProcessor > > ( )
51- ) ;
43+ serviceCollection . AddSingleton < ITransactionSubscriber , TransactionProcessorSubscriber < TTransactionProcessor > > ( ) ;
5244 }
5345
5446 internal static void Add < TService > ( this IServiceCollection serviceCollection , ServiceLifetime serviceLifetime ,
@@ -64,6 +56,39 @@ internal static void Add<TService>(this IServiceCollection serviceCollection, Se
6456 serviceCollection . Add ( new ServiceDescriptor ( typeof ( TService ) , typeof ( TService ) , serviceLifetime ) ) ;
6557 }
6658
59+ /// <summary>
60+ /// Registers the transaction processor provided, along with a transaction processor subscriber,
61+ /// and a transaction processor queue. For test mode, the queue is not actually a queue and will
62+ /// immediately process the transaction. For non-test mode, the queue uses a <see cref="BufferBlock{ITransaction}"/>.
63+ /// </summary>
64+ /// <typeparam name="TTransactionProcessor">The type of the transaction processor.</typeparam>
65+ /// <param name="serviceCollection">The service collection.</param>
66+ /// <param name="testMode">Wether or not to run in test mode.</param>
67+ /// <param name="transactionProcessorFactory">A factory for creating the transaction processor.</param>
68+ [ ExcludeFromCodeCoverage ( Justification = "Tests are only meant to run in test mode." ) ]
69+ public static void AddTransactionProcessorSubscriber < TTransactionProcessor > ( this IServiceCollection serviceCollection ,
70+ bool testMode , Func < IServiceProvider , TTransactionProcessor > transactionProcessorFactory )
71+ where TTransactionProcessor : class , ITransactionProcessor
72+ {
73+ serviceCollection . AddSingleton ( transactionProcessorFactory . Invoke ) ;
74+
75+ serviceCollection . AddSingleton < TransactionProcessorSubscriber < TTransactionProcessor > > ( ) ;
76+
77+ serviceCollection . AddSingleton < ITransactionSubscriber > (
78+ serviceProvider =>
79+ serviceProvider . GetRequiredService < TransactionProcessorSubscriber < TTransactionProcessor > > ( )
80+ ) ;
81+
82+ if ( testMode )
83+ {
84+ serviceCollection . AddTestModeTransactionProcessorQueue < TTransactionProcessor > ( ) ;
85+ }
86+ else
87+ {
88+ serviceCollection . AddBufferBlockTransactionProcessorQueue < TTransactionProcessor > ( ) ;
89+ }
90+ }
91+
6792 /// <summary>
6893 /// Adds an internal implementation of <see cref="IPartialTypeResolver" /> which resolves types by using assembly
6994 /// information.
@@ -137,7 +162,7 @@ public static void AddEntitySnapshotTransactionSubscriber<TEntity>(this IService
137162 string transactionSessionOptionsName , string snapshotSessionOptionsName , bool testMode = false )
138163 where TEntity : IEntity < TEntity >
139164 {
140- serviceCollection . AddSnapshotTransactionProcessor ( testMode , serviceProvider => EntitySnapshotTransactionProcessor < TEntity > . Create (
165+ serviceCollection . AddTransactionProcessorSubscriber ( testMode , serviceProvider => EntitySnapshotTransactionProcessor < TEntity > . Create (
141166 serviceProvider , transactionSessionOptionsName , snapshotSessionOptionsName ) ) ;
142167 }
143168
@@ -167,7 +192,7 @@ public static void AddProjectionSnapshotTransactionSubscriber<TProjection>(
167192 string transactionSessionOptionsName , string snapshotSessionOptionsName , bool testMode = false )
168193 where TProjection : IProjection < TProjection >
169194 {
170- serviceCollection . AddSnapshotTransactionProcessor ( testMode , serviceProvider => ProjectionSnapshotTransactionProcessor < TProjection > . Create (
195+ serviceCollection . AddTransactionProcessorSubscriber ( testMode , serviceProvider => ProjectionSnapshotTransactionProcessor < TProjection > . Create (
171196 serviceProvider , transactionSessionOptionsName , snapshotSessionOptionsName ) ) ;
172197 }
173198}
0 commit comments