BullMq - 面临作业被处理两次的错误,这会导致诸如缺少密钥/锁之类的错误

问题描述 投票:0回答:1

提供了详细信息,包括我用来通过 Nodejs 实现作业的代码

需要记住一些事情,当 GCP 运行多个实例时,它们都可以访问这一个工作进程和同一个 Redis 实例。

作为一般工作的新手,我以前没有看到这个问题,但现在随着平台上用户的增加,至少有 10% 的人会遇到这个问题,但其余的工作都是成功的。

我一直面临这个问题,其中 BullMq 不断显示以下错误:

  1. 缺少作业“abc”的密钥
  2. 缺少作业“abc”的锁
  3. 错误:作业“abc”未处于活动状态。失败了
  4. 错误:作业“abc”的锁不匹配。 Cmd 从活动状态失败

技术堆栈:

  • GCP 应用引擎标准
  • GCP Redis 内存存储
  • Nodejs
  • 快递

至于我的代码,我将 bullmq 代码分为 4 部分:

  1. 第一部分是
    QUEUE
    的设置:
import { JobsOptions, Queue } from 'bullmq';
import { REDIS_QUEUE_HOST, REDIS_QUEUE_PORT } from '../../../../middleware/env';
import { DEFAULT_REMOVE_CONFIG, QUEUE_NAME_REPLY } from '../../../../constants/queueSystem';
import { setUpReplyWorker } from './reply-worker';

// reply queue
export const replyQueue = new Queue(QUEUE_NAME_REPLY, {
  connection: {
    host: REDIS_QUEUE_HOST,
    port: parseInt(REDIS_QUEUE_PORT || '6379'),
    connectTimeout: 1000 * 60 * 5,
  },
});

setUpReplyWorker();

export const addReplyJob = async <T>(jobName: string, data: T, config?: JobsOptions) => {
  return replyQueue.add(jobName, data, {
    removeOnComplete: true,
    removeOnFail: true,
    ...config,
  });
};
  1. 第二部分是
    WORKER
    :
import { Job, Worker } from 'bullmq';
import { REDIS_QUEUE_HOST, REDIS_QUEUE_PORT } from '../../../../middleware/env';
import { QUEUE_NAME_REPLY } from '../../../../constants/queueSystem';
import { replyJobProcessor } from './reply-jobProcessor';

export const setUpReplyWorker = (): void => {
  const worker = new Worker(QUEUE_NAME_REPLY, replyJobProcessor, {
    connection: {
      host: REDIS_QUEUE_HOST,
      port: parseInt(REDIS_QUEUE_PORT || '6379'),
    },
    autorun: true,
    concurrency: 20000,
    // 5 mins
    lockDuration: 1000 * 60 * 5,
  });

  worker.on('completed', (job: Job, returnvalue: 'DONE') => {
    console.debug(`Completed job "reply" with id ${job.id}`, returnvalue);
  });

  worker.on('active', (job: Job<unknown>) => {
    console.debug(`Active job "reply" with id ${job.id}`);
  });
  worker.on('error', (failedReason: Error) => {
    console.error(`Job encountered an error "reply"`, failedReason);
  });
};
  1. 第三个是我在
    Job processor
    中作为
    arg
    通过的
    worker
import { Job } from 'bullmq';
import { someFunction } from '../../automate/someFunction';

export const replyJobProcessor = async (
  job: Job<{
    someId: string;
  }>,
): Promise<'DONE'> => {
  await job.log(`Started processing job with id ${job.id}`);

  await someFunction({
    someId: "id"
  });

  await job.log(`Finished processing job with id ${job.id}`);
  return 'DONE';
};
  1. FUNCTION
    本身:
export const someFunction = async ({ someId }: { someId: string }) => {
  let incorrectData;
  
  // Call API to get the data

  if (incorrectData) {
    someFunction({ someId });
    return;
  }

  await continueToAnotherFunction({ someId });

  return;
};

我希望尝试复制它,即使在查阅了 BullMq 的文档之后,我也不明白为什么我会处理两次。因为每当一个作业被处理两次时就会发生这些问题。但我已经在我的代码中确保代码不会运行两次。我什至看到,当一个作业被锁定并处于活动状态时,无论

job id
是否相同,它都不会处理该作业。

此外,我不会手动将作业移动到任何状态。我根本没接触过这方面。

我还能做什么来解决这个问题?请帮忙。

node.js google-cloud-platform redis node-redis bullmq
1个回答
0
投票

我面临着完全相同的问题。 在我看来,问题的根源在于处理器由于某种原因被阻塞,因此库解锁了消息并再次处理它。 也许处理器需要太长时间才能完成该操作。 我完全迷失了。

© www.soinside.com 2019 - 2024. All rights reserved.