如何使用MassTransit从RabbitMQ DeadLetter队列中检索消息?

问题描述 投票:0回答:1

我试图在一段时间后使用MassTransit和RabbitMQ收到消息时通知用户。

根据我的阅读,在发布消息时使用TimeToLive属性设置超时。当指定的时间用完时,消息应自动添加到死信队列中,最后以“_skipped”命名。

如何从Dead Letter队列中检索消息?在下面的尝试中,消息会立即添加到两个队列中,并且永远不会超时。

我想我可以使用传说来做到这一点,但对于这样一个简单的问题,它似乎是一个过于复杂的解决方案,所以我想尽可能避免使用它。

static void Main(string[] args)
{
    var bus = CreateBus("rabbitmq://localhost/", "guest", "guest", true);

    var msg = new TestMessage("First Message");
    LogMessageSent(msg);
    bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));

    Console.ReadKey();

    bus.Stop();

    bus = CreateBus("rabbitmq://localhost/", "guest", "guest", false);

    msg = new TestMessage("SecondMessage");
    LogMessageSent(msg);
    bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));

    Console.ReadKey();

    bus.Stop();
}

private static IBusControl CreateBus(string rabbitUrl, string username, string password, bool enableEndpoint)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(c =>
    {
        var host = c.Host(new Uri(rabbitUrl), h =>
        {
            h.Username(username);
            h.Password(password);
        });

        if (enableEndpoint)
        {
            c.ReceiveEndpoint(host, "TestQueue", x =>
            {
                x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
            });
        }

        c.ReceiveEndpoint(host, "TestQueue_skipped", x =>
        {
            x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_skipped"));
        });
    });

    bus.Start();

    return bus;
}

private static void LogMessageSent(TestMessage msg)
{
    Console.WriteLine(string.Format("{0} - Message \"{1}\" sent.", DateTime.Now.ToString("HH:mm:ss"), msg.Content));
}

private static Task LogMessageReceived(TestMessage msg, string queueName)
{
    Console.WriteLine(string.Format("{0} - Message \"{1}\" received on queue \"{2}\".", DateTime.Now.ToString("HH:mm:ss"), msg.Content, queueName));
    return Task.CompletedTask;
}

public class TestMessage
{
    public string Content { get; }

    public TestMessage(string content)
    {
        Content = content;
    }
}
c# rabbitmq masstransit
1个回答
1
投票

因为您正在调用Publish,所以该消息将发送给每个订阅者。由于每个接收端点都在添加使用者,因此会为该消息类型创建订阅(以及RabbitMQ中的后续交换绑定)。您可以通过在跳过的接收端点上指定BindMessageExchanges = false来禁用此功能。您需要手动删除代理上的交换绑定。

至于你的TimeToLive问题,那不是它的工作原理。 TimeToLive传递给代理,如果消息过期,则将其移动到代理指定的死信队列(如果已配置)。它不会移动到跳过的队列,该队列在MassTransit中具有不同的含义。在MassTransit中,跳过的队列用于传递到接收端点的消息,但在该端点上没有配置使用者来使用该消息。

对于RabbitMQ,您可以使用以下命令在MassTransit中配置死信队列:

endpoint.BindDeadLetterQueue("dead-letter-queue-name");

这将配置代理,以便将到达其TTL的消息移动到指定的交换/队列。然后,您在该接收端点上的消费者将能够使用它们(再次,确保在死信接收端点上设置BindMessageExchanges = false

例如:

c.ReceiveEndpoint(host, "TestQueue_expired", x =>
{
    x.BindMessageExchanges = false;
    x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_expired"));
});

然后你的原始接收端点:

c.ReceiveEndpoint(host, "TestQueue", x =>
{
    x.BindDeadLetterQueue("TestQueue_expired");
    x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
});
© www.soinside.com 2019 - 2024. All rights reserved.