我正在为使用 Masstransit 的消费者工作,对于特定情况,我希望能够在给定处理状态的情况下重新排队消息。 我在框架设置中插入了
AddDelayedMessageScheduler()
和 UseDelayedMessageScheduler()
并且我在消费者上使用了 await context.Defer(TimeSpan.FromSeconds(10))
方法。但是,我注意到调度程序仅适用于 Fanout 类型的交换。我没有在文档中找到有关此的详细信息。
是否真的有必要使用 Fanout 交换器,或者我没有注意到什么,我也可以使用直接类型的交换器吗?
我已经将 rabbitmq_delayed_message_exchange 插件添加到我的本地 rabbitmq 并配置 masstransit 如下
services.AddMassTransit(bus =>
{
bus.AddDelayedMessageScheduler();
bus.AddConsumer<SomeConsumer>();
bus.UsingRabbitMq((context, config) =>
{
config.UseDelayedMessageScheduler();
config.ConfigureEndpoints(context);
config.ReceiveEndpoint("my-queue", _ =>
{
_.ExchangeType = ExchangeType.Direct;
_.ConfigureConsumeTopology = false;
_.PrefetchCount = consumerOptions.NumberOfMessages;
_.UseRawJsonSerializer();
_.ConfigureConsumer<SomeConsumer>(context, _ =>
{
_.UseConcurrentMessageLimit(2);
});
});
config.ConfigureEndpoints(context);
});
});
我的消费者看起来像这样:
public sealed class SomeConsumer : IConsumer<SomeMessage>
{
public async Task Consume(ConsumeContext<SomeMessage> context)
{
var result = await DoSomethingAsync();
if (result.ShouldRedelivery())
{
await context.Redeliver(TimeSpan.FromSeconds(30));
}
}
}