我们使用NServiceBus 6来传输MSMQ和RabbitMq。当我们使用分布式事务的MSMQ时,我们从未在分布式事务提交之前调度消息。然而,当我们将其关闭并手动将其包装在事务范围内时,我们看到Nsb在处理程序执行结束之前开始发送消息。我们的代码看起来像这样
public Task Handle(ICommand1 message, IMessageHandlerContext context)
{
using (var tx = _session.BeginTransaction(IsolationLevel.ReadCommitted))
{
HandleMessage1(message1);
_session.Flush();
tx.Commit();
}
}
private void HandleMessage1(ICommand1 message1)
{
// Updating database
...
// Sending other Command2 to separate handler
bus.Send<ICommand2>(x =>
{
...
});
}
从日志中我可以看到ICommand2开始处理早于ICommand1处理程序,该处理程序设法在数据库中提交数据更改,在Command1处理程序更新之前获取“旧”数据。
我的印象是我们不会遇到这类问题,因为Nsb6为我们提供了Batched message dispatch。看起来这不是我们的情况,我想知道为什么以及如何解决这个问题。我试图在MSMQ(没有distrib。事务)和RabbitMq下运行它,结果是一样的。
Command2处理程序适用于Command1处理程序所做的更改,那么我们如何使它们按顺序工作呢?
从处理程序发送消息应该使用传递给处理程序的context
来完成。当你使用bus.Send()
时,你可能会使用IMessageSession
而不是IMessageHandlerContext
,这相当于立即发送。
将代码更改为以下内容应解决此问题:
public async Task Handle(ICommand1 message, IMessageHandlerContext context)
{
using (var tx = _session.BeginTransaction(IsolationLevel.ReadCommitted))
{
await HandleMessage1(message1, context);
_session.Flush();
tx.Commit();
}
}
private async Task HandleMessage1(ICommand1 message1, IMessageHandlerContext context)
{
// Updating database
...
// Sending other Command2 to separate handler
await context.Send<ICommand2>(x =>
{
...
});
}