我希望能够在 Rabbit 中的队列之间(手动)移动消息。
例如:
first-queue has messages ['a','b','c','d','e','f']
second-queue has messages ['x','y']
我希望能够将例如消息“a”从第一队列移动到第二队列。这可以是手动操作。两个队列都在同一个代理上,我不想通过任何交换机发送它们。有办法做到这一点吗?我一直在玩rabbitmqctl,但似乎无法让它工作。我对任何其他可以帮助我实现这一目标的工具持开放态度。最终我希望有某种消息选择器(例如将带有某些标头字段 = X 的所有消息从第一队列移动到第二队列)。
我对rabbitmq和amqp还是新手,但无法找到有关如何执行此操作的文档(如果可能的话)。
谢谢。
@Dax - 我刚刚在这里回答了同样的问题:是否可以在 RabbitMQ 队列之间移动/合并消息?
我在那里有很长的描述。为了避免重复的内容,我不想复制/粘贴。
听起来您正在寻找的是rabbitmq shovel插件。
它内置于核心中,只需启用它即可:
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
从 GUI 的管理部分,您将找到一个用于创建铲子的简单界面。
请参阅我的其他帖子!
它没有被记录的事实是因为它离消息传递模型很远。
将消息发送到特定队列很容易 - 例如,请参阅教程#1 - 但读取消息的唯一方法是按照代理发送到客户端的顺序“使用”它们。 不允许像使用 SQL 那样从队列中
select消息。 您可以做的是让客户端(或者最终是一个插件,但这是一个高级主题)
使用队列中的消息,并根据某些规则将它们重新发布到后续队列或另一个。
public void moveMessages(
final String sourceQueueName,
final String targetQueueName,
final String rabbitmqHost,
final String rabbitmqUsername,
final String rabbitmqPassword,
final String rabbitmqVirtualHost
) throws IOException {
// Initialize the consuming and publishing channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(rabbitmqHost);
factory.setUsername(rabbitmqUsername);
factory.setPassword(rabbitmqPassword);
factory.setVirtualHost(rabbitmqVirtualHost);
Connection connection = factory.newConnection();
Channel consumingChannel = connection.createChannel();
Channel publishingChannel = connection.createChannel();
while (true) {
// Get the first message in the queue (auto ack = false)
GetResponse response = consumingChannel.basicGet(sourceQueueName, false);
if (response == null) {
return;
}
BasicProperties properties = response.getProps();
// Publish the message to the origin queue
publishingChannel.txSelect();
publishingChannel.basicPublish("", targetQueueName, (AMQP.BasicProperties) properties, response.getBody());
publishingChannel.txCommit();
// Acknowledge the message in the dead letter queue
consumingChannel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
}
curl
-u "user:password"
-vvv 'http://localhost:15672/api/parameters/shovel/%2Foms/Move%20from%20sourceQueue'
-X PUT
-H 'content-type: application/json'
--data-binary '
{
"component": "shovel",
"vhost": "/vhost",
"name": "Move from sourceQueue",
"value": {
"src-uri": "amqp:///%2Fvhost",
"src-queue": "sourceQueue",
"src-protocol": "amqp091",
"src-prefetch-count": 1000,
"src-delete-after": "queue-length",
"dest-protocol": "amqp091",
"dest-uri": "amqp:///%2Fvhost",
"dest-add-forward-headers": false,
"ack-mode": "on-confirm",
"dest-queue": "destQueue"
}
}
' --compressed
激活
rabbitmq_shovel
QUEUE_TO_MIGRATE
OLD_VHOST
)迁移到QUEUE_DEST
(vhost:NEW_VHOST
)
rabbitmqctl set_parameter shovel migrate-QUEUE_TO_MIGRATE \
'{"src-protocol": "amqp091", "src-uri": "amqp:///OLD_VHOST", "src-queue": "QUEUE_TO_MIGRATE",
"dest-protocol": "amqp091", "dest-uri": "amqp:///NEW_VHOST", "dest-queue": "QUEUE_TO_MIGRATE"}'
QUEUE_TO_MIGRATE
OLD_VHOST
)取下铲子rabbitmqctl clear_parameter shovel migrate-QUEUE_TO_MIGRATE
来源:RabbitMQ 官方博文