我有两个应用程序通过RabbitMQ虚拟主机相互通信,使用Masstransit作为抽象层。
我正在尝试创建第三个应用程序,它将所有已发布的消息记录到此虚拟主机,而不知道这些消息类型是什么。
如果我事先知道这些消息,那将很容易;我只想创造一些消费者。不幸的是,情况并非如此。
我尝试了Observers,但是如果观察到的总线与发送消息的总线相同,它们似乎只能工作。所以这不适用于跨应用程序。
从rabbit's docs我发现使用兔子MQ绑定很容易,但是,这似乎不受支持:Publish message using exchange and routing key using MassTransit
我也试过:How to log all Rabbit MQ messages?,但它似乎也是一个死胡同,因为我想将自定义格式的数据记录到数据库中。
似乎我错过了一件小事,但在搜索了一整天之后,我还没有找到理想的结果。你能让我走上正轨吗?
对于总线配置,在所有3个应用程序中,我使用的是直接配置模式:
_bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
如果您知道使用服务的队列名称,则可以使用新队列创建新服务,该新队列将明确指定与其他服务队列的交换名称的绑定。这会将传递给该服务的每条消息的副本传递给新服务。
然后,该服务可以使用JToken
,它将消息的JSON主体传递给消费者。通过这样做,然后可以根据您的喜好翻译/存储每条消息。
class LogConsumer : IConsumer<JToken> {...}
然后,在接收端点中创建绑定:
cfg.ReceiveEndpoint(host, "log-queue", ep =>
{
ep.Bind("service1-queue");
ep.Bind("service2-queue");
ep.Consumer<LogConsumer>();
}
这应该足以让你入门!