返回作业实例的方法 this.aQueue.add(*args) 被无休止地等待。接收和反序列化数据后,我尝试在 app.service.ts 模块的 RequestService.sendData 方法中获取作业实例,但无法做到这一点,并且其余代码未运行
app.service.ts
import { Injectable } from '@nestjs/common';
import { Job, Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
import { plainToClass } from 'class-transformer';
import { RequestScheme, ResponseScheme } from './app.schemes';
@Injectable()
export class RequestService {
constructor(
@InjectQueue('request') private requestQueue: Queue
){}
async sendData(data: RequestScheme): Promise<ResponseScheme> {
let responseData: ResponseScheme
data = plainToClass(RequestScheme, data)
console.log("data in controller", data) // data is deserialized as i expect
const jobInstance = await this.requestQueue.add(
'request', data, { delay: data.wait }
) // this method is running and never awaited
console.log(`Job: ${jobInstance}`)
async function setJobData(jobInstance: Job){
return new Promise((resolve, reject) => {
this.requestQueue.on('completed', function(job: Job, result: ResponseScheme, error){
if (jobInstance.id == job.id) {
responseData = result
job.remove()
resolve(result)
}
if (error) reject(error)
})
})}
await setJobData(jobInstance)
return responseData
}
}
app.processor.ts
import { Job } from 'bull';
import {
Processor,
Process,
OnQueueActive,
OnQueueCompleted
} from '@nestjs/bull';
import { ResponseScheme } from './app.schemes';
@Processor('request')
export class RequestConsumer {
@Process('request')
async process_request(job: Job){
console.log(`Job ${job.id} proceed`)
}
@OnQueueActive()
onActive(job: Job){
console.log(`Data ${job.data} were sended`)
}
@OnQueueCompleted()
onComplete(job: Job){
const response = new ResponseScheme()
response.answer = job.data.answer
return response
}
}
app.module.ts
import { Global, Module, NestModule } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { RequestController } from './app.controller';
import { RequestService } from './app.service';
import { RequestConsumer } from './app.processor';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
maxRetriesPerRequest: null
}
}),
BullModule.registerQueue({
name: 'request'
})
],
controllers: [RequestController],
providers: [
RequestService
],
exports: [
RequestService
]
})
export class AppModule {
configure(consumer: RequestConsumer){}
}
app.controller.ts
import { Job } from 'bull';
import { Body, Controller, Get, HttpException, HttpStatus, Post, Res } from '@nestjs/common';
import { RequestService } from './app.service';
import { RequestScheme, ResponseScheme } from './app.schemes';
@Controller('request')
export class RequestController {
constructor(private readonly requestService: RequestService) {}
@Get()//this works good
about(): string{
return "Hello! This is the request"
}
@Post()
async getHello(@Body() data: RequestScheme): Promise<ResponseScheme> {
console.log("POST", "data", data) //client data, good as it's expected
let responseData: ResponseScheme
responseData = await this.requestService.sendData(data)
return responseData
}
}
基于手册[https://docs.nestjs.com/techniques/queues],此方法是nestjs的标准方法。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true },
);
但是我的变体正在无休止地运行 - 无论我等待什么数据(字符串或数字):
const jobInstance = await this.requestQueue.add(
'request', data, { delay: data.wait }
)
另外,我尝试通过隐藏“添加作业”方法从 RequestService.sendData 获取硬编码数据,并且它有效。但我需要添加一份工作。
哦哦好吧。您想通过此设置实现什么目的? :)
不要误会我的意思,但从我的角度来看,您似乎正在从控制器分派作业,由处理器处理,该处理器除了 console.logging 之外不执行任何操作,但控制器需要等到作业完成完成并返回一些数据。
此外,promise 中的代码:
if (jobInstance.id == job.id) {
responseData = result
job.remove()
resolve(result)
}
应该更改
responseData
,它超出了其范围,并且从那里无法访问,因为该承诺甚至不在 sendData
的上下文中执行,而是从另一个承诺函数中运行,这也永远不会定义,因为您的处理器不返回任何内容。另外,job.remove()
本身就是一个承诺。
你还有一个
if-else
子句,上面写着:
jobInstance.id
与 job.id
相等,则做某事,jobInstance.id
与 job.id
不相等,但没有错误)- 不执行任何操作。等待。等待。等待。稍等一下;)还有一个在
async
函数内部带有 async
的 Promise 函数,这可能会导致许多奇怪的事情发生。另外,当你做async function xyz(...) { this.requestQueue...}
时,this
是什么意思?
如果您的目的是保留控制器直到作业完成,然后在控制器方法内部改变一些数据,以将其显示给最终用户,那么这绝对不是正确的方法。但我无法确定这是否真的是你的意图,所以我不知道这是否是你感兴趣的方向:)