使用NServicebus saga来序列化长时间运行的端点处理程序的执行

问题描述 投票:1回答:1

我们正在尝试使用Saga序列化业务对象列表的处理。

现在,没有Saga,我们只需循环遍历一个对象列表,然后触发bus.Send(new ProcessBusinessObejct(obj))异步以执行处理程序。所以处理或多或少并行发生,取决于这个设置,我相信:

endpointConfiguration.LimitMessageProcessingConcurrencyTo( 4 );

这工作正常,但现在并发处理程序的数量很难在数据库上。

可以串行触发这些处理程序,即仅在当前进程完成(失败或成功)时继续执行下一个处理程序。我们不希望将并发性设置为1,它会影响端点中的所有处理程序。

我们的想法是使用Scatter / Gather模式和Saga来跟踪对象的数量,并使用计数(总计数,失败计数,成功计数)更新状态机,最后在列表为时触发事件做/空。

问题是

A)我不确定如何跟踪传奇中的列表。 SagaData需要一个List来保存所有对象吗?然后在处理程序发出完成处理信号时删除实例。该传奇不支持分层数据,因此不支持列表或列表。我相信在NSB v7中仍然如此。

并且B)这种使用saga是可行的还是过度的,还是有更简单的方法来实现这一点?

我们正在使用Sql Server持久性和传输以及NSB 7。

任何输入都非常感谢!

c# .net nservicebus nservicebus-sagas nservicebus7
1个回答
1
投票

我想你是想做到这一点的。请注意,根据您使用的持久层,您可能需要将实际导入与更新saga状态分开。我在博客上写了这个here

Saga数据也可以存储List,但我认为在大多数场景中你都可以获得计数。另一个重要的注意事项(尽管应该很明显)是,如果消息无法处理并进入错误队列(例如ImportData中未捕获的异常),则整个传奇将保持不完整状态,直到重试并处理该消息。

public class MySaga : Saga<MySagaData>
   : IAmStartedByMessages<StartTheProcess>,
     IHandleMessages<ImportData>,
     IHandleMessages<ImportFinished>
{
    public async Task Handle(StartTheProcess message, IMessageHandlerContext context)
    {
        Data.ObjectsToImport = message.ObjectCount;
        Data.JobID = Guid.NewGuid(); //To generate a correlation ID to connect future messages back to this saga instance

        foreach(var id in message.ObjectIdsToImport)
        {
            await context.SendLocal(new ImportData
            {
                JobID = Data.JobID //You need this to correlate messages back to the saga
                //Anything else you need to pass on to ImportData
                ObjectIdToImport = id
            }
        });
    }

    public async Task Handle(ImportData message, IMessageHandlerContext context)
    {
        //import the data and increment the counter
        var result = ImportData(message.ObjectIdToImport);
        if(result == Result.Success)
        {
            Data.SuccessImport++;
        }
        else
        {
            Data.FailedImport++;
        }

        await CheckIfFinished(context);
    }

    public async Task Handle(ImportFinished message, IMessageHandlerContext context)
    {
        //do any post cleanups or Mark as complete 
        MarkAsComplete();
        return Task.CompletedTask;
    }

    private async Task CheckIfFinished(IMessageHandlerContext context)
    {
        if(Data.SuccessImport + Data.FailedImport == Data.ObjectsToImport)
        {
            //Everything is done
            context.SendLocal(new ImportFinished { JobID = Data.JobID });
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.