在队列之间移动消息rabbitMQ

问题描述 投票:0回答:5

我希望能够在 Rabbit 中的队列之间(手动)移动消息。

例如:

first-queue has messages ['a','b','c','d','e','f']
second-queue has messages ['x','y']

我希望能够将例如消息“a”从第一队列移动到第二队列。这可以是手动操作。两个队列都在同一个代理上,我不想通过任何交换机发送它们。有办法做到这一点吗?我一直在玩rabbitmqctl,但似乎无法让它工作。我对任何其他可以帮助我实现这一目标的工具持开放态度。最终我希望有某种消息选择器(例如将带有某些标头字段 = X 的所有消息从第一队列移动到第二队列)。

我对rabbitmq和amqp还是新手,但无法找到有关如何执行此操作的文档(如果可能的话)。

谢谢。

rabbitmq message messaging amqp
5个回答
16
投票

@Dax - 我刚刚在这里回答了同样的问题:是否可以在 RabbitMQ 队列之间移动/合并消息?

我在那里有很长的描述。为了避免重复的内容,我不想复制/粘贴。

听起来您正在寻找的是rabbitmq shovel插件。

它内置于核心中,只需启用它即可:

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

从 GUI 的管理部分,您将找到一个用于创建铲子的简单界面。

请参阅我的其他帖子!


8
投票

它没有被记录的事实是因为它离消息传递模型很远。

将消息发送到特定队列很容易 - 例如,请参阅教程#1 - 但读取消息的唯一方法是按照代理发送到客户端的顺序“使用”它们。 不允许像使用 SQL 那样从队列中

select

消息。 您可以做的是让客户端(或者最终是一个插件,但这是一个高级主题)

使用

队列中的消息,并根据某些规则将它们重新发布到后续队列或另一个。


2
投票

public void moveMessages( final String sourceQueueName, final String targetQueueName, final String rabbitmqHost, final String rabbitmqUsername, final String rabbitmqPassword, final String rabbitmqVirtualHost ) throws IOException { // Initialize the consuming and publishing channel ConnectionFactory factory = new ConnectionFactory(); factory.setHost(rabbitmqHost); factory.setUsername(rabbitmqUsername); factory.setPassword(rabbitmqPassword); factory.setVirtualHost(rabbitmqVirtualHost); Connection connection = factory.newConnection(); Channel consumingChannel = connection.createChannel(); Channel publishingChannel = connection.createChannel(); while (true) { // Get the first message in the queue (auto ack = false) GetResponse response = consumingChannel.basicGet(sourceQueueName, false); if (response == null) { return; } BasicProperties properties = response.getProps(); // Publish the message to the origin queue publishingChannel.txSelect(); publishingChannel.basicPublish("", targetQueueName, (AMQP.BasicProperties) properties, response.getBody()); publishingChannel.txCommit(); // Acknowledge the message in the dead letter queue consumingChannel.basicAck(response.getEnvelope().getDeliveryTag(), false); } }



2
投票

curl -u "user:password" -vvv 'http://localhost:15672/api/parameters/shovel/%2Foms/Move%20from%20sourceQueue' -X PUT -H 'content-type: application/json' --data-binary ' { "component": "shovel", "vhost": "/vhost", "name": "Move from sourceQueue", "value": { "src-uri": "amqp:///%2Fvhost", "src-queue": "sourceQueue", "src-protocol": "amqp091", "src-prefetch-count": 1000, "src-delete-after": "queue-length", "dest-protocol": "amqp091", "dest-uri": "amqp:///%2Fvhost", "dest-add-forward-headers": false, "ack-mode": "on-confirm", "dest-queue": "destQueue" } } ' --compressed



0
投票

激活
    rabbitmq_shovel
  • 插件
    定义一个shovel,将所有消息从
  • QUEUE_TO_MIGRATE
  • (vhost:
    OLD_VHOST
    )迁移到
    QUEUE_DEST
    (vhost:
    NEW_VHOST
    
    
  • rabbitmqctl set_parameter shovel migrate-QUEUE_TO_MIGRATE \ '{"src-protocol": "amqp091", "src-uri": "amqp:///OLD_VHOST", "src-queue": "QUEUE_TO_MIGRATE", "dest-protocol": "amqp091", "dest-uri": "amqp:///NEW_VHOST", "dest-queue": "QUEUE_TO_MIGRATE"}'
确保
    QUEUE_TO_MIGRATE
  • 上不再有消息(虚拟主机:
    OLD_VHOST
    取下铲子
  • rabbitmqctl clear_parameter shovel migrate-QUEUE_TO_MIGRATE
来源:
RabbitMQ 官方博文

© www.soinside.com 2019 - 2024. All rights reserved.