Nestjs 使用 bullmq 的 Cron 作业不起作用

问题描述 投票:0回答:1

我在nestjs后端使用bullmq实现了cron-job。 生产者代码:

import { Queue } from 'bullmq';
import { BullQueues, BullWorkers } from '../../../../../apps/api-server/src/shared/utils/constants';
import { CronJobDataSource } from '../data-source';
import { PlatformIntegration } from '../../../../../apps/api-server/src/modules/platform-integrations/entities/platform-integration.entity';
import { CalendarPlatforms } from '../../../../../apps/api-server/src/modules/platform-integrations/domain/calendar-platforms.enum';

/* eslint-disable @typescript-eslint/no-var-requires */

require('dotenv').config();

async function getUsersToSyncWithPlatform(platform: CalendarPlatforms) {
  ...
}

export const eventCronJob = async () => {
  console.log('Event Cron Job Called');
  try {
    await CronJobDataSource.initialize();
    // Initialize the BullMQ queue
    const syncQueue = new Queue(BullQueues.SYNC_EVENTS, {
      connection: { host: process.env.REDIS_HOSTNAME, port: Number(process.env.REDIS_PORT) },
    });

    // Function to enqueue jobs
    const enqueueSyncJobs = async (
      platform: CalendarPlatforms,
      users: {
        id: string;
        account: string;
      }[],
      period: number,
    ) => {
      for await (const user of users) {
        await syncQueue.add(
          BullWorkers.SYNC_EVENTS_FOR_PLATFORM,
          {
            platform,
            userId: user.id,
            account: user.account,
          },
          { repeat: { every: period } },
        );
      }
    };

    // Enqueue jobs for Google Calendar
    const usersToSyncGoogle = await getUsersToSyncWithPlatform(CalendarPlatforms.GOOGLE);
    await enqueueSyncJobs(CalendarPlatforms.GOOGLE, usersToSyncGoogle, 20000);

    process.exit();
  } catch (error) {
    console.error(error);
  }
}

队列事件监听代码:

import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor(BullQueues.SYNC_EVENTS)
export class SyncEventsConsumer {
   constructor() {}

   @Process(BullWorkers.SYNC_EVENTS_FOR_PLATFORM)
   async readOperationJob(job: Job<{ platform: CalendarPlatforms; userId: string; account: string  }>) {
   const {
      data: { platform, userId, account },
    } = job;
    console.log('Cron Job: >>>>>>>>>>>', platform, userId, account);
}

BullModule 在

app.module.ts
中作为 Redis 连接,事件监听器服务在子 module.ts 中注册为提供者 但 cron-job 一次都不起作用。我需要你的帮助。谢谢。

生产者代码位于应用程序模块之外,并在 Nest-cli.json 中创建为库。 当我运行 Nest 后端时,我可以看到该登录生产者。

Event Cron Job Called
但我看不到登录队列事件侦听器。
Cron Job: >>>>>>>>>>> ...
我在生产者中使用了
bullmq
模块。我在事件列表器中使用了
@nestjs/bull
bull
模块。

cron nestjs nestjs-config bull bullmq
1个回答
0
投票

我不确定为什么你需要一起使用 Bull 和 BullMQ,但是尝试使用 NestJS 和 BullMQ 设置可重复的作业,我最终遇到了你的问题。在应用程序模块中注册 BullMQ 连接并在单独的模块中注册队列后,我执行了以下操作:

import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Processor('example')
export class PhotoCleanupProcessor extends WorkerHost {
  constructor(
    @InjectQueue('example') private photoCleanupQueue: Queue,
    private photoCleanupService: PhotoCleanupService,
  ) {
    super();
    this.setupRepeatableJob();
  }

  private setupRepeatableJob() {
    console.log('Setting up repeatable job');
    this.photoCleanupQueue.add('example-job', undefined, {
      repeat: {
        // run it every 10 seconds
        pattern: '*/10 * * * * *',
      },
    });
  }

  async process(): Promise<void> {
    console.log('Processing job');
  }
}

这样,作业处理器及其处理程序。我可以运行服务的多个副本,并看到该作业每十秒仅由一个副本执行一次。我希望它有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.