目前,如果完成询问所需的时间大于间隔,长时间运行的任务将重叠(同一任务同时运行多个实例)。 下面是 NestJS 服务示例
import { Injectable, Logger } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
function timeout(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
let c = 1
@Injectable()
export class TasksService {
private readonly logger = new Logger(TasksService.name);
@Interval(1000)
async handleCron() {
this.logger.debug(`start ${c}`);
await timeout(3000)
this.logger.debug(`end ${c}`);
c += 1
}
}
是否可以防止这些任务重叠并且一次只调用一个实例的任务?从技术上讲,我们可以跟踪
lock
变量,但这只会允许我们跳过实例(如果实例已经在运行)。理想情况下,我们可以调用设置选项来允许基于任务结束时间的间隔,而不是固定间隔(也称为开始时间)。
这在 NestJS 中很容易完成,但你需要重构你的代码。
不要使用
@Interval
,而是使用 @Cron
,因为它有更多我们将在下一行中使用的选项。
让我们首先替换间隔装饰器:
@Interval(1000)
async handleCron() {
this.logger.debug(`start ${c}`);
await timeout(3000)
this.logger.debug(`end ${c}`);
c += 1
}
变成:
@Cron(CronExpression.EVERY_SECOND, {
name: 'cron_job_name_here',
})
async handleCron() {
this.logger.debug(`start ${c}`);
await timeout(3000)
this.logger.debug(`end ${c}`);
c += 1
}
我们在这里所做的是,我们使用了更具表现力的 cron 表达式,即使非程序员也能理解这个作业每秒都会运行。另外,我们命名了 cron 作业,因此现在我们可以通过调度程序的模块 API 来操作 cron 作业。
现在我们的 cron 作业有了名称,我们可以使用调度程序注册表轻松启动/停止它。所以你的功能就变成了:
@Cron(CronExpression.EVERY_SECOND, {
name: 'cron_job_name_here',
})
async handleCron() {
const job = this.schedulerRegistry.getCronJob('cron_job_name_here');
job.stop(); // pausing the cron job
this.logger.debug(`start ${c}`);
await timeout(3000)
this.logger.debug(`end ${c}`);
c += 1;
job.start(); // restarting the cron job
}
不要忘记将调度程序注册表服务注入到服务的构造函数中。
private schedulerRegistry: SchedulerRegistry,
还有进口:
import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule'
我正在寻找相同的解决方案,发现 Nest 的调度程序库不再提供此问题支持。顺便说一句,我遇到了一个旧库 https://github.com/miaowing/nest-schedule 我猜它不再处于维护状态,但有一个名为 config.waiting 的功能,它实际上的作用是“调度程序不会调度当该作业正在运行时,如果 waiting 为 true,请尝试该操作。
我的解决方案 - 用 runTaskOnce 包装执行目标函数,如果前一个函数未完成则跳过执行
export class ExampleService {
private readonly logger = new Logger(ExampleService.name);
private readonly runningTasks = new Set();
@Interval(10000)
async exampleFunction() {
this.runTaskOnce("exampleFunction", async () => {
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 5000);
});
});
}
private async runTaskOnce(
taskName: string,
task: () => Promise<void>
): Promise<void> {
if (this.runningTasks.has(taskName)) {
this.logger.warn(`Task "${taskName}" is running now, skip this tick`);
return;
} else {
this.runningTasks.add(taskName);
return task().finally(() => {
this.runningTasks.delete(taskName);
});
}
}
}