WIP: Flexible queue system with BullMQ and Qstash #856
Open
edulelis
wants to merge
20
commits into
elie222:main
from
edulelis:bullmq-queue-system
Changes from 7 commits
Commits
cbbfa25 Initial PR to support both Qstash and BullMQ (edulelis)
9d39a5f Update worker initialization. Add 7 queues to large queue (edulelis)
dc265b6 PR feedback (edulelis)
a688952 Restore pnpm-lock (edulelis)
1d1788a Merge branch 'main' of github.com:elie222/inbox-zero into bullmq-queu… (edulelis)
1ca7d0f Partial queue implementation with docker improvements (edulelis)
e485bd2 Fix package versions. Remove shx. Update worker command (edulelis)
593e63b Merge branch 'main' of github.com:elie222/inbox-zero into bullmq-queu… (edulelis)
452fef5 PR feedback. fix biome checks (edulelis)
781a9b3 PR feedback (edulelis)
3f3213d Build fixes and PR feedback (edulelis)
40dca13 Remove server-only modifications (edulelis)
7d8123c PR Feedback (edulelis)
60a3ba2 PR Feedback (edulelis)
0d16445 Add UPSTASH TOKEN (edulelis)
837f033 Refactor to queue system (edulelis)
dc8da7c Merge branch 'main' of github.com:elie222/inbox-zero into bullmq-queu… (edulelis)
f813203 Refactor generic queue calls (edulelis)
dcc3336 Refactor generic queue calls (edulelis)
6c2682b Refactor generic queue calls (edulelis)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,328 @@ | ||
| # Queue System | ||
|
|
||
| Unified queue system supporting both QStash and Redis (BullMQ) with automatic system selection based on `QUEUE_SYSTEM` environment variable. | ||
|
|
||
| ## Quick Start | ||
|
|
||
| ### 1. Job Enqueueing | ||
|
|
||
| ```typescript | ||
| import { enqueueJob } from "@/utils/queue/queue-manager"; | ||
|
|
||
| // Basic job. AI categorization is spread across ai-categorize-senders-0 to ai-categorize-senders-6; this example targets queue 0 directly. | ||
| const job = await enqueueJob("ai-categorize-senders-0", { | ||
| emailAccountId: "user-123", | ||
| senders: ["[email protected]"], | ||
| }); | ||
|
|
||
| // Delayed job (5 seconds) | ||
| const delayedJob = await enqueueJob("scheduled-actions", { | ||
| scheduledActionId: "action-456", | ||
| }, { | ||
| delay: 5000, | ||
| }); | ||
|
|
||
| console.log("Jobs enqueued:", job.id || job, delayedJob.id || delayedJob); | ||
| ``` | ||
|
|
||
| ### 2. Bulk Job Enqueueing | ||
|
|
||
| ```typescript | ||
| import { bulkEnqueueJobs } from "@/utils/queue/queue-manager"; | ||
|
|
||
| const jobs = await bulkEnqueueJobs("ai-categorize-senders-0", { | ||
| jobs: [ | ||
| { data: { emailAccountId: "user-1", senders: ["[email protected]"] } }, | ||
| { data: { emailAccountId: "user-2", senders: ["[email protected]"] } }, | ||
| { data: { emailAccountId: "user-3", senders: ["[email protected]"] } }, | ||
| ], | ||
| }); | ||
|
|
||
| console.log("Bulk jobs enqueued:", jobs.length); | ||
| ``` | ||
|
|
||
| ### 3. Worker Setup (Redis only) | ||
|
|
||
| **How workers run** | ||
|
|
||
| - Development: workers auto-start inside the Next.js server via `apps/web/instrumentation.ts` when `NODE_ENV=development`. | ||
|
||
| - Production: run a separate worker process using the standalone build output: | ||
|
|
||
| ```bash | ||
| # Build app and copy worker into standalone output | ||
| pnpm --filter inbox-zero-ai build | ||
|
|
||
| # Start Next.js (prod) | ||
| pnpm --filter inbox-zero-ai start:standalone | ||
|
|
||
| # Start worker (separate process) | ||
| pnpm --filter inbox-zero-ai worker | ||
| ``` | ||
|
|
||
| The worker process automatically: | ||
| - Creates one worker per registered queue | ||
| - Handles all queue types: | ||
| - `digest-item-summarize` - Process digest email items | ||
| - `ai-categorize-senders-0` to `ai-categorize-senders-6` - 7 distributed queues for AI sender categorization (load balanced) | ||
| - `scheduled-actions` - Execute scheduled email actions | ||
| - `ai-clean` - AI-powered email cleaning | ||
| - `email-digest-all` - Send digest emails to all users | ||
| - `email-summary-all` - Send summary emails to all users | ||
| - `clean-gmail` - Clean Gmail-specific operations | ||
| - Runs with concurrency of 3 per queue | ||
| - Implements graceful shutdown | ||
| - Reconnects on database/Redis failures | ||
|
|
||
| **Why separate process?** | ||
| - Prevents blocking the main application event loop | ||
| - Allows independent scaling of workers | ||
| - Improves fault isolation | ||
| - Enables better resource management | ||
|
|
||
| **Load Balancing for AI Categorization:** | ||
| - Email account IDs are distributed across 7 queues (`ai-categorize-senders-0` to `ai-categorize-senders-6`) | ||
| - Uses a simple hash function to spread accounts roughly evenly across the queues | ||
| - Each queue can process up to 3 jobs concurrently (21 total concurrent categorizations) | ||
| - QStash still uses per-email-account queues for maximum parallelization | ||
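The queue selection described above can be sketched as follows. This is a minimal illustration, not the PR's implementation: the helper names `hashString` and `pickCategorizeQueue` and the specific hash (a 31-based polynomial hash) are assumptions; only the queue name prefix and the count of 7 come from the README.

```typescript
// Hypothetical helper; the hash used in the PR may differ.
const AI_CATEGORIZE_QUEUE_COUNT = 7; // from the README: queues 0 to 6

// Simple deterministic string hash, kept in the unsigned 32-bit range.
function hashString(value: string): number {
  let hash = 0;
  for (let i = 0; i < value.length; i++) {
    hash = (hash * 31 + value.charCodeAt(i)) >>> 0;
  }
  return hash;
}

// Maps an email account ID to one of the 7 categorization queues.
// The same ID always lands on the same queue.
function pickCategorizeQueue(emailAccountId: string): string {
  const index = hashString(emailAccountId) % AI_CATEGORIZE_QUEUE_COUNT;
  return `ai-categorize-senders-${index}`;
}

console.log(pickCategorizeQueue("user-123"));
```

Because the mapping is deterministic, all jobs for one account are processed by the same queue, which keeps per-account work bounded by that queue's concurrency of 3.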
|
|
||
| ## Configuration | ||
|
|
||
| ### Environment Variables | ||
|
|
||
| ```bash | ||
| # Choose queue system (set exactly one) | ||
| QUEUE_SYSTEM=redis # Use Redis + BullMQ | ||
| # QUEUE_SYSTEM=upstash # Use QStash (default) | ||
|
|
||
| # For Redis system (BullMQ) | ||
| # Must be a full URL. Do NOT pass this as host/port separately to BullMQ. | ||
| REDIS_URL=redis://localhost:6379 | ||
|
|
||
| # For QStash system | ||
| QSTASH_TOKEN=your_qstash_token | ||
| ``` | ||
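How these variables might drive system selection can be sketched as below. This is an assumption-laden illustration: `resolveQueueSystem`, `QueueEnv`, and the validation rules are hypothetical, and the real logic in `queue-manager.ts` may differ. Only the variable names and the `upstash` default come from the README.

```typescript
type QueueSystem = "redis" | "upstash";

// Hypothetical shape of the relevant environment variables.
interface QueueEnv {
  QUEUE_SYSTEM?: string;
  REDIS_URL?: string;
  QSTASH_TOKEN?: string;
}

function isQueueSystem(value: string): value is QueueSystem {
  return value === "redis" || value === "upstash";
}

// Picks the queue system, defaulting to QStash ("upstash") when unset.
// Assumption: Redis mode is rejected early if REDIS_URL is missing.
function resolveQueueSystem(env: QueueEnv): QueueSystem {
  const system = env.QUEUE_SYSTEM ?? "upstash";
  if (!isQueueSystem(system)) {
    throw new Error(`Unsupported QUEUE_SYSTEM: ${system}`);
  }
  if (system === "redis" && !env.REDIS_URL) {
    throw new Error("REDIS_URL is required when QUEUE_SYSTEM=redis");
  }
  return system;
}

console.log(resolveQueueSystem({ QUEUE_SYSTEM: "redis", REDIS_URL: "redis://localhost:6379" }));
```

Failing fast on a missing `REDIS_URL` surfaces misconfiguration at startup rather than on the first enqueue.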
|
|
||
| ### System Information | ||
|
|
||
| ```typescript | ||
| import { getQueueSystemInfo } from "@/utils/queue/queue-manager"; | ||
|
|
||
| const info = getQueueSystemInfo(); | ||
| console.log("Queue system:", info.system); | ||
| console.log("Is Redis:", info.isRedis); | ||
| console.log("Is QStash:", info.isQStash); | ||
| ``` | ||
|
|
||
| ### Retry & Concurrency Configuration | ||
|
|
||
| - **QStash**: Retry count is handled by QStash service (see [QStash documentation](https://docs.upstash.com/qstash) for details) | ||
| - **BullMQ (Redis)**: Retries up to **5 times** (configured in `bullmq-manager.ts`) | ||
| - **Concurrency**: | ||
| - **Worker Registry**: Default concurrency of **3** (configured in `worker.ts`) | ||
| - **BullMQ Manager**: Default concurrency of **3** (configured in `bullmq-manager.ts`) | ||
| - **QStash**: Uses parallelism of **3** for flow control (configured in `qstash-manager.ts`) | ||
|
|
||
| > Note: For authoritative concurrency and retry settings, see `apps/web/utils/queue/worker.ts` and `apps/web/utils/queue/bullmq-manager.ts`. | ||
|
|
||
| ## Migration Examples | ||
|
|
||
| ### From Direct QStash Usage | ||
|
|
||
| **Old way (direct QStash):** | ||
| ```typescript | ||
| import { publishToQstashQueue } from "@/utils/upstash"; | ||
|
|
||
| await publishToQstashQueue({ | ||
| queueName: "digest-item-summarize", | ||
| parallelism: 3, | ||
| url: "/api/ai/digest", | ||
| body: { emailAccountId: "user-123", message: {...} }, | ||
| }); | ||
| ``` | ||
|
|
||
| **New way (unified queue system):** | ||
| ```typescript | ||
| import { enqueueJob } from "@/utils/queue/queue-manager"; | ||
|
|
||
| await enqueueJob("digest-item-summarize", { | ||
| emailAccountId: "user-123", | ||
| message: {...}, | ||
| }); | ||
| // Retry and parallelism handled automatically by system defaults | ||
| ``` | ||
|
|
||
| ## Error Handling | ||
|
|
||
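| ```typescript | ||
| import { enqueueJob } from "@/utils/queue/queue-manager"; | ||
| // Example payload; replace with real job data | ||
| const data = { emailAccountId: "user-123", senders: ["sender@example.com"] }; | ||
| try { | ||
| const job = await enqueueJob("ai-categorize-senders-0", data, { | ||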
| delay: 5000, | ||
| priority: 1, | ||
| }); | ||
| } catch (error) { | ||
| console.error("Failed to enqueue job:", error); | ||
| } | ||
| ``` | ||
|
|
||
| ### Worker Error Handling | ||
|
|
||
| ```typescript | ||
| import { registerWorker } from "@/utils/queue/worker"; | ||
|
|
||
| registerWorker("ai-categorize-senders-0", async (job) => { | ||
| try { | ||
| await processJob(job.data); // processJob stands in for your own handler | ||
| } catch (error) { | ||
| console.error("Job failed:", job.id, error); | ||
| throw error; // Will trigger retry logic | ||
| } | ||
| }); | ||
| ``` | ||
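Rethrowing from the processor, as in the block above, is what lets the queue record a failed attempt and try again. The attempt-counting behaviour can be sketched in isolation as follows. This is a synchronous simplification, not BullMQ or QStash code: `runWithAttempts` and `AttemptResult` are hypothetical names, and the limit of 5 is taken from the README's stated BullMQ retry count, interpreted here as total attempts.

```typescript
interface AttemptResult<T> {
  ok: boolean;
  attempts: number;
  value?: T;
  error?: unknown;
}

// Calls the processor until it succeeds or maxAttempts is reached.
// A thrown error counts as a failed attempt; the last error is kept.
function runWithAttempts<T>(
  processor: (attempt: number) => T,
  maxAttempts = 5,
): AttemptResult<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return { ok: true, attempts: attempt, value: processor(attempt) };
    } catch (error) {
      lastError = error;
    }
  }
  return { ok: false, attempts: maxAttempts, error: lastError };
}

// A processor that fails on its first two attempts, then succeeds.
const result = runWithAttempts((attempt) => {
  if (attempt < 3) throw new Error(`attempt ${attempt} failed`);
  return "categorized";
});
console.log(result.ok, result.attempts);
```

A processor that swallows its error instead of rethrowing would look successful on the first attempt, so no retry would happen.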
|
|
||
| ## Monitoring | ||
|
|
||
| ### Queue Health Check | ||
|
|
||
| ```typescript | ||
| import { getQueueSystemInfo } from "@/utils/queue/queue-manager"; | ||
|
|
||
| function checkQueueHealth() { | ||
| const info = getQueueSystemInfo(); | ||
|
|
||
| if (info.isRedis) { | ||
| console.log("Using Redis + BullMQ - Full feature set available"); | ||
| // Set up BullMQ monitoring here | ||
| } else { | ||
| console.log("Using QStash - HTTP-based processing"); | ||
| // Use QStash dashboard for monitoring | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ### Worker Monitoring | ||
|
|
||
| ```typescript | ||
| import { getAllWorkers } from "@/utils/queue/worker"; | ||
|
|
||
| const workers = getAllWorkers(); | ||
| console.log("Active workers:", workers.size); | ||
|
|
||
| for (const [queueName, worker] of workers) { | ||
| console.log(`Worker for ${queueName}:`, { | ||
| isRunning: worker.isRunning(), | ||
| concurrency: worker.opts.concurrency, | ||
| }); | ||
| } | ||
| ``` | ||
|
|
||
| ## Complete Example | ||
|
|
||
| Here's a complete example showing how to set up and use the queue system: | ||
|
|
||
| ```typescript | ||
| import { | ||
| enqueueJob, | ||
| bulkEnqueueJobs, | ||
| getQueueSystemInfo, | ||
| closeQueueManager | ||
| } from "@/utils/queue/queue-manager"; | ||
|
|
||
| async function main() { | ||
| // Check system info | ||
| const systemInfo = getQueueSystemInfo(); | ||
| console.log("Queue system:", systemInfo.system); | ||
|
|
||
| // With QUEUE_SYSTEM=redis, workers auto-start inside Next.js in development. | ||
| // In production, run the separate worker process (see Worker Setup). | ||
|
|
||
| // Enqueue some jobs | ||
| await enqueueJob("email-digest-all", { emailAccountId: "account-123" }); | ||
| await bulkEnqueueJobs("ai-categorize-senders-0", { | ||
| jobs: [ | ||
| { data: { emailAccountId: "account-123", senders: ["[email protected]"] } }, | ||
| { data: { emailAccountId: "account-456", senders: ["[email protected]"] } }, | ||
| ], | ||
| }); | ||
|
|
||
| // Wait for processing | ||
| await new Promise(resolve => setTimeout(resolve, 5000)); | ||
|
|
||
| // Cleanup | ||
| await closeQueueManager(); | ||
| } | ||
|
|
||
| main().catch(console.error); | ||
| ``` | ||
|
|
||
| ## Best Practices | ||
|
|
||
| 1. **Choose the right system**: Use Redis for self-hosted deployments, QStash for managed cloud deployments (default) | ||
| 2. **Use system defaults**: Retry and parallelism are configured automatically - don't override unless necessary | ||
| 3. **Handle errors gracefully**: Always implement proper error handling in job processors | ||
| 4. **Monitor queue health**: Set up monitoring for queue depth, processing rates, and error rates | ||
| 5. **Use job IDs for deduplication**: Prevent duplicate jobs by using meaningful job IDs | ||
| 6. **Clean up completed jobs**: Default cleanup policies are set appropriately | ||
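For practice 5, a deterministic job ID can be derived from the queue name and the fields that identify the work. The sketch below is illustrative only: `buildJobId` is a hypothetical helper, and whether `enqueueJob` accepts a job ID option is an assumption not confirmed by this README. Colons are replaced because some queue backends reserve them in keys.

```typescript
// Builds a stable ID from the queue name and identifying fields.
// Characters outside a conservative safe set are replaced with "_".
function buildJobId(queueName: string, ...keyParts: string[]): string {
  return [queueName, ...keyParts]
    .map((part) => part.trim().replace(/[^A-Za-z0-9_.@-]/g, "_"))
    .join("--");
}

// In-memory stand-in for a queue that skips IDs it has already seen.
const seenJobIds = new Set<string>();

function enqueueOnce(jobId: string): boolean {
  if (seenJobIds.has(jobId)) return false; // duplicate, skipped
  seenJobIds.add(jobId);
  return true;
}

const id = buildJobId("scheduled-actions", "action-456");
console.log(id, enqueueOnce(id), enqueueOnce(id));
```

The `Set` here only demonstrates the idea; in a real deployment the deduplication would need to happen in the queue backend so that it holds across processes.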
|
|
||
| ## Troubleshooting | ||
|
|
||
| ### Common Issues | ||
|
|
||
| - **Workers not processing jobs**: In dev, confirm `NODE_ENV=development` and that `instrumentation.ts` is loaded. In prod, ensure the worker process (`pnpm --filter inbox-zero-ai worker`) is running. | ||
| - **Connection errors (BullMQ)**: Use `connection: { url: env.REDIS_URL }` with BullMQ. Passing `host: "redis://..."` causes DNS errors like `ENOTFOUND redis://localhost:6379`. | ||
| - **Jobs stuck in queue**: Check worker logs for errors and ensure workers are running | ||
| - **Memory issues**: Adjust concurrency settings and job cleanup policies | ||
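The connection error above comes from treating a full URL as a hostname. If a client does need separate host and port fields, the URL can be split first. The sketch below uses only the standard `URL` class; `parseRedisUrl` and `RedisConnectionParts` are hypothetical names, and 6379 is assumed as the default port when the URL omits one.

```typescript
interface RedisConnectionParts {
  host: string;
  port: number;
  username?: string;
  password?: string;
  tls: boolean;
}

// Splits a redis:// or rediss:// URL into discrete connection fields.
function parseRedisUrl(redisUrl: string): RedisConnectionParts {
  const url = new URL(redisUrl);
  if (url.protocol !== "redis:" && url.protocol !== "rediss:") {
    throw new Error(`Expected a redis:// or rediss:// URL, got ${url.protocol}`);
  }
  return {
    host: url.hostname,
    port: url.port ? Number(url.port) : 6379,
    username: url.username ? decodeURIComponent(url.username) : undefined,
    password: url.password ? decodeURIComponent(url.password) : undefined,
    tls: url.protocol === "rediss:",
  };
}

const parts = parseRedisUrl("redis://localhost:6379");
console.log(parts.host, parts.port);
```

Passing `parts.host` rather than the raw URL avoids the `ENOTFOUND redis://localhost:6379` lookup failure described in the list.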
|
|
||
| ### Environment validation & server-only pitfalls | ||
|
|
||
| - `@t3-oss/env-nextjs` validates at import time. If the worker runs outside Next.js without preloading env, you'll see "Invalid environment variables". The Next.js dev and prod servers load `.env` automatically, but plain `node` or `tsx` does not. | ||
| - The `server-only` package throws when imported outside Next’s server graph. Running app modules directly with `node` or `tsx` can crash. | ||
|
|
||
| Solutions we use: | ||
| - Development workers run inside Next via `instrumentation.ts` (no env/server-only issues). | ||
| - Production workers use the standalone build and a simple `apps/web/worker.js` that imports from `.next/standalone/...`. | ||
|
|
||
| If you need a standalone TypeScript worker entrypoint, bundle it (tsup/esbuild) and: | ||
| - preload env (`@next/env`) before importing `env.ts` | ||
| - alias `server-only` to a no-op only for the worker bundle | ||
| - register tsconfig paths | ||
| This keeps code clean but adds a bundling step. | ||
|
|
||
|
|
||
| ## API Reference | ||
|
|
||
| ### Core Functions | ||
|
|
||
| - `enqueueJob(queueName, data, options?)`: Enqueue a single job | ||
| - `bulkEnqueueJobs(queueName, options)`: Enqueue multiple jobs | ||
| - `createQueueWorker(queueName, processor, options?)`: Create a worker (Redis only) | ||
| - `getQueueManager()`: Get the queue manager instance | ||
| - `getQueueSystemInfo()`: Get current system information | ||
|
|
||
| ### Worker Functions | ||
|
|
||
| - `registerWorker(queueName, processor, config?)`: Register a worker | ||
| - `unregisterWorker(queueName)`: Unregister a worker | ||
| - `shutdownAllWorkers()`: Shutdown all workers gracefully | ||
|
|
||
| ## File Structure | ||
|
|
||
| ``` | ||
| apps/web/ | ||
| ├── worker.js # Worker process entry point (copied to standalone on build) | ||
| └── utils/queue/ | ||
| ├── queue-manager.ts # Main queue abstraction | ||
| ├── bullmq-manager.ts # BullMQ implementation | ||
| ├── qstash-manager.ts # QStash implementation | ||
| ├── queues.ts # Queue handlers and job data types | ||
| ├── types.ts # Type definitions | ||
| ├── worker.ts # Worker management | ||
| ├── queue.test.ts # Comprehensive test suite | ||
| ├── ai-queue.ts # AI-specific queue utilities | ||
| ├── email-action-queue.ts # Email action queue utilities | ||
| └── email-actions.ts # Email action definitions | ||
|
|
||
| docker/ | ||
| └── Dockerfile.prod # Production image (supports both web and worker) | ||
|
|
||
| docker-compose.yml # Includes both web and worker services (reuses same image) | ||
| ``` | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| # VCS | ||
| .git | ||
|
|
||
| # Node / package managers | ||
| node_modules | ||
| pnpm-store | ||
|
|
||
| # Build outputs / caches | ||
| .next | ||
| .turbo | ||
| out | ||
| coverage | ||
| *.log | ||
|
|
||
| # OS/editor junk | ||
| .DS_Store | ||
| *.swp | ||
| *.swo | ||
| .idea | ||
| .vscode | ||
|
|
||
| # Local env files (do not bake secrets) | ||
| *.env | ||
| *.env.* | ||
| !.env.example | ||
|
|
||
| # Docker | ||
| Dockerfile* | ||
| !.dockerignore | ||
|
|
||
| # Misc | ||
| *.local | ||
| _tmp | ||
| .tmp |