我是 MassTransit 和 Azure 服务总线的新手。
当由于意外异常或不幸的电源故障而导致接收到的消息失败时,我希望消息进入死信队列(重试N次后)。
我还想在死信队列中监听新消息。目标是在出现错误时进行一些内务处理/清理/任何事情。
我无法让我的消费者实际消费死信消息。消息移动到死信队列没问题......我只是无法让公共交通工具接收它们。
这是我启动的相关部分(.net6,program.cs,在 public static void Main(..) 内):
// All aboard
builder.Services.AddMassTransit(config =>
{
// Does not really apply since we name the queue explicitly
config.SetKebabCaseEndpointNameFormatter();
// A consumer (fully configured later)
config.AddConsumer<NormalHandler>();
config.AddConsumer<DeadletterHandler>();
// One service bus please
config.UsingAzureServiceBus((context, cfg) =>
{
// Service bus connection string
cfg.Host(builder.Configuration.GetConnectionString("ServiceBus"));
// Our request endpoint
cfg.ReceiveEndpoint("work-request", cfg =>
{
cfg.ConfigureConsumer<NormalHandler>(context);
cfg.ConfigureDeadLetterQueueDeadLetterTransport();
cfg.ConfigureDeadLetterQueueErrorTransport();
});
// Our request endpoint for unexpected errors
cfg.ReceiveEndpoint("work-request/$DeadLetterQueue", cfg =>
{
cfg.ConfigureConsumer<DeadletterHandler>(context);
});
// Finish up the config
cfg.ConfigureEndpoints(context);
});
});
我的假(针对这个问题)处理程序和消息:
public record WorkRequest(Guid id);
public class NormalHandler : IConsumer<WorkRequest>
{
public Task Consume(ConsumeContext<WorkRequest> context)
{
throw new NotImplementedException();
}
}
public class DeadletterHandler : IConsumer<WorkRequest>
{
public Task Consume(ConsumeContext<WorkRequest> context)
{
throw new NotImplementedException();
}
}
任何帮助将不胜感激!