RoutingSlipActivityCompleted不触发。

问题描述 投票:0回答:1

我添加了一个消费者来观察路由滑移事件,但没有达到预期的效果。RoutingSlipCompleted 消费者总是被触发。RoutingSlipActivityCompletedRoutingSlipActivityFaulted 消费者永远不会被触发。这是我的消费者代码。

public abstract class RoutingSlipExecuteActivityResponseProxy<TRequest, TResponse, TFaultResponse> :
    IConsumer<RoutingSlipActivityCompleted>,
    IConsumer<RoutingSlipActivityFaulted>,
    IConsumer<RoutingSlipCompleted>
    where TRequest : class
    where TResponse : class
    where TFaultResponse : class
{
    public abstract string ActivityName { get; }
    public async Task Consume(ConsumeContext<RoutingSlipActivityCompleted> context)
    {
        if(context.Message.ActivityName!= ActivityName)
        {
            return;
        }
        var request = context.Message.GetVariable<TRequest>("Request");
        var requestId = context.Message.GetVariable<Guid>("RequestId");

        Uri responseAddress = null;
        if (context.Message.Variables.ContainsKey("ResponseAddress"))
            responseAddress = context.Message.GetVariable<Uri>("ResponseAddress");

        if (responseAddress == null)
            throw new ArgumentException($"The response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}");

        var endpoint = await context.GetResponseEndpoint<TResponse>(responseAddress, requestId).ConfigureAwait(false);

        var response = await CreateResponseMessage(context, request);

        await endpoint.Send(response).ConfigureAwait(false);
    }

    public async Task Consume(ConsumeContext<RoutingSlipActivityFaulted> context)
    {
        if (context.Message.ActivityName != ActivityName)
        {
            return;
        }
        var request = context.Message.GetVariable<TRequest>("Request");
        var requestId = context.Message.GetVariable<Guid>("RequestId");

        Uri faultAddress = null;
        if (context.Message.Variables.ContainsKey("FaultAddress"))
            faultAddress = context.Message.GetVariable<Uri>("FaultAddress");
        if (faultAddress == null && context.Message.Variables.ContainsKey("ResponseAddress"))
            faultAddress = context.Message.GetVariable<Uri>("ResponseAddress");

        if (faultAddress == null)
            throw new ArgumentException($"The fault/response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}");

        var endpoint = await context.GetFaultEndpoint<TResponse>(faultAddress, requestId).ConfigureAwait(false);

        var response = await CreateFaultedResponseMessage(context, request, requestId);

        await endpoint.Send(response).ConfigureAwait(false);
    }
    protected abstract Task<TResponse> CreateResponseMessage(ConsumeContext<RoutingSlipActivityCompleted> context, TRequest request);

    protected abstract Task<TFaultResponse> CreateFaultedResponseMessage(ConsumeContext<RoutingSlipActivityFaulted> context, TRequest request, Guid requestId);

    public Task Consume(ConsumeContext<RoutingSlipCompleted> context)
    {
        throw new NotImplementedException();
    }
}

我的活动没有额外的配置,基本上是按照文档写的。

masstransit
1个回答
0
投票

您可能需要 看看这个样本,它使用的是 RequestResponseProxy 来处理一个通过路由滑移的请求,然后根据RoutingSlipCompletedRoutingSlipFaulted事件生成响应。

© www.soinside.com 2019 - 2024. All rights reserved.