5-4 进阶队列任务的生命周期事件
队列任务完成后的清理策略
当任务执行完成后,如果不做任何清理,Redis 中会积累大量的历史记录。在 Another Redis Desktop Manager 中可以看到 complete 队列里堆积了许多已完成的定时任务和计划任务。默认情况下,我们应该在任务完成后将其从 Redis 缓存中删除。
在 BullModule.forRoot() 配置中添加 defaultJobOptions,启用自动清理:
BullModule.forRoot({
redis: {
host: process.env.REDIS_HOST,
port: Number(process.env.REDIS_PORT),
password: process.env.REDIS_PASSWORD,
},
defaultJobOptions: {
removeOnComplete: true, // 任务完成后自动删除
removeOnFail: false, // 保留失败任务,便于后续重试
},
})
typescript
将 removeOnComplete 设为 true,任务完成后自动从 Redis 中移除。将 removeOnFail 设为 false,保留失败任务以便读取并执行重试逻辑。
Bull 队列生命周期事件概述
Bull 队列提供了丰富的事件监听机制,分为两类:
- Local Events:绑定到单个处理进程的事件,适用于默认的单进程场景
- Global Events:绑定到队列的全局事件,适用于多处理进程(多个 named processor)的场景
关键区别在于:当队列只有一个默认 process 时,使用 local events;当队列注册了多个具名 processor(如 @Processor('schedule-tasks') 中包含多个处理方法)时,需要使用 global events。
队列生命周期事件列表
| 事件名 | 触发时机 |
|---|---|
waiting | 任务进入等待队列 |
active | 任务被激活,开始处理 |
completed | 任务处理成功完成 |
failed | 任务处理失败 |
paused | 队列被暂停 |
resumed | 队列恢复运行 |
cleaned | 队列执行清理操作 |
drained | 队列中所有等待任务已处理完毕 |
removed | 任务被移除 |
progress | 任务进度更新 |
stalled | 任务停滞(处理器未响应) |
error | 队列发生错误 |
实现 Global Event Listener
创建 tasks-events.service.ts,使用 @Processor 装饰器绑定到对应的队列名:
import { Processor } from '@nestjs/bull';
import { Job, Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Processor('schedule-tasks')
export class TasksEventsService {
constructor(
@InjectQueue('schedule-tasks') private queue: Queue,
) {}
@OnQueueActive()
onActive(job: Job) {
console.log(`Job ${job.id} is now active`);
}
@OnQueueCompleted()
onCompleted(job: Job, result: any) {
console.log(`Job ${job.id} completed with result: ${result}`);
}
@OnQueueFailed()
onFailed(job: Job, err: Error) {
console.log(`Job ${job.id} failed: ${err.message}`);
}
@OnQueueDrained()
onDrained() {
console.log('All waiting jobs have been processed');
}
@OnQueueWaiting()
onWaiting(jobId: number | string) {
console.log(`Job ${jobId} is waiting`);
}
@OnQueueError()
onError(error: Error) {
console.log(`Queue error: ${error.message}`);
}
@OnQueueStalled()
onStalled(jobId: string) {
console.log(`Job ${jobId} has stalled`);
}
@OnQueueProgress()
onProgress(job: Job, progress: number) {
console.log(`Job ${job.id} progress: ${progress}%`);
}
@OnQueuePaused()
onPaused() {
console.log('Queue paused');
}
@OnQueueResumed()
onResumed() {
console.log('Queue resumed');
}
@OnQueueCleaned()
onCleaned(jobs: Job[], type: string) {
console.log(`Cleaned ${jobs.length} ${type} jobs`);
}
@OnQueueRemoved()
onRemoved(job: Job) {
console.log(`Job ${job.id} removed`);
}
}
typescript
将 service 注册到对应的 module 中:
providers: [
// ... 其他 providers
TasksEventsService,
]
typescript
测试生命周期事件
发起请求后,控制台会按顺序打印事件信息:
Job 1 is waiting
Job 1 is now active
Job 1 completed with result: OK
All waiting jobs have been processed
text
事件触发顺序为:waiting -> active -> completed -> drained。
通过依赖注入获取队列中的失败任务
在实际应用中,我们需要在失败回调中获取失败的任务详情并执行重试逻辑。使用 @InjectQueue 注入队列实例:
@OnQueueActive()
async onActive(job: Job) {
const failedJobs = await this.queue.getFailed();
console.log('Failed jobs:', failedJobs);
// 每个 failed job 包含以下信息:
// - job.data: 传递给任务的数据
// - job.failedReason: 失败原因
// - job.stacktrace: 错误堆栈(可通过 stackTraceLimit 配置限制长度)
}
typescript
获取到失败 job 后,可以调用 job 实例上的方法进行操作:
job.retry():重新执行失败的任务job.moveToCompleted():将任务移入已完成队列job.moveToFailed():将任务移入失败队列
邮件服务配置的边界处理
在使用队列发送邮件时,需要处理邮件服务未配置的情况:
// schedule-task-consumer.ts
if (!this.mailService) {
const log = new Logger('ScheduleTaskConsumer');
log.warn('邮件配置异常,请检查 .env 配置文件中的邮件相关配置');
return;
}
typescript
同时注意 mailOn 从配置文件读取时是字符串类型,需要转换为布尔值:
// queue.ts
const mailOn = toBoolean(this.configService.get('MAIL_ON'));
typescript
小结
- 使用
defaultJobOptions配置任务的自动清理策略 - Bull 队列事件分为 Local(单进程)和 Global(多进程)两种
- 通过
@InjectQueue注入队列实例,可以获取失败任务并执行重试逻辑 - 在实际项目中,通常只需要关注
failed、cleaned、removed等关键事件
↑