如何使用分散/聚集模式在MassTransit 3.0中实现传奇

问题描述 投票:42回答:3

Jimmy Boagard描述了麦当劳快餐连锁店here将其与scatter gather pattern.进行比较

从上面文章中窃取的工作流图像:enter image description here

初步实施思路:

为所有食品站所能获得的所有类型的FoodOrdered事件建立一个共同界面,然后每个食品站将能够消费/创建其各自的项目并发布共同的完成事件。例如:薯条和汉堡站收到关于薯条顺序的消息,薯条站消耗该命令宣布该传奇正在监听的ItemDoneEvent。

最初的担忧:

由于佐贺并不关心所完成的食物类型,所有食物都已完成,这似乎是一个不错的解决方案。然而,在阅读了关于共享队列的警告here并注意到Consumer.Conditional filtering has been removed with MassTransit 3.0感觉好像框架说这种方法的“Bad Things(TM)将会发生”。但是我不知道如果不为厨房中的每个食品项目创建消息请求和响应并关联事件,您还能做到这一点。例如:FriesOrdered,BurgerOrdered FriesCooked,BurgerCooked。如果您必须为厨房中的每件物品做这件事,这将非常繁琐?

鉴于上述问题 - 这种类型的工作流程的一个好的传奇示例是什么样的?

c# masstransit saga
3个回答
1
投票

难道你不能“简单地”将队列中的对象作为事件参数传递吗?当saga监听器获得“订单已完成”事件时,它将包含在事件中完成的对象?

我想它是通过Generic方法发送到队列的,其中对象必须实现IFoodOrdered

然后你可以在实现一个虚拟方法,传奇可以用来做“通用”的东西,当它被拾取,你只需要为那些需要特殊事件的特殊项目实现重载?


0
投票

将完成事件踢回传奇的问题在于它会在共享资源(即传奇状态)上产生争用。

Jim在你引用的那篇文章之后发表了另一篇文章,概述了问题和解决方案。当然,他特别谈到NServiceBus,但问题和概念是一样的。

https://lostechies.com/jimmybogard/2014/02/27/reducing-nservicebus-saga-load/

创建外部存储。为每个工作项记录一份记录。让每个工作人员设置自己的工作完成,而传奇有效地使用延迟消息进行民意调查,看看是否所有工作都已完成。

然后,您仍在进行分散 - 聚集,但“聚合器”已被进程管理器模式替换,以减少争用。


0
投票

我遇到了类似的问题 - 需要发布一些命令(所有相同的接口,IMyRequest)并等待所有。

实际上我的命令启动了其他传奇,它在处理结束时发布IMyRequestDone而没有标记传奇完成。 (需要在以后的某个时间完成它们。)因此,我只是查询子传奇实例的状态,而不是在父传奇中保存已完成的嵌套传奇的数量。

检查每个MyRequestDone消息:

Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x =>
{
    // timeout for all requests
    x.Delay = TimeSpan.FromMinutes(10);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});


During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay;
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(MyRequestDone)
        .Then(context =>
        {
            if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow)
                throw new TimeoutException();
        })
        .If(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed
            return allDone;
        }, x => x
            .Unschedule(FailSagaOnRequestsTimeout)
            .TransitionTo(Active))
        )
        .Catch<TimeoutException>(x => x.TransitionTo(Failed))
);

During(WaitingMyResponses,
    When(FailSagaOnRequestsTimeout.Received)
        .TransitionTo(Failed)

定期检查所有请求是否完成(通过“减少NServiceBus Saga加载”):

Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x =>
{
    // check interval
    x.Delay = TimeSpan.FromSeconds(15);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});

During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10);
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(CheckAllRequestsDone.Recieved)
        .Then(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
            if (!allDone)           
            {
                if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)              
                    throw new TimeoutException();
                throw new NotAllDoneException();
            }
        })
        .TransitionTo(Active)
        .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)))
        .Catch<TimeoutException>(x => x.TransitionTo(Failed));
© www.soinside.com 2019 - 2024. All rights reserved.