Job Queue
The job queue system provides background job processing with support for multiple backends, scheduled jobs, recurring jobs, and flexible worker deployment.
Overview
The queue system consists of:
- Queue Providers: Pluggable backends (memory for dev, SQS for production)
- Workers: Process jobs with configurable concurrency
- Scheduler: Database-backed scheduling for delayed and recurring jobs
- Handler Registry: Register functions to process specific job types
Quick Start
1. Enable the Queue
// config/app.config.ts
export const config: AppConfig = {
// ... other config
queue: {
enabled: true,
providerConfig: { type: 'memory' },
worker: { mode: 'in-process', concurrency: 5 },
scheduler: { enabled: true },
},
};
2. Register Job Handlers
// extensions/jobs/email.ts
import { registerJobHandler } from '@chaaskit/server';
registerJobHandler('email:send', async (job, ctx) => {
ctx.log('Sending email to', job.payload.to);
await sendEmail(job.payload);
});
registerJobHandler('report:generate', async (job, ctx) => {
ctx.log('Generating report', job.payload.reportId);
ctx.progress(50); // Report progress
const report = await generateReport(job.payload);
ctx.progress(100);
return report;
});
3. Enqueue Jobs
import { getQueueProvider } from '@chaaskit/server';
const queue = getQueueProvider();
// Immediate job
await queue.enqueue('email:send', {
to: 'user@example.com',
subject: 'Welcome!'
});
// Delayed job (up to 15 min for SQS)
await queue.enqueue('reminder:send', { userId: '123' }, {
delay: 60000 // 1 minute
});
// Job with retry configuration
await queue.enqueue('webhook:send', { url: '...' }, {
maxRetries: 5,
timeout: 30000,
});
4. Push Schema Changes
pnpm db:push
Configuration
Full Config Options
queue: {
enabled: boolean;
// Provider configuration (discriminated union)
providerConfig:
| { type: 'memory'; maxHistorySize?: number }
| {
type: 'sqs';
region: string;
queueUrl: string;
deadLetterQueueUrl?: string;
visibilityTimeout?: number; // Default: 30s
};
// Worker configuration
worker?: {
mode: 'in-process' | 'standalone'; // Default: 'in-process'
concurrency?: number; // Default: 5
pollInterval?: number; // Default: 1000ms
shutdownTimeout?: number; // Default: 30000ms
};
// Scheduler configuration
scheduler?: {
enabled: boolean; // Default: false
pollInterval?: number; // Default: 60000ms (1 min)
};
}
Development Config
queue: {
enabled: true,
providerConfig: { type: 'memory', maxHistorySize: 1000 },
worker: { mode: 'in-process', concurrency: 2 },
scheduler: { enabled: true },
}
Production Config (SQS)
queue: {
enabled: true,
providerConfig: {
type: 'sqs',
region: 'us-east-1',
queueUrl: process.env.SQS_QUEUE_URL!,
deadLetterQueueUrl: process.env.SQS_DLQ_URL,
visibilityTimeout: 60,
},
worker: { mode: 'in-process', concurrency: 10 },
scheduler: { enabled: true, pollInterval: 30000 },
}
Queue Providers
Memory Provider
In-memory queue for development and testing. Uses EventEmitter for instant job notification (no polling).
providerConfig: {
type: 'memory',
maxHistorySize: 1000 // Keep last N completed jobs
}
Features:
- Instant job processing (EventEmitter-based)
- Job history for debugging
- Deduplication support
- Priority queues
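Deduplication and priority are driven by the corresponding EnqueueOptions fields (deduplicationKey and priority). The semantics can be sketched with a toy queue; this is an illustration only, not the actual memory provider, whose enqueue returns a Promise<Job> rather than a boolean:

```typescript
// Toy model of the memory provider's dedup + priority behavior.
interface QueuedJob { type: string; payload: unknown; priority: number }

class MiniMemoryQueue {
  private jobs: QueuedJob[] = [];
  private seen = new Set<string>();

  enqueue(type: string, payload: unknown, opts: { priority?: number; deduplicationKey?: string } = {}): boolean {
    // Deduplication: drop jobs whose key has already been enqueued
    if (opts.deduplicationKey) {
      if (this.seen.has(opts.deduplicationKey)) return false;
      this.seen.add(opts.deduplicationKey);
    }
    this.jobs.push({ type, payload, priority: opts.priority ?? 0 });
    // Lower number = higher priority, matching EnqueueOptions; sort is
    // stable, so equal priorities keep insertion order
    this.jobs.sort((a, b) => a.priority - b.priority);
    return true;
  }

  dequeue(): QueuedJob | undefined { return this.jobs.shift(); }
}
```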
SQS Provider
AWS SQS for production deployments. Requires @aws-sdk/client-sqs:
pnpm add @aws-sdk/client-sqs
providerConfig: {
type: 'sqs',
region: 'us-east-1',
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
deadLetterQueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-dlq',
visibilityTimeout: 30,
}
Environment Variables:
AWS_ACCESS_KEY_ID=your-key
AWS_SECRET_ACCESS_KEY=your-secret
SQS_QUEUE_URL=https://sqs...
Features:
- Long polling (efficient, no wasted requests)
- Visibility timeout for at-least-once delivery
- Dead letter queue support
- FIFO queue support with deduplication
Limitations:
- Max delay: 15 minutes (use scheduler for longer delays)
- Max message size: 256 KB
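One way to work around the 15-minute cap is to route longer delays to the database-backed scheduler. A small helper sketching that decision; the constant and function are ours, not part of the library:

```typescript
// SQS caps per-message delay at 15 minutes; anything longer should go
// through the scheduler instead of the queue's native delay.
const SQS_MAX_DELAY_MS = 15 * 60 * 1000;

function chooseDelayRoute(delayMs: number): 'queue' | 'scheduler' {
  return delayMs <= SQS_MAX_DELAY_MS ? 'queue' : 'scheduler';
}
```

A caller would then either pass { delay } to queue.enqueue or call scheduler.schedule with a Date computed from the delay.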
Job Handlers
Registering Handlers
import { registerJobHandler } from '@chaaskit/server';
registerJobHandler<PayloadType, ResultType>(
'job:type',
async (job, ctx) => {
// job.id - Unique job ID
// job.type - Job type string
// job.payload - Your typed payload
// job.attempts - Current attempt number
// job.options - Job options (maxRetries, timeout, etc.)
// ctx.jobId - Same as job.id
// ctx.attempt - Current attempt (1-based)
// ctx.log() - Job-specific logging
// ctx.progress() - Report progress (0-100)
// ctx.signal - AbortSignal for cancellation
return result;
},
'Optional description for debugging'
);
Error Handling & Retries
Jobs automatically retry on failure:
registerJobHandler('flaky:job', async (job, ctx) => {
if (Math.random() < 0.5) {
throw new Error('Random failure');
// Job will be retried up to maxRetries times
}
return 'success';
});
// Configure retries per job
await queue.enqueue('flaky:job', {}, {
maxRetries: 5, // Default: 3
timeout: 60000, // Job timeout in ms
});
Timeout Handling
registerJobHandler('long:task', async (job, ctx) => {
// Check if job was cancelled/timed out
if (ctx.signal.aborted) {
throw new Error('Job cancelled');
}
// For long operations, periodically check the signal
for (const item of items) {
if (ctx.signal.aborted) break;
await processItem(item);
}
});
Scheduler
The scheduler enables delayed execution and recurring jobs using database storage.
One-Time Scheduled Jobs
For delays longer than provider limits (e.g., > 15 min for SQS):
import { getScheduler } from '@chaaskit/server';
const scheduler = getScheduler();
// Schedule for a specific time
await scheduler.schedule(
'report:send',
{ reportId: '123', recipients: ['user@example.com'] },
new Date('2024-12-25T09:00:00Z')
);
// Cancel a scheduled job
await scheduler.cancelScheduledJob(jobId);
Recurring Jobs
// Cron expression (daily at 9am)
await scheduler.registerRecurring({
name: 'daily-digest',
type: 'email:digest',
payload: { digestType: 'daily' },
schedule: '0 9 * * *',
timezone: 'America/New_York',
});
// Interval syntax
await scheduler.registerRecurring({
name: 'cache-cleanup',
type: 'maintenance:cleanup',
payload: {},
schedule: 'every 1 hour',
});
// Manage recurring jobs
await scheduler.disableRecurring('daily-digest');
await scheduler.enableRecurring('daily-digest');
await scheduler.deleteRecurring('daily-digest');
// List all recurring jobs
const jobs = await scheduler.listRecurring();
Supported Schedule Formats:
- Cron: "0 9 * * *" (daily at 9am), "*/15 * * * *" (every 15 min)
- Intervals: "every 1 hour", "every 30 minutes", "every 1 day"
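The interval syntax maps directly onto milliseconds. A sketch of a parser for the documented "every N unit" form; this illustrates the format and is not the scheduler's actual parser:

```typescript
// Parse "every N unit" into milliseconds; return null for anything else
// (e.g. cron expressions, which take a different code path).
const UNIT_MS: Record<string, number> = {
  minute: 60_000, minutes: 60_000,
  hour: 3_600_000, hours: 3_600_000,
  day: 86_400_000, days: 86_400_000,
};

function parseInterval(schedule: string): number | null {
  const m = /^every (\d+) (minutes?|hours?|days?)$/.exec(schedule);
  return m ? Number(m[1]) * UNIT_MS[m[2]] : null;
}
```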
Worker Modes
In-Process Mode (Default)
Worker runs within the main server process:
worker: {
mode: 'in-process',
concurrency: 5
}
Pros: Simple setup, no extra processes
Cons: Shares resources with the web server
Standalone Mode
Run workers as separate processes for better scaling:
// config/app.config.ts
worker: {
mode: 'standalone' // Server won't start workers
}
Then run workers separately:
# Using the CLI
pnpm queue-worker
# Or with environment overrides
QUEUE_CONCURRENCY=10 pnpm queue-worker
SCHEDULER_ENABLED=true pnpm queue-worker
Environment Variables for Standalone Workers:
QUEUE_CONCURRENCY=5 # Number of concurrent jobs
QUEUE_POLL_INTERVAL=1000 # Poll interval (ms)
QUEUE_SHUTDOWN_TIMEOUT=30000 # Graceful shutdown timeout (ms)
SCHEDULER_ENABLED=false # Enable scheduler in this worker
SCHEDULER_POLL_INTERVAL=60000 # Scheduler poll interval (ms)
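A standalone worker resolves these variables against the documented defaults. A sketch of that merge, assuming string env values and the defaults listed above (the actual merge logic lives inside the worker CLI):

```typescript
// Resolve worker settings from the environment, falling back to the
// documented defaults when a variable is unset.
function resolveWorkerEnv(env: Record<string, string | undefined>) {
  return {
    concurrency: Number(env.QUEUE_CONCURRENCY ?? 5),
    pollInterval: Number(env.QUEUE_POLL_INTERVAL ?? 1000),
    shutdownTimeout: Number(env.QUEUE_SHUTDOWN_TIMEOUT ?? 30000),
    schedulerEnabled: env.SCHEDULER_ENABLED === 'true',
  };
}
```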
API Reference
Queue Provider
interface QueueProvider {
readonly name: string;
// Enqueue a job
enqueue<T>(type: string, payload: T, options?: EnqueueOptions): Promise<Job<T>>;
// Receive jobs (long polling)
receive(maxMessages?: number, waitTimeSeconds?: number): Promise<ReceivedJob[]>;
// Acknowledge successful completion
acknowledge(receiptHandle: string): Promise<void>;
// Mark job as failed (will retry if attempts remain)
fail(receiptHandle: string, error: Error): Promise<void>;
// Get job by ID
getJob(jobId: string): Promise<Job | null>;
// Get queue statistics
getStats(): Promise<QueueStats>;
// Close provider
close(): Promise<void>;
}
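The receive/acknowledge/fail contract is what a worker loop is built on: acknowledge removes a job, fail returns it for retry. A sketch of one worker iteration against a stub provider (the stub stands in for a real QueueProvider and is not the library's code):

```typescript
// Minimal stand-in for a QueueProvider, holding one pending job.
interface ReceivedJob { receiptHandle: string; type: string; payload: unknown }

class StubProvider {
  private pending: ReceivedJob[] = [
    { receiptHandle: 'r1', type: 'email:send', payload: { to: 'a@b.c' } },
  ];
  acked: string[] = [];
  failed: string[] = [];

  async receive(maxMessages = 10): Promise<ReceivedJob[]> {
    return this.pending.splice(0, maxMessages);
  }
  async acknowledge(handle: string): Promise<void> { this.acked.push(handle); }
  async fail(handle: string): Promise<void> { this.failed.push(handle); }
}

// One worker iteration: receive, run the handler, then ack or fail.
async function workOnce(provider: StubProvider, handler: (job: ReceivedJob) => Promise<void>) {
  for (const job of await provider.receive()) {
    try {
      await handler(job);
      await provider.acknowledge(job.receiptHandle); // success: remove from queue
    } catch {
      await provider.fail(job.receiptHandle); // failure: provider retries if attempts remain
    }
  }
}
```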
Enqueue Options
interface EnqueueOptions {
delay?: number; // Delay in ms
scheduledFor?: Date; // Specific execution time
maxRetries?: number; // Default: 3
timeout?: number; // Job timeout in ms (default: 30000)
deduplicationKey?: string; // Prevent duplicate jobs
priority?: number; // Lower = higher priority (default: 0)
}
Job Context
interface JobContext {
jobId: string;
attempt: number; // 1-based attempt number
log: (msg: string, ...args: unknown[]) => void; // Job-specific logging
progress: (percent: number) => void; // Report progress 0-100
signal: AbortSignal; // Check for cancellation
}
Database Models
The scheduler uses two database tables:
model ScheduledJob {
id String @id
type String
payload String @db.Text
options String @db.Text
scheduledFor DateTime
status String // scheduled | enqueued | cancelled
error String?
createdAt DateTime
updatedAt DateTime
}
model RecurringJob {
id String @id
name String @unique
type String
payload String @db.Text
options String @db.Text
schedule String // Cron or interval
timezone String
enabled Boolean
nextRunAt DateTime?
lastRunAt DateTime?
lastError String?
createdAt DateTime
updatedAt DateTime
}
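On each poll, the scheduler selects ScheduledJob rows that are due and still in the scheduled state, enqueues them, and flips their status to enqueued. The selection step can be sketched like this (an illustration over plain objects, not the actual database query):

```typescript
// Mirror of the ScheduledJob status column: scheduled | enqueued | cancelled.
interface ScheduledRow {
  id: string;
  type: string;
  scheduledFor: Date;
  status: 'scheduled' | 'enqueued' | 'cancelled';
}

// Pick rows whose scheduledFor has passed and that haven't been
// enqueued or cancelled yet.
function pickDue(rows: ScheduledRow[], now: Date): ScheduledRow[] {
  return rows.filter(r => r.status === 'scheduled' && r.scheduledFor <= now);
}
```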
Best Practices
Job Design
- Keep jobs idempotent - Jobs may run more than once
- Store minimal data - Put IDs in payload, not full objects
- Handle timeouts gracefully - Check ctx.signal.aborted
- Use meaningful job types - email:welcome, report:daily, webhook:send
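Idempotency usually means recording a per-job key before doing side effects. A sketch of the pattern; the in-memory set here is a stand-in for a database uniqueness check, which is what you would use in practice:

```typescript
// Keys of jobs that have already been processed. In production this
// would be a unique-constrained database column, not process memory.
const processedKeys = new Set<string>();

// Run work at most once per key; a retried job with the same key
// becomes a no-op instead of a duplicate side effect.
async function handleOnce(key: string, work: () => Promise<void>): Promise<boolean> {
  if (processedKeys.has(key)) return false; // already handled on a previous attempt
  await work();
  processedKeys.add(key);
  return true;
}
```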
Production Setup
- Use SQS with DLQ - Failed jobs go to dead letter queue for inspection
- Run standalone workers - Scale workers independently from web servers
- Monitor queue depth - Alert when jobs back up
- Set appropriate timeouts - Match job execution time
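getStats() on the provider is the natural hook for depth monitoring. A sketch of a threshold check; the QueueStats field names used here (pending, inFlight) are assumptions, so check the real type before wiring this into alerting:

```typescript
// Assumed shape of QueueStats for illustration; verify against the
// actual QueueStats type exported by the queue module.
interface QueueStats { pending: number; inFlight: number }

// Return an alert message when total backlog exceeds the threshold,
// or null when the queue is healthy.
function backlogAlert(stats: QueueStats, threshold: number): string | null {
  const depth = stats.pending + stats.inFlight;
  return depth > threshold ? `queue backlog ${depth} exceeds ${threshold}` : null;
}
```

A worker or cron task could call this on an interval with `await queue.getStats()` and forward non-null results to your alerting system.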
Error Handling
registerJobHandler('critical:job', async (job, ctx) => {
try {
await riskyOperation();
} catch (error) {
// Log with context
ctx.log('Operation failed', { error: error.message, attempt: ctx.attempt });
// Re-throw to trigger retry
throw error;
// Or handle gracefully and don't retry
// return { status: 'failed', reason: error.message };
}
});
Troubleshooting
Jobs Not Processing
- Check the queue is enabled: queue.enabled: true
- Check the worker mode matches your setup
- Verify handler is registered for job type
- Check server logs for errors
SQS Connection Issues
- Verify AWS credentials are set
- Check queue URL is correct
- Ensure IAM permissions include SQS actions
- Test with the AWS CLI: aws sqs get-queue-attributes --queue-url ...
Scheduler Not Running
- Enable the scheduler: scheduler.enabled: true
- Run pnpm db:push to create the scheduler tables
- Check database connectivity
- Verify recurring job schedules are valid