我正在使用 MassTransit 8.0.10 和 Kafka 作为骑手,我想要完成的是处理错误条件下无法处理的消息,并对它们做一些事情(比如将它们记录到死信话题。
这是我到目前为止所做的:
Action<IRetryConfigurator> retryConfigurator = configurator =>
{
configurator.Incremental(
2,
TimeSpan.FromMilliseconds(5000),
TimeSpan.FromMilliseconds(5000));
};
internal class FaultConsumer<TMessage> : IConsumer<Fault<TMessage>>
where TMessage : class
massTransit.AddConsumer<FaultConsumer<TMessageType>>(); //massTransit is IRegistrationConfigurator
到目前为止,我还无法让我的过错消费者介入。
我错过了什么? 将 MassTransit 与 Kafka 结合使用时是否支持此功能?