重新传递公共交通信息

问题描述 投票:0回答:0

我正在为使用 Masstransit 的消费者工作,对于特定情况,我希望能够在给定处理状态的情况下重新排队消息。 我在框架设置中插入了

AddDelayedMessageScheduler()
UseDelayedMessageScheduler()
并且我在消费者上使用了
await context.Defer(TimeSpan.FromSeconds(10))
方法。但是,我注意到调度程序仅适用于 Fanout 类型的交换。我没有在文档中找到有关此的详细信息。 是否真的有必要使用 Fanout 交换器,或者我没有注意到什么,我也可以使用直接类型的交换器吗?

我已经将 rabbitmq_delayed_message_exchange 插件添加到我的本地 rabbitmq 并配置 masstransit 如下

services.AddMassTransit(bus =>
    {
        bus.AddDelayedMessageScheduler();

        bus.AddConsumer<SomeConsumer>();

        bus.UsingRabbitMq((context, config) =>
        {
            config.UseDelayedMessageScheduler();
            config.ConfigureEndpoints(context);

            config.ReceiveEndpoint("my-queue", _ =>
            {
                _.ExchangeType = ExchangeType.Direct;
                _.ConfigureConsumeTopology = false;
                _.PrefetchCount = consumerOptions.NumberOfMessages;
                _.UseRawJsonSerializer();

                _.ConfigureConsumer<SomeConsumer>(context, _ =>
                {
                    _.UseConcurrentMessageLimit(2);
                });
            });
            config.ConfigureEndpoints(context);
        });
    });

我的消费者看起来像这样:

public sealed class SomeConsumer : IConsumer<SomeMessage>
{
    public async Task Consume(ConsumeContext<SomeMessage> context)
    {
        var result = await DoSomethingAsync();
        if (result.ShouldRedelivery())
        {
           await context.Redeliver(TimeSpan.FromSeconds(30));
        }
    }
}
.net-core rabbitmq masstransit schedule
© www.soinside.com 2019 - 2024. All rights reserved.