Skip to content

Conversation

@maliming
Copy link
Member

@maliming maliming commented Dec 24, 2024

Resolves #20020


How to add an inbox/outbox worker for specific events?

1. Exclude your event from the default inbox/outbox worker.

public override void ConfigureServices(ServiceConfigurationContext context)
{
    Configure<AbpEventBusBoxesOptions>(options =>
    {
        options.InboxProcessorFilter = x => x.EventName != "MyEventName1" && x.EventName != "MyEventName2";
        options.OutboxProcessorFilter = x => x.EventName != "MyEventName1" && x.EventName != "MyEventName2";
    });
}

2. Add MyEventInboxProcessManager/MyEventOutboxSenderManager as background workers and InboxProcessor/OutboxSender for your events.

Override GetWaitingEventsAsync to query your events.

protected async override Task<List<IncomingEventInfo>> GetWaitingEventsAsync()
{
    return await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, x => x.EventName == "MyEventName1" || x.EventName == "MyEventName2", StoppingToken);
}

protected async override Task<List<OutgoingEventInfo>> GetWaitingEventsAsync()
{
    return await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, x => x.EventName == "MyEventName1" || x.EventName == "MyEventName2", StoppingToken);
}
public class MyEventInboxProcessManager : IBackgroundWorker
{
    protected AbpDistributedEventBusOptions Options { get; }
    protected IServiceProvider ServiceProvider { get; }
    protected List<IInboxProcessor> Processors { get; }

    public MyEventInboxProcessManager(
        IOptions<AbpDistributedEventBusOptions> options,
        IServiceProvider serviceProvider)
    {
        ServiceProvider = serviceProvider;
        Options = options.Value;
        Processors = new List<IInboxProcessor>();
    }

    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        foreach (var inboxConfig in Options.Inboxes.Values)
        {
            if (inboxConfig.IsProcessingEnabled)
            {
                var processor = ServiceProvider.GetRequiredService<MyEventInboxProcessor>();
                await processor.StartAsync(inboxConfig, cancellationToken);
                Processors.Add(processor);
            }
        }
    }

    public async Task StopAsync(CancellationToken cancellationToken = default)
    {
        foreach (var processor in Processors)
        {
            await processor.StopAsync(cancellationToken);
        }
    }
}


public class MyEventOutboxSenderManager : IBackgroundWorker
{
    protected AbpDistributedEventBusOptions Options { get; }
    protected IServiceProvider ServiceProvider { get; }
    protected List<IOutboxSender> Senders { get; }

    public MyEventOutboxSenderManager(
        IOptions<AbpDistributedEventBusOptions> options,
        IServiceProvider serviceProvider)
    {
        ServiceProvider = serviceProvider;
        Options = options.Value;
        Senders = new List<IOutboxSender>();
    }

    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        foreach (var outboxConfig in Options.Outboxes.Values)
        {
            if (outboxConfig.IsSendingEnabled)
            {
                var sender = ServiceProvider.GetRequiredService<MyEventOutboxSender>();
                await sender.StartAsync(outboxConfig, cancellationToken);
                Senders.Add(sender);
            }
        }
    }

    public async Task StopAsync(CancellationToken cancellationToken = default)
    {
        foreach (var sender in Senders)
        {
            await sender.StopAsync(cancellationToken);
        }
    }
}

public class MyEventInboxProcessor : InboxProcessor
{
    public MyEventInboxProcessor(
        [NotNull] IServiceProvider serviceProvider,
        [NotNull] AbpAsyncTimer timer,
        [NotNull] IDistributedEventBus distributedEventBus,
        [NotNull] IAbpDistributedLock distributedLock,
        [NotNull] IUnitOfWorkManager unitOfWorkManager,
        [NotNull] IClock clock,
        [NotNull] [ItemNotNull] IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
        : base(serviceProvider, timer, distributedEventBus, distributedLock, unitOfWorkManager, clock, eventBusBoxesOptions)
    {
        DistributedLockName = $"AbpInbox_MyEvent_{InboxConfig.DatabaseName}";
    }

    protected async override Task<List<IncomingEventInfo>> GetWaitingEventsAsync()
    {
        return await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, x => x.EventName == "MyEventName1" || x.EventName == "MyEventName2", StoppingToken);
    }
}


public class MyEventOutboxSender : OutboxSender
{
    public MyEventOutboxSender(
        [NotNull] IServiceProvider serviceProvider,
        [NotNull] AbpAsyncTimer timer,
        [NotNull] IDistributedEventBus distributedEventBus,
        [NotNull] IAbpDistributedLock distributedLock,
        [NotNull] [ItemNotNull] IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
        : base(serviceProvider, timer, distributedEventBus, distributedLock, eventBusBoxesOptions)
    {
        DistributedLockName = $"AbpOutbox_MyEvent_{OutboxConfig.DatabaseName}";
    }

    protected async override Task<List<OutgoingEventInfo>> GetWaitingEventsAsync()
    {
        return await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, x => x.EventName == "MyEventName1" || x.EventName == "MyEventName2", StoppingToken);
    }
}

3. Register MyEventInboxProcessManager/MyEventOutboxSenderManager background workers.

public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
    await context.AddBackgroundWorkerAsync<MyEventInboxProcessManager>();
    await context.AddBackgroundWorkerAsync<MyEventOutboxSenderManager>();
}

@maliming maliming marked this pull request as draft December 24, 2024 06:51
@maliming maliming added this to the 9.1-preview milestone Dec 24, 2024
@maliming maliming marked this pull request as ready for review December 24, 2024 07:39
@maliming maliming requested a review from EngincanV December 25, 2024 03:09
@EngincanV EngincanV enabled auto-merge December 26, 2024 08:17
@EngincanV EngincanV merged commit edae8a6 into dev Dec 26, 2024
3 checks passed
@EngincanV EngincanV deleted the GetWaitingEventsAsync branch December 26, 2024 11:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

How to avoid global blocking in distributed event handlers

3 participants