Skip to content

Commit 7ac211c

Browse files
committed
refactor(workflows-service): remove unnecessary middleware and update port config
- Remove MetricsAuthMiddleware configuration from WorkerAppModule - Change retrieved port from 'PORT' to 'WORKER_PORT' in main.worker.ts - Update BullBoardAuthMiddleware usage in WebhooksModule configuration fix(queue): refactor redis connection handling in QueueService - Remove Redis connection initialization from constructor - Inject Redis client through dependency injection - Improve code structure by eliminating unnecessary methods refactor(queue): streamline queue registration process and improve imports - Remove redundant queue registration lines in AlertQueueService - Integrate BullBoard instance creation within QueueModule - Simplify dependency handling in WebhooksService and related modules refactor(queue): improve alert queue service and worker registration - Simplify dependency injection for services in AlertQueueService - Replace queue setup logic with createQueue method and refactor worker processing - Eliminate unused code and streamline error handling in webhook job processing feat(queue): implement job scheduler in alert queue service - Add job scheduling setup for alert-check jobs with specific intervals - Update relevant imports and refactor the QueueService to utilize RedisService - Enhance IQueueService interface with additional documentation and job-related methods refactor(queue): simplify queue configuration and job setup - Remove unnecessary injection of BullBoard instance in AlertQueueService - Update job options to reduce default attempts for better performance - Enhance queue service methods for clearer job scheduling and worker registration fix(webhook): correct forceDirect logic in webhook callers - Update forceDirect assignment to explicitly check for enabled state - Ensure consistent behavior for document and workflow state changes add feautres refactor(queue): streamline job options and improve queue management - Remove deprecated job option settings for consistency - Introduce default job options in queue creation function - Add validation to ensure valid queue names are provided
1 parent 4b8d1a2 commit 7ac211c

15 files changed

+309
-315
lines changed

pnpm-lock.yaml

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 29 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
2-
import { Job } from 'bullmq';
32
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
43
import { AlertService } from './alert.service';
5-
import { QueueService } from '@/common/queue/queue.service';
6-
import { QueueBullboardService } from '@/common/queue/queue-bullboard.service';
7-
import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service';
8-
import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types';
9-
import type { BullBoardInjectedInstance } from '@/common/queue/types';
4+
import type { IQueueService } from '@/common/queue/types';
105
import { env } from '@/env';
116

127
export interface AlertCheckJobData extends Record<string, unknown> {
@@ -21,53 +16,34 @@ export class AlertQueueService implements OnModuleInit {
2116
constructor(
2217
private readonly logger: AppLoggerService,
2318
private readonly alertService: AlertService,
24-
private readonly queueService: QueueService,
25-
private readonly queueBullboardService: QueueBullboardService,
26-
private readonly bullMQPrometheusService: BullMQPrometheusService,
27-
@Inject(BULLBOARD_INSTANCE_INJECTION_TOKEN)
28-
private bullBoard: BullBoardInjectedInstance,
19+
@Inject('IQueueService') private readonly queueService: IQueueService,
2920
) {}
3021

3122
async onModuleInit() {
3223
if (!env.QUEUE_SYSTEM_ENABLED) {
3324
return;
3425
}
26+
3527
await this.setupAlertQueue();
3628
}
3729

3830
private async setupAlertQueue() {
3931
try {
40-
const queue = this.queueService.getQueue<AlertCheckJobData>({
41-
name: this.QUEUE_NAME,
42-
jobOptions: {
43-
attempts: 10,
44-
backoff: {
45-
type: 'exponential',
46-
delay: 10_000,
47-
},
48-
removeOnComplete: { count: 100, age: 3600 * 24 },
49-
removeOnFail: false,
32+
await this.queueService.setupJobScheduler(
33+
this.QUEUE_NAME,
34+
this.SCHEDULER_ID,
35+
{ every: 60 * 60 * 1000 },
36+
{
37+
name: 'alert-check',
38+
data: { timestamp: Date.now() },
5039
},
51-
});
52-
53-
this.bullMQPrometheusService.registerQueue(queue);
54-
55-
if (this.queueService.isWorkerEnabled()) {
56-
this.queueBullboardService.registerQueue(this.bullBoard, queue);
57-
}
58-
59-
await this.queueService.setupJobScheduler(queue, this.SCHEDULER_ID, {
60-
every: 60 * 60 * 1000,
61-
jobName: 'check-transaction-monitoring-alerts',
62-
data: { timestamp: Date.now() },
63-
jobOptions: {
64-
attempts: 10,
65-
backoff: {
66-
type: 'exponential',
67-
delay: 10_000,
40+
{
41+
jobOptions: {
42+
attempts: 2,
43+
backoff: { type: 'exponential', delay: 10000 },
6844
},
6945
},
70-
});
46+
);
7147

7248
this.registerWorker();
7349

@@ -78,22 +54,21 @@ export class AlertQueueService implements OnModuleInit {
7854
}
7955

8056
private registerWorker() {
81-
this.queueService.registerWorker<AlertCheckJobData>(
82-
this.QUEUE_NAME,
83-
async (job: Job<AlertCheckJobData>) => {
84-
this.logger.log('Processing transaction monitoring alerts check job', {
85-
jobId: job.id,
86-
});
87-
88-
await this.alertService.checkAllAlerts();
57+
this.queueService.registerWorker(this.QUEUE_NAME, this.processAlertCheckJob.bind(this), {
58+
concurrency: 1,
59+
});
60+
}
8961

90-
this.logger.log('Completed transaction monitoring alerts check', {
91-
jobId: job.id,
92-
});
62+
private async processAlertCheckJob(job: any) {
63+
this.logger.log('Processing transaction monitoring alerts check job', { jobId: job.id });
64+
try {
65+
await this.alertService.checkAllAlerts();
66+
this.logger.log('Completed transaction monitoring alerts check', { jobId: job.id });
9367

94-
return { success: true, timestamp: Date.now() };
95-
},
96-
{ concurrency: 1 },
97-
);
68+
return { success: true, timestamp: Date.now() };
69+
} catch (error) {
70+
this.logger.error('Alert check job failed', { jobId: job.id, error });
71+
throw error;
72+
}
9873
}
9974
}

services/workflows-service/src/app.worker.module.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1-
import { MiddlewareConsumer, Module } from '@nestjs/common';
21
import { ConfigModule } from '@nestjs/config';
32
import { ClsModule } from 'nestjs-cls';
43

54
import { AnalyticsModule } from '@/common/analytics-logger/analytics.module';
65
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
76
import { QueueModule } from '@/common/queue/queue.module';
8-
import { configs, env } from '@/env';
7+
import { configs } from '@/env';
98
import { validate } from '@/env-validate';
109
import { SecretsManagerModule } from '@/secrets-manager/secrets-manager.module';
1110
import { SentryModule } from '@/sentry/sentry.module';
1211
import { WebhooksModule } from '@/webhooks/webhooks.module';
1312
import { HealthModule } from './health/health.module';
1413
import { PrismaModule } from './prisma/prisma.module';
1514
import { AlertModule } from './alert/alert.module';
16-
import { MetricsAuthMiddleware } from './common/middlewares/metrics-auth.middleware';
15+
import { Module } from '@nestjs/common';
1716

1817
@Module({
1918
imports: [
@@ -37,8 +36,4 @@ import { MetricsAuthMiddleware } from './common/middlewares/metrics-auth.middlew
3736
AlertModule,
3837
],
3938
})
40-
export class WorkerAppModule {
41-
configure(consumer: MiddlewareConsumer) {
42-
consumer.apply(MetricsAuthMiddleware).forRoutes('*');
43-
}
44-
}
39+
export class WorkerAppModule {}

services/workflows-service/src/common/queue/queue-bullboard.service.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,4 @@ export class QueueBullboardService {
2222
this.logger.error(`Error registering queue ${queue.name} with BullBoard`, { error });
2323
}
2424
}
25-
26-
registerQueues(bullBoardInstance: any, queues: Queue[]) {
27-
try {
28-
const adapters = queues.map(queue => new BullMQAdapter(queue));
29-
bullBoardInstance.boardInstance.setQueues(adapters);
30-
this.logger.log(`Registered ${queues.length} queues with BullBoard`);
31-
} catch (error) {
32-
this.logger.error('Error registering queues with BullBoard', { error });
33-
}
34-
}
3525
}
Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,37 @@
11
import { Module } from '@nestjs/common';
2-
import { QueueService } from './queue.service';
2+
import { BullMQQueueService } from './queue.service';
33
import { QueueBullboardService } from './queue-bullboard.service';
44
import { QueueOtelService } from './otel.service';
5+
import { MonitoringModule } from '@/common/monitoring/monitoring.module';
6+
import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from './types';
7+
import { createBullBoard } from '@bull-board/api';
8+
import { ExpressAdapter } from '@bull-board/express';
9+
import { RedisModule } from '../redis/redis.module';
510

611
@Module({
7-
providers: [QueueService, QueueBullboardService, QueueOtelService],
8-
exports: [QueueService, QueueBullboardService, QueueOtelService],
12+
imports: [MonitoringModule, RedisModule],
13+
providers: [
14+
BullMQQueueService,
15+
{ provide: 'IQueueService', useExisting: BullMQQueueService },
16+
QueueBullboardService,
17+
QueueOtelService,
18+
{
19+
provide: BULLBOARD_INSTANCE_INJECTION_TOKEN,
20+
useFactory: () => {
21+
const serverAdapter = new ExpressAdapter();
22+
serverAdapter.setBasePath('/api/queues');
23+
const boardInstance = createBullBoard({ queues: [], serverAdapter });
24+
25+
return { boardInstance, serverAdapter };
26+
},
27+
},
28+
],
29+
exports: [
30+
BullMQQueueService,
31+
'IQueueService',
32+
QueueBullboardService,
33+
QueueOtelService,
34+
BULLBOARD_INSTANCE_INJECTION_TOKEN,
35+
],
936
})
1037
export class QueueModule {}

0 commit comments

Comments
 (0)