Nest.js RabbitMQ 未发送/接收消息

问题描述 投票:0回答:2

我有两个带有 Nest.js 的微服务,它们都连接到 RabbitMQ 服务(一个是发布者,一个是接收者) 我试图从发布者那里向接收者发送消息,但似乎它根本没有做任何事情

出版商:

auth.module.ts :
 imports:[ClientsModule.registerAsync([
      {
        name: 'RMQ_SERVICE',
        imports: [ConfigModule],
        useFactory: (configService: ConfigService) => ({
          transport: Transport.RMQ,
          options: {
            urls: [`amqp://${configService.get('RMQ_HOST')}:5672`],
            queue: 'api-queue',
            queueOptions: {
              durable: false,
            },
          },
        }),
        inject: [ConfigService],
      },
    ]),
]

auth.service.ts : 
@Inject('RMQ_SERVICE') private readonly client: ClientProxy,

and using it like that : 
 this.client.send({ cmd: 'create-user-data' },{});

接收者:

main.ts : 
app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.RMQ,
    options: {
      noAck: false,
      urls: [`amqp://${process.env.RMQ_HOST}:5672`],
      queue: 'api-queue',
      queueOptions: {
        durable: false,
      },
    },
  });
  await app.startAllMicroservices();

users-data.controler.ts : 
  @MessagePattern({ cmd: 'create-user-data'})
  async createUserData() {
    console.log('create-user-data');
  }

看不到任何错误,我也有rabbitmq网络监视器,看不到任何消息

知道出了什么问题吗?

如果使用了emit和EventPattern它的工作原理我不明白为什么?

rabbitmq nestjs
2个回答
5
投票

好吧,经过长时间的挖掘,我只需要做

const result = await this.client.send(
        { cmd: 'create-user-data' },
        {
          userId: user.id,
          provider,
          ...socialUser,
        },
      );
      await result.subscribe();

我在接收器上收到了消息


0
投票

我将与您分享我的

rabbitmq.service.ts
,您可以使用它向带有
event name
和您的
payload
的队列发送消息,在我的服务中我正在使用
lastValueFrom
observer
转换为
promise
也是。

import { Inject, Injectable, Logger } from '@nestjs/common';
import { lastValueFrom } from 'rxjs';
import { ClientProxy } from '@nestjs/microservices';

@Injectable()
export class RabbitMQService {
  private readonly logger = new Logger(RabbitMQService.name);
  constructor(
    @Inject("queue-name")
    private readonly recurringClient: ClientProxy,
  ) {}
  async sendMessage(eventName: any, payload: object) {
    try {
      await lastValueFrom(this.recurringClient.emit(eventName, payload));
    } catch (error) {
      this.logger.error(
        `Problem with RMQ in ${eventName} error: ${JSON.stringify(error)}`,
      );
    }
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.