MassTransit 是否允许构建发布过滤器,以便在发送之前可以用另一条消息替换原始消息?
我们将 MassTransit 与 Azure 服务总线结合使用。服务总线中的最大消息大小有限制。 我们发现某些消息超出了最大消息大小限制。 为了解决这个问题,我想实现一个发布过滤器。该过滤器将:
然后消费方在收到消息后,会尝试通过消息id从存储中检索消息正文。
我尝试使用
PublishContextProxy
,它允许传递原始上下文和消息,但这没有帮助,因为来自原始上下文的原始消息被发布,而不是我在构造函数中传递的“虚拟”消息,例如
public class MyEventEventPublishFilter<T> : IFilter<PublishContext<T>>
where T : class
{
private readonly IStorageService _storage;
public CampaignTraffickingEventPublishFilter(IStorageService storage)
{
_storage = storage;
}
public async Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next)
{
if (context is PublishContext<MyBigEvent> { Message: var msg })
{
await _storageService.SetAsync(context.MessageId, evt);
var newContext = new PublishContextProxy<MyBigEvent>(
context,
new MyBigEvent(evt.Id, null, null, 0, null, null, null, null)); // this attempts to replace the message
await next.Send((newContext as PublishContextProxy<T>)!);
evt.Fields = null;
return;
}
await next.Send(context);
}
}
这会导致原始的“非空”消息被发布到主题......
基于 Chris Patterson 的评论并查看 TransformFilter 后, 以下实施有效:
var newContext = context.CreateProxy(new MyBigEvent(evt.Id, null, null, 0, null, null, null, null));
这会产生一个新的上下文,其中包含预期的替换消息。