NestJS 中是否可以防止计划任务重叠?

问题描述 投票:0回答:3

目前,如果完成询问所需的时间大于间隔,长时间运行的任务将重叠(同一任务同时运行多个实例)。 下面是 NestJS 服务示例

import { Injectable, Logger } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';

function timeout(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

let c = 1

@Injectable()
export class TasksService {
  private readonly logger = new Logger(TasksService.name);

  @Interval(1000)
  async handleCron() {
    this.logger.debug(`start ${c}`);
    await timeout(3000)
    this.logger.debug(`end ${c}`);
    c += 1
  }
}

是否可以防止这些任务重叠并且一次只调用一个实例的任务?从技术上讲,我们可以跟踪

lock
变量,但这只会允许我们跳过实例(如果实例已经在运行)。理想情况下,我们可以调用设置选项来允许基于任务结束时间的间隔,而不是固定间隔(也称为开始时间)。

javascript scheduled-tasks nestjs
3个回答
6
投票

这在 NestJS 中很容易完成,但你需要重构你的代码。

不要使用

@Interval
,而是使用
@Cron
,因为它有更多我们将在下一行中使用的选项。

让我们首先替换间隔装饰器:

@Interval(1000)
  async handleCron() {
    this.logger.debug(`start ${c}`);
    await timeout(3000)
    this.logger.debug(`end ${c}`);
    c += 1
  }

变成:

@Cron(CronExpression.EVERY_SECOND, {
     name: 'cron_job_name_here',
   })
async handleCron() {
        this.logger.debug(`start ${c}`);
        await timeout(3000)
        this.logger.debug(`end ${c}`);
        c += 1
   }

我们在这里所做的是,我们使用了更具表现力的 cron 表达式,即使非程序员也能理解这个作业每秒都会运行。另外,我们命名了 cron 作业,因此现在我们可以通过调度程序的模块 API 来操作 cron 作业。

现在我们的 cron 作业有了名称,我们可以使用调度程序注册表轻松启动/停止它。所以你的功能就变成了:

@Cron(CronExpression.EVERY_SECOND, {
     name: 'cron_job_name_here',
   })
async handleCron() {
        const job = this.schedulerRegistry.getCronJob('cron_job_name_here');
        job.stop(); // pausing the cron job

        this.logger.debug(`start ${c}`);
        await timeout(3000)
        this.logger.debug(`end ${c}`);
        c += 1;

        job.start(); // restarting the cron job
   }

不要忘记将调度程序注册表服务注入到服务的构造函数中。

private schedulerRegistry: SchedulerRegistry,

还有进口:

import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule'

0
投票

我正在寻找相同的解决方案,发现 Nest 的调度程序库不再提供此问题支持。顺便说一句,我遇到了一个旧库 https://github.com/miaowing/nest-schedule 我猜它不再处于维护状态,但有一个名为 config.waiting 的功能,它实际上的作用是“调度程序不会调度当该作业正在运行时,如果 waiting 为 true,请尝试该操作。


0
投票

我的解决方案 - 用 runTaskOnce 包装执行目标函数,如果前一个函数未完成则跳过执行

export class ExampleService {
  private readonly logger = new Logger(ExampleService.name);
  private readonly runningTasks = new Set();

  @Interval(10000)
  async exampleFunction() {
    this.runTaskOnce("exampleFunction", async () => {
      return new Promise((resolve) => {
        setTimeout(() => {
          resolve();
        }, 5000);
      });
    });
  }

  private async runTaskOnce(
    taskName: string,
    task: () => Promise<void>
  ): Promise<void> {
    if (this.runningTasks.has(taskName)) {
      this.logger.warn(`Task "${taskName}" is running now, skip this tick`);
      return;
    } else {
      this.runningTasks.add(taskName);
      return task().finally(() => {
        this.runningTasks.delete(taskName);
      });
    }
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.