我是 RabbitMQ 的新手,我想这应该是一个简单的问题,但我已经坚持了很长一段时间。
我有这个 rabbitMQ docker 容器正在运行:
version: '3'
services:
rabbitmq:
image: rabbitmq:3-management-alpine
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
ports:
- 5672:5672
- 15672:15672
volumes:
- ./.rabbitmq/data/:/var/lib/rabbitmq
- ./.rabbitmq/log/:/var/log/rabbitmq
它运行良好。
我有一个 nodejs 脚本
put_message_into_queue.ts
,它连接到它并尝试将一些消息放入队列中:
import amqplib from "amqplib";
const RabbitMQ_URL =
process.env.MANAGER_RABBITMQ_URL || "amqp://localhost:5672";
const JOB_QUEUE_NAME = process.env.JOB_QUEUE_NAME || "job_queue";
console.log(`RabbitMQ_URL: ${RabbitMQ_URL}`);
console.log(`JOB_QUEUE_NAME: ${JOB_QUEUE_NAME}`);
const test = async () => {
try {
// Connect to the rabbitMQ server
const connection = await amqplib.connect(RabbitMQ_URL);
// Create a channel
const channel = await connection.createChannel();
// Assert the queue
await channel.assertQueue(JOB_QUEUE_NAME, { durable: true });
// await channel.assertQueue(REPORT_QUEUE_NAME, { durable: true });
// Send the result to the manager's result queue
const test_message = "Hello 123";
console.log(`test_message: ${test_message}`);
await channel.sendToQueue(REPORT_QUEUE_NAME, Buffer.from(test_message));
console.log(`test_message is sent to queue`);
} catch (error) {
console.log(error);
}
};
test();
现在,当我运行这个脚本时:
john@MacBook-Pro:~/myapp/test_scripts$ npx ts-node put_message_into_queue.ts
npx: installed 17 in 3.75s
RabbitMQ_URL: amqp://localhost:5672
JOB_QUEUE_NAME: job_queue
test_message: Hello 123
test_message is sent to queue
如您所见,代码运行良好,没有出现任何错误。
我还有一些工作代码在同一个队列中消费消息:
import amqplib from 'amqplib';
import { processTask } from 'services/TaskProcessor';
import { Task, TaskProgress } from 'interfaces/task';
import { stopTask } from 'utils/AWS/ECSController';
import { AutomaticShutDownCounter } from 'services/counter';
const RabbitMQ_URL = process.env.MANAGER_RABBITMQ_URL || 'amqp://localhost:5672';
const JOB_QUEUE_NAME = process.env.JOB_QUEUE_NAME || 'job_queue';
const worker_start = async () => {
try {
// Connect to the rabbitMQ server
const connection = await amqplib.connect(RabbitMQ_URL);
// Create a channel
const channel = await connection.createChannel();
// Assert the queue
await channel.assertQueue(JOB_QUEUE_NAME, { durable: true });
console.log(`RabbitMQ_URL: ${RabbitMQ_URL}`);
console.log(`JOB_QUEUE_NAME: ${JOB_QUEUE_NAME}`);
// Consume the queue
channel.consume(
JOB_QUEUE_NAME,
message => {
console.log(`damn!!!`);
// Parse the message
const parsedMessage = JSON.parse(message.content.toString());
// Log the message
console.log(`this is parsedMessage: ${parsedMessage}`);
// Acknowledge the message
channel.ack(message);
},
{ noAck: false },
);
} catch (error) {
console.log(error);
}
};
worker_start();
它也运行良好:
john@MacBook-Pro:~/myapp/worker$ nodemon --exec ts-node --files src/index.ts
[nodemon] 2.0.20
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: ts,json
[nodemon] starting `ts-node --files src/index.ts`
RabbitMQ_URL: amqp://localhost:5672
JOB_QUEUE_NAME: job_queue
但是测试脚本将消息
Hello 123
发送到队列后,worker代码根本没有响应,好像队列一直是空的;
我在浏览器的
http://127.0.0.1:15672/#/
处检查了控制台窗口,发现所有内容都是 0 - 没有活动连接,没有频道,根本没有队列。
这里到底出了什么问题?