NServiceBus中早期消息调度的问题6.批量消息调度不起作用?

问题描述 投票:0回答:1

我们使用NServiceBus 6来传输MSMQ和RabbitMq。当我们使用分布式事务的MSMQ时,我们从未在分布式事务提交之前调度消息。然而,当我们将其关闭并手动将其包装在事务范围内时,我们看到Nsb在处理程序执行结束之前开始发送消息。我们的代码看起来像这样

public Task Handle(ICommand1 message, IMessageHandlerContext context)
{
   using (var tx = _session.BeginTransaction(IsolationLevel.ReadCommitted))
   {    
       HandleMessage1(message1);

       _session.Flush();
       tx.Commit();
    }   
}

private void HandleMessage1(ICommand1 message1)
{
    // Updating database
    ...
    // Sending other Command2 to separate handler
    bus.Send<ICommand2>(x =>
    {
       ...
    });
}

从日志中我可以看到ICommand2开始处理早于ICommand1处理程序,该处理程序设法在数据库中提交数据更改,在Command1处理程序更新之前获取“旧”数据。

我的印象是我们不会遇到这类问题,因为Nsb6为我们提供了Batched message dispatch。看起来这不是我们的情况,我想知道为什么以及如何解决这个问题。我试图在MSMQ(没有distrib。事务)和RabbitMq下运行它,结果是一样的。

Command2处理程序适用于Command1处理程序所做的更改,那么我们如何使它们按顺序工作呢?

nservicebus
1个回答
2
投票

从处理程序发送消息应该使用传递给处理程序的context来完成。当你使用bus.Send()时,你可能会使用IMessageSession而不是IMessageHandlerContext,这相当于立即发送。

将代码更改为以下内容应解决此问题:

public async Task Handle(ICommand1 message, IMessageHandlerContext context)
{
   using (var tx = _session.BeginTransaction(IsolationLevel.ReadCommitted))
   {    
       await HandleMessage1(message1, context);

       _session.Flush();
       tx.Commit();
    }   
}

private async Task  HandleMessage1(ICommand1 message1, IMessageHandlerContext context)
{
    // Updating database
    ...
    // Sending other Command2 to separate handler
    await context.Send<ICommand2>(x =>
    {
       ...
    });
}

© www.soinside.com 2019 - 2024. All rights reserved.