错误:使用rabbitmq时读取ECONNRESET

问题描述 投票:0回答:1

我正在 MERN 堆栈中构建一个微服务应用程序,并在运行该服务时使用 RabbitMQ 作为消息代理,一段时间后我收到错误 错误:读取 ECONNRESET 在 TLSWrap.onStreamRead (节点:内部/stream_base_commons:217:20){ 错误号:-4077, 代码:'经济重置', 系统调用:“读” } 这是我的代码

import * as amqp from 'amqplib';
import dotenv from 'dotenv'
dotenv.config();


const rabbitURL: any = process.env.RABBIT_MQ
class RabbitMQ {
  private static connection: amqp.Connection | null = null;

  static async getConnection(): Promise<amqp.Connection> {
    try {
      if (!RabbitMQ.connection) {
        RabbitMQ.connection = await amqp.connect(rabbitURL);
      }
      return RabbitMQ.connection as amqp.Connection
    } catch (error) {
      console.error(error);
      throw error;
    }
  }

  static async createChannel(): Promise<amqp.Channel> {
    try {
      const connection = await RabbitMQ.getConnection();
      return connection.createChannel();
    } catch (error) {
      console.error(error);
      throw error;
    }
  }
}

export default RabbitMQ;

这是消费者文件

async gigCreatedConsumer(){
    try{
        console.log("starting rabbit mq channel ");
        const channel = await RabbitMQ.createChannel();
        const exchangeName = 'gig-exchange';
        const queueName = 'gig-service-queue';
        await channel.assertExchange(exchangeName, 'direct', {durable: false});
        const {queue} = await channel.assertQueue(queueName, {durable: false});
        const routingKey = 'gig-created';
        await channel.bindQueue(queue ,exchangeName, routingKey);
        return new Promise((resolve ,reject)=>{
            channel.consume(queue, (message)=>{
                if(message){
                    try {
                        const createdGig: any = JSON.parse(message.content.toString());
                        channel.ack(message);
                        resolve(createdGig)
                    } catch (error) {
                        console.error("error processing gig creation");
                        channel.ack(message);
                        reject(error)
                    }
                }
            })
        })
        await channel.close()
        
    }catch(err){
        console.error("error setting up consumer", err)
    }
},

我已经在控制器中调用了创建的函数,并在index.ts文件中调用了该函数

索引.ts

const app = express();
app.use(cors())


dotenv.config()
app.use(express.urlencoded({ extended: true }))
app.use(express.json());
const PORT = process.env.PORT || 8001;

userController.setup()
userController.gigAccept()
userController.gigReject()
userController.gigDeleteEvent()
app.use(router)
app.use(userRouter)

控制器.ts

  async gigDeleteEvent() {
    try {
      const data: any = await userGigConsumers.gigDeleteConsumer();
      const gigId = data;

      const objId = await GigUserModel.find({ refId: gigId })
      const gig = await GigUserModel.findByIdAndDelete(objId[0]._id);
      console.log("gig deleted from usergig database");
    } catch (error) {
      console.log(error);

    }
  },
node.js rabbitmq microservices express-gateway
1个回答
0
投票

我猜你必须启用心跳 你写“我在一段时间后收到错误”-->持续多长时间?

© www.soinside.com 2019 - 2024. All rights reserved.