Changes from 14 commits
340 changes: 340 additions & 0 deletions .cursor/rules/queues.mdc
@@ -0,0 +1,340 @@
---
description: Unified queue system supporting QStash and Redis (BullMQ) with automatic system selection, worker management, and job enqueueing
globs:
- "apps/web/utils/queue/**"
- "apps/web/worker.js"
- "apps/web/instrumentation.ts"
- "docker-compose.yml"
alwaysApply: false
---
# Queue System

Unified queue system supporting both QStash and Redis (BullMQ), with automatic system selection based on the `QUEUE_SYSTEM` environment variable.

## Quick Start

### 1. Job Enqueueing

```typescript
import { enqueueJob } from "@/utils/queue/queue-manager";

// Basic job (will be distributed across ai-categorize-senders-0 to ai-categorize-senders-6)
const job = await enqueueJob("ai-categorize-senders-0", {
  emailAccountId: "user-123",
  senders: ["sender@example.com"],
});

// Delayed job (5 seconds)
const delayedJob = await enqueueJob("scheduled-actions", {
  scheduledActionId: "action-456",
}, {
  delay: 5000,
});

console.log("Jobs enqueued:", job.id || job, delayedJob.id || delayedJob);
```

### 2. Bulk Job Enqueueing

```typescript
import { bulkEnqueueJobs } from "@/utils/queue/queue-manager";

const jobs = await bulkEnqueueJobs("ai-categorize-senders-0", {
  jobs: [
    { data: { emailAccountId: "user-1", senders: ["sender@example.com"] } },
    { data: { emailAccountId: "user-2", senders: ["sender@example.com"] } },
    { data: { emailAccountId: "user-3", senders: ["sender@example.com"] } },
  ],
});

console.log("Bulk jobs enqueued:", jobs.length);
```

### 3. Worker Setup (Redis only)

**How workers run**

- **Development**: workers can start in two ways:
  1. **Within Next.js server**: Auto-start inside the Next.js server via `apps/web/instrumentation.ts` when both `NODE_ENV=development` and `ENABLE_WORKER_QUEUES=true` are set.
  2. **Separate worker container**: Use the `worker` service in `docker-compose.yml` which runs `worker.js` as a separate container (keep `ENABLE_WORKER_QUEUES=false` or unset).
- **Production**: workers always run as a separate process/container (never within Next.js):
  - **Standalone build**: Run a separate worker process using the standalone build output:
    ```bash
    # Build app and copy worker into standalone output
    pnpm --filter inbox-zero-ai build

    # Start Next.js (prod)
    pnpm --filter inbox-zero-ai start:standalone

    # Start worker (separate process)
    pnpm --filter inbox-zero-ai worker
    ```
  - **Docker Compose**: Use the `worker` service in `docker-compose.yml` which runs `worker.js` as a separate container.

The worker process automatically:
- Creates one worker per registered queue
- Handles all queue types:
  - `digest-item-summarize` - Process digest email items
  - `ai-categorize-senders-0` to `ai-categorize-senders-6` - 7 distributed queues for AI sender categorization (load balanced)
  - `scheduled-actions` - Execute scheduled email actions
  - `ai-clean` - AI-powered email cleaning
  - `email-digest-all` - Send digest emails to all users
  - `email-summary-all` - Send summary emails to all users
  - `clean-gmail` - Gmail-specific cleaning operations
- Runs with concurrency of 3 per queue
- Implements graceful shutdown
- Reconnects on database/Redis failures

**Why separate process?**
- Prevents blocking the main application event loop
- Allows independent scaling of workers
- Improves fault isolation
- Enables better resource management

**Load Balancing for AI Categorization:**
- Email account IDs are distributed across 7 queues (`ai-categorize-senders-0` to `ai-categorize-senders-6`)
- Uses a simple hash function to ensure even distribution
- Each queue can process up to 3 jobs concurrently (21 total concurrent categorizations)
- QStash still uses per-email-account queues for maximum parallelization
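
The distribution described above can be sketched as follows. This is a minimal illustration, not the project's actual code: the hash (a djb2-style string hash) and the names `hashString` and `getAiCategorizeQueueName` are assumptions, and the real "simple hash function" may differ. Only the queue prefix and the count of 7 come from this document.

```typescript
// Hypothetical sketch: the real hash function in the queue utilities may differ.
const AI_CATEGORIZE_QUEUE_COUNT = 7; // queues -0 through -6, per this document
const AI_CATEGORIZE_QUEUE_PREFIX = "ai-categorize-senders";

// Deterministic djb2-style string hash, kept in the unsigned 32-bit range.
function hashString(value: string): number {
  let hash = 5381;
  for (let i = 0; i < value.length; i++) {
    hash = ((hash * 33) ^ value.charCodeAt(i)) >>> 0;
  }
  return hash;
}

// Map an email account ID to one of the 7 distributed queue names.
// The same ID always maps to the same queue.
function getAiCategorizeQueueName(emailAccountId: string): string {
  const index = hashString(emailAccountId) % AI_CATEGORIZE_QUEUE_COUNT;
  return `${AI_CATEGORIZE_QUEUE_PREFIX}-${index}`;
}
```

Because the mapping is deterministic, all jobs for one email account land on the same queue, while different accounts spread across the 7 queues.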

## Configuration

### Environment Variables

```bash
# Choose queue system (set exactly one of the following)
QUEUE_SYSTEM=redis # Use Redis + BullMQ
QUEUE_SYSTEM=upstash # Use QStash (default)

# For Redis system (BullMQ)
# Must be a full URL. Do NOT pass this as host/port separately to BullMQ.
REDIS_URL=redis://localhost:6379

# For QStash system
QSTASH_TOKEN=your_qstash_token
```

### System Information

```typescript
import { getQueueSystemInfo } from "@/utils/queue/queue-manager";

const info = getQueueSystemInfo();
console.log("Queue system:", info.system);
console.log("Is Redis:", info.isRedis);
console.log("Is QStash:", info.isQStash);
```
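
For intuition, the selection logic behind that info object might look like the sketch below. It is an assumption about the internals, not the code in `queue-manager.ts`: the function name `resolveQueueSystemInfo` is hypothetical, and treating any value other than `redis` as the QStash default is a guess based on the "(default)" note above. The returned fields mirror the `system`, `isRedis`, and `isQStash` properties used in the example.

```typescript
type QueueSystem = "redis" | "upstash";

interface QueueSystemInfo {
  system: QueueSystem;
  isRedis: boolean;
  isQStash: boolean;
}

// Hypothetical helper: derive the active queue system from environment variables.
// Assumption: anything other than "redis" falls back to the QStash default.
function resolveQueueSystemInfo(
  env: Record<string, string | undefined>,
): QueueSystemInfo {
  const system: QueueSystem = env["QUEUE_SYSTEM"] === "redis" ? "redis" : "upstash";
  return {
    system,
    isRedis: system === "redis",
    isQStash: system === "upstash",
  };
}
```

Passing the environment in as an argument, rather than reading `process.env` inside the function, keeps the sketch easy to test.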

### Retry & Concurrency Configuration

- **QStash**: Retry count is handled by QStash service (see [QStash documentation](https://docs.upstash.com/qstash) for details)
- **BullMQ (Redis)**: Retries up to **5 times** (configured in `bullmq-manager.ts`)
- **Concurrency**:
  - **Worker Registry**: Default concurrency of **3** (configured in `worker.ts`)
  - **BullMQ Manager**: Default concurrency of **3** (configured in `bullmq-manager.ts`)
  - **QStash**: Uses parallelism of **3** for flow control (configured in `qstash-manager.ts`)

> Note: For authoritative concurrency and retry settings, see `apps/web/utils/queue/worker.ts` and `apps/web/utils/queue/bullmq-manager.ts`.

## Migration Examples

### From Direct QStash Usage

**Old way (direct QStash):**
```typescript
import { publishToQstashQueue } from "@/utils/upstash";

await publishToQstashQueue({
  queueName: "digest-item-summarize",
  parallelism: 3,
  url: "/api/ai/digest",
  body: { emailAccountId: "user-123", message: {...} },
});
```

**New way (unified queue system):**
```typescript
import { enqueueJob } from "@/utils/queue/queue-manager";

await enqueueJob("digest-item-summarize", {
  emailAccountId: "user-123",
  message: {...},
});
// Retry and parallelism handled automatically by system defaults
```

## Error Handling

```typescript
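import { enqueueJob } from "@/utils/queue/queue-manager";

// Example payload; replace with the job data for your queue
const data = { emailAccountId: "user-123", senders: ["sender@example.com"] };

try {
  const job = await enqueueJob("ai-categorize-senders-0", data, {
    delay: 5000,
    priority: 1,
  });
} catch (error) {
  console.error("Failed to enqueue job:", error);
}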
```

### Worker Error Handling

```typescript
import { registerWorker } from "@/utils/queue/worker";

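// `processJob` stands in for your application-defined job handler
registerWorker("ai-categorize-senders-0", async (job) => {
  try {
    await processJob(job.data);
  } catch (error) {
    console.error("Job failed:", job.id, error);
    throw error; // Rethrowing marks the job as failed and triggers retry logic
  }
});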
```

## Monitoring

### Queue Health Check

```typescript
import { getQueueSystemInfo } from "@/utils/queue/queue-manager";

function checkQueueHealth() {
  const info = getQueueSystemInfo();

  if (info.isRedis) {
    console.log("Using Redis + BullMQ - Full feature set available");
    // Set up BullMQ monitoring here
  } else {
    console.log("Using QStash - HTTP-based processing");
    // Use QStash dashboard for monitoring
  }
}
```

### Worker Monitoring

```typescript
import { getAllWorkers } from "@/utils/queue/worker";

const workers = getAllWorkers();
console.log("Active workers:", workers.size);

for (const [queueName, worker] of workers) {
  console.log(`Worker for ${queueName}:`, {
    isRunning: worker.isRunning(),
    concurrency: worker.opts.concurrency,
  });
}
```

## Complete Example

Here's a complete example showing how to set up and use the queue system:

```typescript
import {
  enqueueJob,
  bulkEnqueueJobs,
  getQueueSystemInfo,
  closeQueueManager,
} from "@/utils/queue/queue-manager";

async function main() {
  // Check system info
  const systemInfo = getQueueSystemInfo();
  console.log("Queue system:", systemInfo.system);

  // With QUEUE_SYSTEM=redis, workers are started via instrumentation.ts (dev)
  // or the separate worker process (prod); no manual registration is needed here.

  // Enqueue some jobs
  await enqueueJob("email-digest-all", { emailAccountId: "account-123" });
  await bulkEnqueueJobs("ai-categorize-senders-0", {
    jobs: [
      { data: { emailAccountId: "account-123", senders: ["sender@example.com"] } },
      { data: { emailAccountId: "account-456", senders: ["sender@example.com"] } },
    ],
  });

  // Wait for processing
  await new Promise((resolve) => setTimeout(resolve, 5000));

  // Cleanup
  await closeQueueManager();
}

main().catch(console.error);
```

## Best Practices

1. **Choose the right system**: Use Redis for self-hosted deployments, QStash for managed cloud deployments (default)
2. **Use system defaults**: Retry and parallelism are configured automatically - don't override unless necessary
3. **Handle errors gracefully**: Always implement proper error handling in job processors
4. **Monitor queue health**: Set up monitoring for queue depth, processing rates, and error rates
5. **Use job IDs for deduplication**: Prevent duplicate jobs by using meaningful job IDs
6. **Clean up completed jobs**: Default cleanup policies are set appropriately
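
As an illustration of practice 5, a deterministic job ID can be derived from the fields that identify a unit of work, so that enqueueing the same work twice produces the same ID. The helper below is a hypothetical sketch: `buildJobId` is not part of the queue API described here, and this document does not show which `enqueueJob` option (if any) carries a job ID, so wiring it in is left as an assumption to verify against `types.ts`.

```typescript
// Hypothetical helper: build a stable, order-independent job ID.
// BullMQ uses ":" as its Redis key separator, so the sketch strips it from IDs.
function buildJobId(
  queueName: string,
  parts: Record<string, string | number>,
): string {
  const suffix = Object.keys(parts)
    .sort() // sort keys so property order does not change the ID
    .map((key) => `${key}=${String(parts[key])}`)
    .join("|");
  return `${queueName}__${suffix}`.replace(/:/g, "_");
}

// Example: the same scheduled action always yields the same ID.
const scheduledActionJobId = buildJobId("scheduled-actions", {
  scheduledActionId: "action-456",
});
```

Sorting the keys means callers do not have to agree on property order to get matching IDs.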

## Troubleshooting

### Common Issues

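- **Workers not processing jobs**: In dev, confirm that `NODE_ENV=development` and `ENABLE_WORKER_QUEUES=true` are set and that `instrumentation.ts` is loaded (or that the separate `worker` container is running). In prod, ensure the worker process (`pnpm --filter inbox-zero-ai worker`) is running.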
- **Connection errors (BullMQ)**: Use `connection: { url: env.REDIS_URL }` with BullMQ. Passing `host: "redis://..."` causes DNS errors like `ENOTFOUND redis://localhost:6379`.
- **Jobs stuck in queue**: Check worker logs for errors and ensure workers are running
- **Memory issues**: Adjust concurrency settings and job cleanup policies
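
To see why `host: "redis://..."` fails, note that a `host` field expects a bare hostname, while `REDIS_URL` bundles protocol, credentials, host, and port. The sketch below splits a Redis URL into those parts using the standard `URL` class. It is only an illustration: `parseRedisUrl` and `RedisConnectionParts` are hypothetical names, the default port of 6379 is the conventional Redis port, and the recommended approach in this document remains passing the full URL.

```typescript
interface RedisConnectionParts {
  host: string;
  port: number;
  username: string | undefined;
  password: string | undefined;
  tls: boolean;
}

// Hypothetical helper: split a redis:// or rediss:// URL into connection fields.
function parseRedisUrl(redisUrl: string): RedisConnectionParts {
  const url = new URL(redisUrl);
  if (url.protocol !== "redis:" && url.protocol !== "rediss:") {
    throw new Error(`Unsupported Redis URL protocol: ${url.protocol}`);
  }
  return {
    host: url.hostname, // bare hostname, e.g. "localhost"
    port: url.port ? Number(url.port) : 6379,
    username: url.username ? decodeURIComponent(url.username) : undefined,
    password: url.password ? decodeURIComponent(url.password) : undefined,
    tls: url.protocol === "rediss:",
  };
}

const localRedis = parseRedisUrl("redis://localhost:6379");
console.log(localRedis.host, localRedis.port);
```

Passing the whole string as `host` skips this split, so the client tries to resolve `redis://localhost:6379` as a DNS name, which produces the `ENOTFOUND` error above.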

### Environment validation & server-only pitfalls

- `@t3-oss/env-nextjs` validates at import time. If the worker runs outside Next without preloading env, you'll see "Invalid environment variables". Next dev/prod loads `.env` automatically, but plain `node` or `tsx` does not.
- The `server-only` package throws when imported outside Next’s server graph, so running app modules directly with `node` or `tsx` can crash.

Solutions we use:
- Development workers run inside Next via `instrumentation.ts` (no env/server-only issues).
- Production workers use the standalone build and a simple `apps/web/worker.js` that imports from `.next/standalone/...`.

If you need a standalone TypeScript worker entrypoint, bundle it (tsup/esbuild) and:
- preload env (`@next/env`) before importing `env.ts`
- alias `server-only` to a no-op only for the worker bundle
- register tsconfig paths

This keeps code clean but adds a bundling step.


## API Reference

### Core Functions

- `enqueueJob(queueName, data, options?)`: Enqueue a single job
- `bulkEnqueueJobs(queueName, options)`: Enqueue multiple jobs
- `createQueueWorker(queueName, processor, options?)`: Create a worker (Redis only)
- `getQueueManager()`: Get the queue manager instance
- `getQueueSystemInfo()`: Get current system information
- `closeQueueManager()`: Close the queue manager during cleanup

### Worker Functions

- `registerWorker(queueName, processor, config?)`: Register a worker
- `unregisterWorker(queueName)`: Unregister a worker
- `shutdownAllWorkers()`: Shutdown all workers gracefully
- `getAllWorkers()`: Get the registered workers, keyed by queue name

## File Structure

```
apps/web/
├── worker.js                     # Worker process entry point (copied to standalone on build)
└── utils/queue/
    ├── queue-manager.ts          # Main queue abstraction
    ├── bullmq-manager.ts         # BullMQ implementation
    ├── qstash-manager.ts         # QStash implementation
    ├── queues.ts                 # Queue handlers and job data types
    ├── types.ts                  # Type definitions
    ├── worker.ts                 # Worker management
    ├── queue.test.ts             # Comprehensive test suite
    ├── ai-queue.ts               # AI-specific queue utilities
    ├── email-action-queue.ts     # Email action queue utilities
    └── email-actions.ts          # Email action definitions

docker/
└── Dockerfile.prod               # Production image (supports both web and worker)

docker-compose.yml                # Includes both web and worker services (reuses same image)
```
34 changes: 34 additions & 0 deletions .dockerignore
@@ -0,0 +1,34 @@
# VCS
.git

# Node / package managers
node_modules
pnpm-store

# Build outputs / caches
.next
.turbo
out
coverage
*.log

# OS/editor junk
.DS_Store
*.swp
*.swo
.idea
.vscode

# Local env files (do not bake secrets)
*.env
*.env.*
!.env.example

# Docker
Dockerfile*
!.dockerignore

# Misc
*.local
_tmp
.tmp
7 changes: 2 additions & 5 deletions apps/web/app/api/clean/route.ts
@@ -2,7 +2,7 @@ import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
 import { z } from "zod";
 import { NextResponse } from "next/server";
 import { withError } from "@/utils/middleware";
-import { publishToQstash } from "@/utils/upstash";
+import { enqueueJob } from "@/utils/queue/queue-manager";
 import { getThreadMessages } from "@/utils/gmail/thread";
 import { getGmailClientWithRefresh } from "@/utils/gmail/client";
 import type { CleanGmailBody } from "@/app/api/clean/gmail/route";
@@ -271,10 +271,7 @@ function getPublish({
     });

     await Promise.all([
-      publishToQstash("/api/clean/gmail", cleanGmailBody, {
-        key: `gmail-action-${emailAccountId}`,
-        ratePerSecond: maxRatePerSecond,
-      }),
+      enqueueJob("clean-gmail", cleanGmailBody),
       updateThread({
         emailAccountId,
         jobId,