Skip to content

Commit 449e060

Browse files
committed
feat:init event
1 parent 3fb1fea commit 449e060

File tree

62 files changed

+895
-7
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+895
-7
lines changed

Masa.Framework.sln

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Masa.Utils.DynamicsCrm.Core
625625
EndProject
626626
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Masa.Utils.DynamicsCrm.EntityFrameworkCore", "src\Utils\DynamicsCrm\Masa.Utils.DynamicsCrm.EntityFrameworkCore\Masa.Utils.DynamicsCrm.EntityFrameworkCore.csproj", "{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}"
627627
EndProject
628+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Masa.Framework.EventApiTest", "test\Masa.Framework.EventApiTest\Masa.Framework.EventApiTest.csproj", "{345F5F33-8788-43F3-9005-B3EEBC007C72}"
629+
EndProject
628630
Global
629631
GlobalSection(SolutionConfigurationPlatforms) = preSolution
630632
Debug|Any CPU = Debug|Any CPU
@@ -2185,6 +2187,14 @@ Global
21852187
{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}.Release|Any CPU.Build.0 = Release|Any CPU
21862188
{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}.Release|x64.ActiveCfg = Release|Any CPU
21872189
{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}.Release|x64.Build.0 = Release|Any CPU
2190+
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
2191+
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Debug|Any CPU.Build.0 = Debug|Any CPU
2192+
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Debug|x64.ActiveCfg = Debug|Any CPU
2193+
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Debug|x64.Build.0 = Debug|Any CPU
2194+
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Release|Any CPU.ActiveCfg = Release|Any CPU
2195+
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Release|Any CPU.Build.0 = Release|Any CPU
2196+
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Release|x64.ActiveCfg = Release|Any CPU
2197+
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Release|x64.Build.0 = Release|Any CPU
21882198
EndGlobalSection
21892199
GlobalSection(SolutionProperties) = preSolution
21902200
HideSolutionNode = FALSE
@@ -2493,6 +2503,7 @@ Global
24932503
{64B54122-44F1-4379-9422-953EF706A3A6} = {5944A182-13B8-4DA6-AEE2-0A01E64A9648}
24942504
{83310F46-E1C7-4438-B32A-9F6F7EA13FCF} = {64B54122-44F1-4379-9422-953EF706A3A6}
24952505
{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC} = {64B54122-44F1-4379-9422-953EF706A3A6}
2506+
{345F5F33-8788-43F3-9005-B3EEBC007C72} = {E747043D-81E2-4A89-8B5B-1258ED45F941}
24962507
EndGlobalSection
24972508
GlobalSection(ExtensibilityGlobals) = postSolution
24982509
SolutionGuid = {40383055-CC50-4600-AD9A-53C14F620D03}

src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IIntegrationEventLogService.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,16 @@ Task SaveEventAsync(
3737

3838
Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default);
3939

40+
Task BulkMarkEventAsPublishedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default);
41+
4042
Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default);
4143

44+
Task BulkMarkEventAsInProgressAsync(IEnumerable<Guid> eventIds, int minimumRetryInterval, CancellationToken cancellationToken = default);
45+
4246
Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default);
4347

48+
Task BulkMarkEventAsFailedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default);
49+
4450
/// <summary>
4551
/// Delete successfully published and expired data
4652
/// </summary>

src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr/Publisher.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,59 @@ public async Task PublishAsync<T>(
6767
@event);
6868
}
6969
}
70+
71+
public async Task BulkPublishAsync<T>(
72+
string topicName, List<(T @event, IntegrationEventExpand? eventMessageExpand)> @events,
73+
CancellationToken stoppingToken = default)
74+
{
75+
76+
_logger?.LogDebug("-----BulkPublishEvent Integration event publishing is in progress from {AppId} with DaprAppId as '{DaprAppId}'", _appId,
77+
_daprAppId);
78+
79+
if (!@events.Any())
80+
return;
81+
82+
MasaArgumentException.ThrowIfNullOrWhiteSpace(_daprAppId);
83+
84+
var masaCloudEvents = new List<MasaCloudEvent<IntegrationEventMessage>>();
85+
var waitEvents = new List<T>();
86+
87+
@events.ForEach(item =>
88+
{
89+
if (item.eventMessageExpand is { Isolation.Count: > 0 })
90+
{
91+
var eventMessage = new IntegrationEventMessage(item.@event, item.eventMessageExpand);
92+
var masaCloudEvent = new MasaCloudEvent<IntegrationEventMessage>(eventMessage)
93+
{
94+
Source = new Uri(_daprAppId, UriKind.RelativeOrAbsolute)
95+
};
96+
97+
masaCloudEvents.Add(masaCloudEvent);
98+
}
99+
else
100+
{
101+
waitEvents.Add(item.@event);
102+
}
103+
});
104+
105+
if (masaCloudEvents.Any())
106+
{
107+
await DaprClient.PublishEventAsync(_pubSubName, topicName, masaCloudEvents, stoppingToken);
108+
_logger?.LogDebug(
109+
"-----BulkPublishEvent Publishing integration event from {AppId} succeeded with DaprAppId is {DaprAppId} and Event is {Event}",
110+
_appId,
111+
_daprAppId,
112+
masaCloudEvents);
113+
}
114+
115+
if (waitEvents.Any())
116+
{
117+
await DaprClient.BulkPublishEventAsync(_pubSubName, topicName, @events.ToList(), cancellationToken: stoppingToken);
118+
_logger?.LogDebug(
119+
"-----BulkPublishEvent Publishing integration event from {AppId} succeeded with DaprAppId is {DaprAppId} and Event is {Event}",
120+
_appId,
121+
_daprAppId,
122+
@events);
123+
}
124+
}
70125
}

src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,25 @@ public Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellati
106106
}, cancellationToken);
107107
}
108108

109+
public Task BulkMarkEventAsPublishedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default)
110+
{
111+
return BulkUpdateEventStatus(eventIds, IntegrationEventStates.Published, eventLogs =>
112+
{
113+
eventLogs.ForEach(eventLog =>
114+
{
115+
if (eventLog.State != IntegrationEventStates.InProgress)
116+
{
117+
_logger?.LogWarning(
118+
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
119+
IntegrationEventStates.Published, eventLog.State, eventLog.Id);
120+
throw new UserFriendlyException(
121+
$"Failed to modify the state of the local message table to {IntegrationEventStates.Published}, the current State is {eventLog.State}, Id: {eventLog.Id}");
122+
}
123+
});
124+
125+
}, cancellationToken);
126+
}
127+
109128
public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default)
110129
{
111130
return UpdateEventStatus(eventId, IntegrationEventStates.InProgress, eventLog =>
@@ -132,6 +151,35 @@ public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, C
132151
}, cancellationToken);
133152
}
134153

154+
public Task BulkMarkEventAsInProgressAsync(IEnumerable<Guid> eventIds, int minimumRetryInterval, CancellationToken cancellationToken = default)
155+
{
156+
return BulkUpdateEventStatus(eventIds, IntegrationEventStates.InProgress, eventLogs =>
157+
{
158+
eventLogs.ForEach(eventLog =>
159+
{
160+
if (eventLog.State is IntegrationEventStates.InProgress or IntegrationEventStates.PublishedFailed &&
161+
(eventLog.GetCurrentTime() - eventLog.ModificationTime).TotalSeconds < minimumRetryInterval)
162+
{
163+
_logger?.LogInformation(
164+
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}, Multitasking execution error, waiting for the next retry",
165+
IntegrationEventStates.InProgress, eventLog.State, eventLog.Id);
166+
throw new UserFriendlyException(
167+
$"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}, Multitasking execution error, waiting for the next retry");
168+
}
169+
if (eventLog.State != IntegrationEventStates.NotPublished &&
170+
eventLog.State != IntegrationEventStates.InProgress &&
171+
eventLog.State != IntegrationEventStates.PublishedFailed)
172+
{
173+
_logger?.LogWarning(
174+
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
175+
IntegrationEventStates.InProgress, eventLog.State, eventLog.Id);
176+
throw new UserFriendlyException(
177+
$"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}");
178+
}
179+
});
180+
}, cancellationToken);
181+
}
182+
135183
public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default)
136184
{
137185
return UpdateEventStatus(eventId, IntegrationEventStates.PublishedFailed, eventLog =>
@@ -147,6 +195,24 @@ public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationT
147195
}, cancellationToken);
148196
}
149197

198+
public Task BulkMarkEventAsFailedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default)
199+
{
200+
return BulkUpdateEventStatus(eventIds, IntegrationEventStates.PublishedFailed, eventLogs =>
201+
{
202+
eventLogs.ForEach(eventLog =>
203+
{
204+
if (eventLog.State != IntegrationEventStates.InProgress)
205+
{
206+
_logger?.LogWarning(
207+
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
208+
IntegrationEventStates.PublishedFailed, eventLog.State, eventLog.Id);
209+
throw new UserFriendlyException(
210+
$"Failed to modify the state of the local message table to {IntegrationEventStates.PublishedFailed}, the current State is {eventLog.State}, Id: {eventLog.Id}");
211+
}
212+
});
213+
}, cancellationToken);
214+
}
215+
150216
public async Task DeleteExpiresAsync(DateTime expiresAt, int batchCount, CancellationToken token = default)
151217
{
152218
var eventLogs = _eventLogContext.EventLogs.Where(e => e.ModificationTime < expiresAt && e.State == IntegrationEventStates.Published)
@@ -164,6 +230,51 @@ public async Task DeleteExpiresAsync(DateTime expiresAt, int batchCount, Cancell
164230
}
165231
}
166232

233+
private async Task BulkUpdateEventStatus(IEnumerable<Guid> eventIds,
234+
IntegrationEventStates status,
235+
Action<List<IntegrationEventLog>>? action = null,
236+
CancellationToken cancellationToken = default)
237+
{
238+
var eventLogEntrys =
239+
await _eventLogContext.EventLogs.Where(e => eventIds.Contains(e.EventId)).ToListAsync();
240+
if (eventLogEntrys == null || !eventLogEntrys.Any())
241+
throw new ArgumentException(
242+
$"The local message record does not exist, please confirm whether the local message record has been deleted or other reasons cause the local message record to not be inserted successfully In EventId: {eventIds}",
243+
nameof(eventIds));
244+
245+
action?.Invoke(eventLogEntrys);
246+
247+
var updateEventLogEntry = new List<IntegrationEventLog>();
248+
foreach (var eventLogEntry in eventLogEntrys)
249+
{
250+
if (eventLogEntry.State == status)
251+
{
252+
continue;
253+
}
254+
255+
eventLogEntry.State = status;
256+
eventLogEntry.ModificationTime = eventLogEntry.GetCurrentTime();
257+
258+
if (status == IntegrationEventStates.InProgress)
259+
eventLogEntry.TimesSent++;
260+
261+
updateEventLogEntry.Add(eventLogEntry);
262+
}
263+
264+
_eventLogContext.EventLogs.UpdateRange(updateEventLogEntry);
265+
266+
try
267+
{
268+
await _eventLogContext.DbContext.SaveChangesAsync(cancellationToken);
269+
}
270+
catch (DbUpdateConcurrencyException ex)
271+
{
272+
throw new UserFriendlyException($"Concurrency conflict, update exception. {ex.Message}");
273+
}
274+
275+
updateEventLogEntry.ForEach(CheckAndDetached);
276+
}
277+
167278
private async Task UpdateEventStatus(Guid eventId,
168279
IntegrationEventStates status,
169280
Action<IntegrationEventLog>? action = null,

src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IPublisher.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) MASA Stack All rights reserved.
1+
// Copyright (c) MASA Stack All rights reserved.
22
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.
33

44
namespace Masa.Contrib.Dispatcher.IntegrationEvents;
@@ -10,4 +10,8 @@ Task PublishAsync<T>(
1010
T @event,
1111
IntegrationEventExpand? eventMessageExpand,
1212
CancellationToken stoppingToken = default);
13+
14+
Task BulkPublishAsync<T>(
15+
string topicName, List<(T @event, IntegrationEventExpand? eventMessageExpand)> @events,
16+
CancellationToken stoppingToken = default);
1317
}

src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Internal/LocalQueueProcessor.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public static void SetLogger(IServiceCollection services)
2222
public void AddJobs(IntegrationEventLogItem items)
2323
=> _retryEventLogs.TryAdd(items.EventId, items);
2424

25+
public void BulkAddJobs(List<IntegrationEventLogItem> items)
26+
=> items.ForEach(item => _retryEventLogs.TryAdd(item.EventId, item));
27+
2528
public void RemoveJobs(Guid eventId)
2629
=> _retryEventLogs.TryRemove(eventId, out _);
2730

src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/SendByDataProcessor.cs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,68 @@ await eventLogService.RetrieveEventLogsPendingToPublishAsync(
3535
_options.Value.BatchSize,
3636
stoppingToken);
3737

38-
if(!retrieveEventLogs.Any())
38+
if (!retrieveEventLogs.Any())
39+
return;
40+
41+
var publisher = serviceProvider.GetRequiredService<IPublisher>();
42+
var retrieveEventLogsGroupByTopic = retrieveEventLogs.GroupBy(eventLog => eventLog.Topic)
43+
.Select(eventLog => new
44+
{
45+
TopicName = eventLog.Key,
46+
Events = eventLog.Select(log => new { log.Event, log.EventExpand, log.EventId }).ToList(),
47+
}).ToList();
48+
49+
foreach (var eventLog in retrieveEventLogsGroupByTopic)
50+
{
51+
var eventIds = eventLog.Events.Select(item => item.EventId);
52+
var events = eventLog.Events.Select(item => (item.Event, item.EventExpand)).ToList();
53+
54+
try
55+
{
56+
await eventLogService.BulkMarkEventAsInProgressAsync(eventIds, _options.Value.MinimumRetryInterval, stoppingToken);
57+
58+
_logger?.LogDebug("Publishing integration event {Event} to {TopicName}",
59+
eventLog,
60+
eventLog.TopicName);
61+
62+
await publisher.BulkPublishAsync(eventLog.TopicName, events, stoppingToken);
63+
64+
await eventLogService.BulkMarkEventAsPublishedAsync(eventIds, stoppingToken);
65+
}
66+
catch (UserFriendlyException)
67+
{
68+
//Update state due to multitasking contention, no processing required
69+
}
70+
catch (Exception ex)
71+
{
72+
_logger?.LogError(ex,
73+
"Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
74+
eventIds, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
75+
await eventLogService.BulkMarkEventAsFailedAsync(eventIds, stoppingToken);
76+
77+
var integrationEventLogItem = eventLog.Events.Select(item =>
78+
new IntegrationEventLogItem(item.EventId, eventLog.TopicName, item.Event, item.EventExpand)).ToList();
79+
80+
LocalQueueProcessor.Default.BulkAddJobs(integrationEventLogItem);
81+
}
82+
}
83+
}
84+
85+
[Obsolete]
86+
private async Task ExecuteByObsoleteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
87+
{
88+
var unitOfWork = serviceProvider.GetService<IUnitOfWork>();
89+
if (unitOfWork != null)
90+
unitOfWork.UseTransaction = false;
91+
92+
var eventLogService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();
93+
94+
var retrieveEventLogs =
95+
await eventLogService.RetrieveEventLogsPendingToPublishAsync(
96+
_options.Value.BatchSize,
97+
stoppingToken);
98+
99+
if (!retrieveEventLogs.Any())
39100
return;
40101

41102
var publisher = serviceProvider.GetRequiredService<IPublisher>();
@@ -65,7 +126,8 @@ await eventLogService.RetrieveEventLogsPendingToPublishAsync(
65126
eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
66127
await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken);
67128

68-
LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(eventLog.EventId, eventLog.Topic, eventLog.Event, eventLog.EventExpand));
129+
LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(eventLog.EventId, eventLog.Topic, eventLog.Event,
130+
eventLog.EventExpand));
69131
}
70132
}
71133
}

src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/Infrastructure/CustomIntegrationEventLogService.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) MASA Stack All rights reserved.
1+
// Copyright (c) MASA Stack All rights reserved.
22
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.
33

44
namespace Masa.Contrib.Dispatcher.IntegrationEvents.Tests.Infrastructure;
@@ -47,4 +47,19 @@ public Task SaveEventAsync(
4747
{
4848
return Task.CompletedTask;
4949
}
50+
51+
public Task BulkMarkEventAsPublishedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default)
52+
{
53+
return Task.CompletedTask;
54+
}
55+
56+
public Task BulkMarkEventAsInProgressAsync(IEnumerable<Guid> eventIds, int minimumRetryInterval, CancellationToken cancellationToken = default)
57+
{
58+
return Task.CompletedTask;
59+
}
60+
61+
public Task BulkMarkEventAsFailedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default)
62+
{
63+
return Task.CompletedTask;
64+
}
5065
}

src/Contrib/Dispatcher/Masa.Contrib.Dispatcher.Events/Internal/Options/DispatcherOptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ public DispatcherOptions(IServiceCollection services, Assembly[] assemblies)
2828
.Where(type => type.IsClass && typeof(IEvent).IsAssignableFrom(type))
2929
.ToList();
3030

31-
allEventTypes.AddRange(GetGenericTypeEventType(assemblies));
31+
allEventTypes.AddRange(GetGenericEventType(assemblies));
3232

3333
UnitOfWorkRelation = allEventTypes.ToDictionary(type => type, IsSupportUnitOfWork);
3434
}
3535

36-
private List<Type> GetGenericTypeEventType(Assembly[] assemblies)
36+
private List<Type> GetGenericEventType(Assembly[] assemblies)
3737
{
3838
var methods = assemblies
3939
.SelectMany(assembly => assembly.GetTypes().SelectMany(method => method.GetMethods()))

0 commit comments

Comments
 (0)