大众交通发布过滤器替换消息

问题描述 投票:0回答:1

MassTransit 是否允许构建发布过滤器,以便在发送之前可以用另一条消息替换原始消息?

我们将 MassTransit 与 Azure 服务总线结合使用。服务总线中的最大消息大小有限制。 我们发现某些消息超出了最大消息大小限制。 为了解决这个问题,我想实现一个发布过滤器。该过滤器将:

  • 将消息放入存储中。
  • 用相同类型的“空”实例替换原始消息。
  • 发送替换的消息而不是原始消息。

然后消费方在收到消息后,会尝试通过消息id从存储中检索消息正文。

我尝试使用

PublishContextProxy
,它允许传递原始上下文和消息,但这没有帮助,因为来自原始上下文的原始消息被发布,而不是我在构造函数中传递的“虚拟”消息,例如

public class MyEventEventPublishFilter<T> : IFilter<PublishContext<T>>
where T : class 
{
    private readonly IStorageService _storage;

    public CampaignTraffickingEventPublishFilter(IStorageService storage)
    {
        _storage = storage;
    }

    public async Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next)
    {
        if (context is PublishContext<MyBigEvent> { Message: var msg })
        {
            await _storageService.SetAsync(context.MessageId, evt);

            var newContext = new PublishContextProxy<MyBigEvent>(
                context,
                new MyBigEvent(evt.Id, null, null, 0, null, null, null, null)); // this attempts to replace the message
            await next.Send((newContext as PublishContextProxy<T>)!);
            evt.Fields = null;

            return;
        }

        await next.Send(context);
    }

}

这会导致原始的“非空”消息被发布到主题......

azure azureservicebus masstransit
1个回答
0
投票

基于 Chris Patterson 的评论并查看 TransformFilter 后, 以下实施有效:

var newContext = context.CreateProxy(new MyBigEvent(evt.Id, null, null, 0, null, null, null, null)); 

这会产生一个新的上下文,其中包含预期的替换消息。

© www.soinside.com 2019 - 2024. All rights reserved.