当我尝试在项目中将MassTransit软件包从版本[[3.2.4升级到版本[[6.2.3]时,一些现有代码停止工作。我使用ConnectHandler扩展名将消息处理程序附加到总线已经创建并配置了总线之后。它以前工作正常,但似乎不再工作了。向此扩展注册的处理程序不会触发。在两种MassTransit版本之间,在总线配置期间向ReceiveEndpoint注册处理程序的另一种方法似乎仍然可以正常工作。这是一些简化的代码:
[TestFixture]
public class When_MassTransit_handler_should_consume_published_message
{
[Test]
public void This_works_fine_with_MassTransit_3_2_4_but_never_enters_handler_with_MassTransit_6_2_3()
{
var someEvent = new SomeEvent { Data = 123 };
var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();
var bus = Bus.Factory.CreateUsingInMemory(config => { });
bus.Start();
bus.ConnectHandler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));
bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();
if (!tcs.Task.Wait(5000))
Assert.Fail("Event consuming takes too long (> 5000 ms)"); // fails here for 6.2.3 because the handler never fires
Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));
bus.Stop();
}
[Test]
public void This_works_fine_with_both_MassTransit_versions()
{
var someEvent = new SomeEvent { Data = 123 };
var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();
var bus = Bus.Factory.CreateUsingInMemory(config =>
{
config.ReceiveEndpoint("input_queue", endpoint =>
{
endpoint.Handler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));
});
});
bus.Start();
bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();
if (!tcs.Task.Wait(5000))
Assert.Fail("Event consuming takes too long (> 5000 ms)");
Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));
bus.Stop();
}
class SomeEvent
{
public int Data;
}
}
MassTransit中是否有引起此行为的重大更改? MassTransit扩展方法是否确实存在问题,或者我的代码已过时并且应进行调整以匹配最新版本?需要进行哪些调整?在已经创建并配置了总线之后,是否还有其他方法可以将\ handler处理程序附加到总线上?await bus.ConnectReceiveEndpoint("queue-name", x =>
{
x.Handler<SomeEvent>(...);
});
对于直接发送到总线地址的消息,ConnectHandler
工作正常。只有已发布的消息不会到达,因为已将发布交换不再绑定到总线接收端点队列。