提供了详细信息,包括我用来通过 Nodejs 实现作业的代码
需要记住一些事情,当 GCP 运行多个实例时,它们都可以访问这一个工作进程和同一个 Redis 实例。
作为一般工作的新手,我以前没有看到这个问题,但现在随着平台上用户的增加,至少有 10% 的人会遇到这个问题,但其余的工作都是成功的。
我一直面临这个问题,其中 BullMq 不断显示以下错误:
技术堆栈:
至于我的代码,我将 bullmq 代码分为 4 部分:
QUEUE
的设置:import { JobsOptions, Queue } from 'bullmq';
import { REDIS_QUEUE_HOST, REDIS_QUEUE_PORT } from '../../../../middleware/env';
import { DEFAULT_REMOVE_CONFIG, QUEUE_NAME_REPLY } from '../../../../constants/queueSystem';
import { setUpReplyWorker } from './reply-worker';
// reply queue
export const replyQueue = new Queue(QUEUE_NAME_REPLY, {
connection: {
host: REDIS_QUEUE_HOST,
port: parseInt(REDIS_QUEUE_PORT || '6379'),
connectTimeout: 1000 * 60 * 5,
},
});
setUpReplyWorker();
export const addReplyJob = async <T>(jobName: string, data: T, config?: JobsOptions) => {
return replyQueue.add(jobName, data, {
removeOnComplete: true,
removeOnFail: true,
...config,
});
};
WORKER
:import { Job, Worker } from 'bullmq';
import { REDIS_QUEUE_HOST, REDIS_QUEUE_PORT } from '../../../../middleware/env';
import { QUEUE_NAME_REPLY } from '../../../../constants/queueSystem';
import { replyJobProcessor } from './reply-jobProcessor';
export const setUpReplyWorker = (): void => {
const worker = new Worker(QUEUE_NAME_REPLY, replyJobProcessor, {
connection: {
host: REDIS_QUEUE_HOST,
port: parseInt(REDIS_QUEUE_PORT || '6379'),
},
autorun: true,
concurrency: 20000,
// 5 mins
lockDuration: 1000 * 60 * 5,
});
worker.on('completed', (job: Job, returnvalue: 'DONE') => {
console.debug(`Completed job "reply" with id ${job.id}`, returnvalue);
});
worker.on('active', (job: Job<unknown>) => {
console.debug(`Active job "reply" with id ${job.id}`);
});
worker.on('error', (failedReason: Error) => {
console.error(`Job encountered an error "reply"`, failedReason);
});
};
Job processor
中作为
arg
通过的
worker
import { Job } from 'bullmq';
import { someFunction } from '../../automate/someFunction';
export const replyJobProcessor = async (
job: Job<{
someId: string;
}>,
): Promise<'DONE'> => {
await job.log(`Started processing job with id ${job.id}`);
await someFunction({
someId: "id"
});
await job.log(`Finished processing job with id ${job.id}`);
return 'DONE';
};
FUNCTION
本身:export const someFunction = async ({ someId }: { someId: string }) => {
let incorrectData;
// Call API to get the data
if (incorrectData) {
someFunction({ someId });
return;
}
await continueToAnotherFunction({ someId });
return;
};
我希望尝试复制它,即使在查阅了 BullMq 的文档之后,我也不明白为什么我会处理两次。因为每当一个作业被处理两次时就会发生这些问题。但我已经在我的代码中确保代码不会运行两次。我什至看到,当一个作业被锁定并处于活动状态时,无论
job id
是否相同,它都不会处理该作业。
此外,我不会手动将作业移动到任何状态。我根本没接触过这方面。
我还能做什么来解决这个问题?请帮忙。
我面临着完全相同的问题。 在我看来,问题的根源在于处理器由于某种原因被阻塞,因此库解锁了消息并再次处理它。 也许处理器需要太长时间才能完成该操作。 我完全迷失了。