“添加工作”方法永远不会被等待

问题描述 投票:0回答:1

返回作业实例的方法 this.aQueue.add(*args) 被无休止地等待。接收和反序列化数据后,我尝试在 app.service.ts 模块的 RequestService.sendData 方法中获取作业实例,但无法做到这一点,并且其余代码未运行

app.service.ts

import { Injectable } from '@nestjs/common';
import { Job, Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
import { plainToClass } from 'class-transformer';
import { RequestScheme, ResponseScheme } from './app.schemes';
 
 
@Injectable()
export class RequestService {
  constructor(
    @InjectQueue('request') private requestQueue: Queue
  ){}
 
   async sendData(data: RequestScheme): Promise<ResponseScheme> {
    let responseData: ResponseScheme
    data = plainToClass(RequestScheme, data)
    console.log("data in controller", data) // data is deserialized as i expect
 
    const jobInstance = await this.requestQueue.add(
      'request', data, { delay: data.wait }
    ) // this method is running and never awaited
 
    console.log(`Job: ${jobInstance}`)
 
    async function setJobData(jobInstance: Job){
      return new Promise((resolve, reject) => {
        this.requestQueue.on('completed', function(job: Job, result: ResponseScheme, error){
        if (jobInstance.id == job.id) {
          responseData = result
          job.remove()
          resolve(result)
        }
        if (error) reject(error)
      })
    })}
    
    await setJobData(jobInstance)
    return responseData
   }
  }

app.processor.ts

import { Job } from 'bull';
import { 
  Processor, 
  Process, 
  OnQueueActive,
  OnQueueCompleted
} from '@nestjs/bull';
import { ResponseScheme } from './app.schemes';
 
@Processor('request')
export class RequestConsumer {
 
  @Process('request')
  async process_request(job: Job){
    console.log(`Job ${job.id} proceed`)
  }
 
  @OnQueueActive()
  onActive(job: Job){
    console.log(`Data ${job.data} were sended`)
  }
 
  @OnQueueCompleted()
  onComplete(job: Job){
    const response = new ResponseScheme()
    response.answer = job.data.answer
    return response
  }
}

app.module.ts

import { Global, Module, NestModule } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { RequestController } from './app.controller';
import { RequestService } from './app.service';
import { RequestConsumer } from './app.processor';
 
@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
        maxRetriesPerRequest: null
      }
    }),
    BullModule.registerQueue({
      name: 'request'
    })
  ],
  controllers: [RequestController],
  providers: [
    RequestService
  ],
  exports: [
    RequestService
  ]
})
export class AppModule {
  configure(consumer: RequestConsumer){}
}

app.controller.ts

import { Job } from 'bull';
import { Body, Controller, Get, HttpException, HttpStatus, Post, Res } from '@nestjs/common';
import { RequestService } from './app.service';
import { RequestScheme, ResponseScheme } from './app.schemes';
 
@Controller('request')
export class RequestController {
  constructor(private readonly requestService: RequestService) {}
 
  @Get()//this works good
  about(): string{
    return "Hello! This is the request"
  }
 
  @Post()
  async getHello(@Body() data: RequestScheme): Promise<ResponseScheme> {
    console.log("POST", "data", data) //client data, good as it's expected
    let responseData: ResponseScheme
    responseData = await this.requestService.sendData(data)
    return responseData
  }
}

基于手册[https://docs.nestjs.com/techniques/queues],此方法是nestjs的标准方法。

const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { lifo: true },
);

但是我的变体正在无休止地运行 - 无论我等待什么数据(字符串或数字):

const jobInstance = await this.requestQueue.add(
      'request', data, { delay: data.wait }
    )

另外,我尝试通过隐藏“添加作业”方法从 RequestService.sendData 获取硬编码数据,并且它有效。但我需要添加一份工作。

nestjs
1个回答
0
投票

哦哦好吧。您想通过此设置实现什么目的? :)

不要误会我的意思,但从我的角度来看,您似乎正在从控制器分派作业,由处理器处理,该处理器除了 console.logging 之外不执行任何操作,但控制器需要等到作业完成完成并返回一些数据。

此外,promise 中的代码:

        if (jobInstance.id == job.id) {
          responseData = result
          job.remove()
          resolve(result)
        }

应该更改

responseData
,它超出了其范围,并且从那里无法访问,因为该承诺甚至不在
sendData
的上下文中执行,而是从另一个承诺函数中运行,这也永远不会定义,因为您的处理器不返回任何内容。另外,
job.remove()
本身就是一个承诺。

你还有一个

if-else
子句,上面写着:

  1. 如果
    jobInstance.id
    job.id
    相等,则做某事,
  2. 在其他情况下,如果我们有错误,则拒绝,
  3. 在所有其他情况下(
    jobInstance.id
    job.id
    不相等,但没有错误)- 不执行任何操作。等待。等待。等待。稍等一下;)

还有一个在

async
函数内部带有
async
的 Promise 函数,这可能会导致许多奇怪的事情发生。另外,当你做
async function xyz(...) { this.requestQueue...}
时,
this
是什么意思?


如果您的目的是保留控制器直到作业完成,然后在控制器方法内部改变一些数据,以将其显示给最终用户,那么这绝对不是正确的方法。但我无法确定这是否真的是你的意图,所以我不知道这是否是你感兴趣的方向:)

© www.soinside.com 2019 - 2024. All rights reserved.