我们正在尝试使用Saga序列化业务对象列表的处理。
现在,没有Saga,我们只需循环遍历一个对象列表,然后触发bus.Send(new ProcessBusinessObejct(obj))
异步以执行处理程序。所以处理或多或少并行发生,取决于这个设置,我相信:
endpointConfiguration.LimitMessageProcessingConcurrencyTo( 4 );
这工作正常,但现在并发处理程序的数量很难在数据库上。
可以串行触发这些处理程序,即仅在当前进程完成(失败或成功)时继续执行下一个处理程序。我们不希望将并发性设置为1,它会影响端点中的所有处理程序。
我们的想法是使用Scatter / Gather模式和Saga来跟踪对象的数量,并使用计数(总计数,失败计数,成功计数)更新状态机,最后在列表为时触发事件做/空。
问题是
A)我不确定如何跟踪传奇中的列表。 SagaData需要一个List来保存所有对象吗?然后在处理程序发出完成处理信号时删除实例。该传奇不支持分层数据,因此不支持列表或列表。我相信在NSB v7中仍然如此。
并且B)这种使用saga是可行的还是过度的,还是有更简单的方法来实现这一点?
我们正在使用Sql Server持久性和传输以及NSB 7。
任何输入都非常感谢!
我想你是想做到这一点的。请注意,根据您使用的持久层,您可能需要将实际导入与更新saga状态分开。我在博客上写了这个here。
Saga数据也可以存储List,但我认为在大多数场景中你都可以获得计数。另一个重要的注意事项(尽管应该很明显)是,如果消息无法处理并进入错误队列(例如ImportData中未捕获的异常),则整个传奇将保持不完整状态,直到重试并处理该消息。
public class MySaga : Saga<MySagaData>
: IAmStartedByMessages<StartTheProcess>,
IHandleMessages<ImportData>,
IHandleMessages<ImportFinished>
{
public async Task Handle(StartTheProcess message, IMessageHandlerContext context)
{
Data.ObjectsToImport = message.ObjectCount;
Data.JobID = Guid.NewGuid(); //To generate a correlation ID to connect future messages back to this saga instance
foreach(var id in message.ObjectIdsToImport)
{
await context.SendLocal(new ImportData
{
JobID = Data.JobID //You need this to correlate messages back to the saga
//Anything else you need to pass on to ImportData
ObjectIdToImport = id
}
});
}
public async Task Handle(ImportData message, IMessageHandlerContext context)
{
//import the data and increment the counter
var result = ImportData(message.ObjectIdToImport);
if(result == Result.Success)
{
Data.SuccessImport++;
}
else
{
Data.FailedImport++;
}
await CheckIfFinished(context);
}
public async Task Handle(ImportFinished message, IMessageHandlerContext context)
{
//do any post cleanups or Mark as complete
MarkAsComplete();
return Task.CompletedTask;
}
private async Task CheckIfFinished(IMessageHandlerContext context)
{
if(Data.SuccessImport + Data.FailedImport == Data.ObjectsToImport)
{
//Everything is done
context.SendLocal(new ImportFinished { JobID = Data.JobID });
}
}
}