在EventActivityBinder.Produce方法中发送消息密钥

问题描述 投票:0回答:1

我正在开发一个使用 Masstransit 以及 Sagas 和 Kafka 的项目,并且偶然发现了一个问题。我正在尝试使用

EventActivityBinder.Produce
向某些 Kafka 主题生成消息。问题是定义的生产者需要指定一个消息密钥,而我无法找到使用此
EventActivityBinder.Produce
方法发送消息密钥的方法,因此它会引发一个异常,表明生产者未注册。

所以,我的问题是:是否可以使用上面提到的

produce
方法发送消息密钥?

这是我实现的方法:

public static EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> NotifySourceSystem(
    this EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> binder)
{
    var @event = binder.Produce(context =>
    {
        context.Saga.UpdatedAt = DateTime.Now;

        var @event = new
        {
            Success = false,
            context.Saga.Reason,
            __Header_Reason = context.Saga.Reason,
        };

        return context.Init<ResponseWrapper<OrderResponseEvent>>(@event);
    });

    return @event;
}

这是我的制作人注册

rider.AddProducer<string, ResponseWrapper<OrderResponseEvent>>(kafkaTopics.SourceSystemTopic);
c# .net apache-kafka masstransit
1个回答
0
投票

我最终使用了这个

BehaviorContext.GetServiceOrCreateInstance
方法,传递了我的制作人的注册签名。显然,它有效,但我绝对不相信这是最好的方法。我确实相信,理想情况下,
EventActivityBinder.Produce
方法应该有一个选项,我们可以在必要时传递密钥!但无论如何,这里是代码。

public static EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> NotifySourceSystem(
    this EventActivityBinder<OrderRequestSagaInstance, ErrorMessageEvent> binder)
{
    Func<BehaviorContext<OrderRequestSagaInstance>, Task> asyncAction = async context =>
    {        
        await context.GetServiceOrCreateInstance<ITopicProducer<string, ErrorMessageEvent>()
            .Produce(
                Guid.NewGuid().ToString(),
                context.Saga.NotificationReply!,
                context.CancellationToken)
            .ConfigureAwait(false);
    };
    
    return binder.Add(new AsyncActivity<OrderRequestSagaInstance>(asyncAction));
}
© www.soinside.com 2019 - 2024. All rights reserved.