我正在开发一个使用 Masstransit 以及 Sagas 和 Kafka 的项目,并且偶然发现了一个问题。我正在尝试使用
EventActivityBinder.Produce
向某些 Kafka 主题生成消息。问题是定义的生产者需要指定一个消息密钥,而我无法找到使用此 EventActivityBinder.Produce
方法发送消息密钥的方法,因此它会引发一个异常,表明生产者未注册。
所以,我的问题是:是否可以使用上面提到的
produce
方法发送消息密钥?
这是我实现的方法:
public static EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> NotifySourceSystem(
this EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> binder)
{
var @event = binder.Produce(context =>
{
context.Saga.UpdatedAt = DateTime.Now;
var @event = new
{
Success = false,
context.Saga.Reason,
__Header_Reason = context.Saga.Reason,
};
return context.Init<ResponseWrapper<OrderResponseEvent>>(@event);
});
return @event;
}
这是我的制作人注册
rider.AddProducer<string, ResponseWrapper<OrderResponseEvent>>(kafkaTopics.SourceSystemTopic);
我最终使用了这个
BehaviorContext.GetServiceOrCreateInstance
方法,传递了我的制作人的注册签名。显然,它有效,但我绝对不相信这是最好的方法。我确实相信,理想情况下,EventActivityBinder.Produce
方法应该有一个选项,我们可以在必要时传递密钥!但无论如何,这里是代码。
public static EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> NotifySourceSystem(
this EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> binder)
{
Func<BehaviorContext<OrderRequestSagaInstance>, Task> asyncAction = async context =>
{
await context.GetServiceOrCreateInstance<ITopicProducer<string, ErrorMessageEvent>()
.Produce(
Guid.NewGuid().ToString(),
context.Saga.NotificationReply!,
context.CancellationToken)
.ConfigureAwait(false);
};
return binder.Add(new AsyncActivity<OrderRequestSagaInstance>(asyncAction));
}