NServiceBus事件在单独的线程中发布时丢失

问题描述 投票:5回答:1

我一直致力于在Azure传输上使用NServiceBus获取长时间运行的消息。基于this document,我想我可以在一个单独的线程中解雇长进程,将事件处理程序任务标记为完成,然后侦听自定义OperationStarted或OperationComplete事件。我注意到大多数情况下我的处理程序都没有收到OperationComplete事件。事实上,唯一一次收到的是我在发布OperationStarted事件后立即发布它。之间的任何实际处理都会以某种方式阻止接收完成事件。这是我的代码:

用于长时间运行消息的抽象类

public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
{
    protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();

    public Task Handle(TMessage message, IMessageHandlerContext context)
    {
        var opStarted = new OperationStarted
        {
            OperationID = Guid.NewGuid(),
            OperationType = typeof(TMessage).FullName
        };
        var errors = new List<string>();
        // Fire off the long running task in a separate thread
        Task.Run(() =>
            {
                try
                {
                    _logger.Info($"Operation Started: {JsonConvert.SerializeObject(opStarted)}");
                    context.Publish(opStarted);
                    ProcessMessage(message, context);
                }
                catch (Exception ex)
                {
                    errors.Add(ex.Message);
                }
                finally
                {
                    var opComplete = new OperationComplete
                    {
                        OperationType = typeof(TMessage).FullName,
                        OperationID = opStarted.OperationID,
                        Errors = errors
                    };

                    context.Publish(opComplete);

                    _logger.Info($"Operation Complete: {JsonConvert.SerializeObject(opComplete)}");
                }
            });

        return Task.CompletedTask;
    }

    protected abstract void ProcessMessage(TMessage message, IMessageHandlerContext context);
}

测试实施

public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
{
    protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
    {
        // If I remove this, or lessen it to something like 200 milliseconds, the 
        // OperationComplete event gets handled
        Thread.Sleep(1000);
    }
}

操作事件

public sealed class OperationComplete : IEvent
{
    public Guid OperationID { get; set; }
    public string OperationType { get; set; }
    public bool Success => !Errors?.Any() ?? true;
    public List<string> Errors { get; set; } = new List<string>();
    public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
}

public sealed class OperationStarted : IEvent
{
    public Guid OperationID { get; set; }
    public string OperationType { get; set; }
    public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
}

处理程序

public class OperationHandler : IHandleMessages<OperationStarted>
, IHandleMessages<OperationComplete>
{
    static ILog logger = LogManager.GetLogger<OperationHandler>();

    public Task Handle(OperationStarted message, IMessageHandlerContext context)
    {
        return PrintJsonMessage(message);
    }

    public Task Handle(OperationComplete message, IMessageHandlerContext context)
    {
        // This is not hit if ProcessMessage takes too long
        return PrintJsonMessage(message);
    }

    private Task PrintJsonMessage<T>(T message) where T : class
    {
        var msgObj = new
        {
            Message = typeof(T).Name,
            Data = message
        };
        logger.Info(JsonConvert.SerializeObject(msgObj, Formatting.Indented));
        return Task.CompletedTask;
    }

}

我确定context.Publish()调用被击中,因为_logger.Info()调用正在向我的测试控制台打印消息。我还证实他们遇到了断点。在我的测试中,任何运行时间超过500毫秒的事情都会阻止处理OperationComplete事件。

如果在ProcessMessage实现中经过任何大量时间后,任何人都可以提供关于为什么OperationComplete事件没有命中处理程序的建议,我将非常感谢您听到它们。谢谢!

- 更新 - 如果有其他人遇到这个并且对我最终做的事感到好奇:

an exchange与NServiceBus的开发人员合作之后,我决定使用一个实现IHandleTimeouts接口的看门狗传奇来定期检查作业是否完成。我正在使用saga数据,在作业完成时更新,以确定是否在超时处理程序中触发OperationComplete事件。这提出了另一个问题:当使用内存中持久性时,即使它被每个线程锁定,传奇数据也是跨线程的not persisted。为了解决这个问题,我创建了一个专门用于长时间运行的内存数据持久性的接口。此接口作为单例注入到saga中,因此用于跨线程读取/写入saga数据以进行长时间运行。

我知道不建议使用内存中持久性,但是根据我的需要配置另一种类型的持久性(如Azure表)是过度的;我只是想在正常情况下发射OperationComplete事件。如果在正在运行的作业期间发生重新启动,我不需要保留saga数据。无论如何,该作业将被缩短,如果作业运行时间超过设定的最大时间,则传奇超时将处理触发OperationComplete事件并发生错误。

c# .net multithreading nservicebus
1个回答
2
投票

原因是如果ProcessMessage足够快,你可能会在它失效之前获得当前的context,例如被处置。

通过成功返回Handle,你告诉NServiceBus:“我已经完成了这个消息”,所以它也可以用context做它想做的事情,比如使它失效。在后台处理器中,您需要一个端点实例,而不是消息上下文。

当新任务开始运行时,你不知道Handle是否已经返回,所以你应该只考虑消息已被消耗,因此是不可恢复的。如果您的单独任务中发生错误,则无法重试。

避免长时间运行的进程而不持久化。您提到的示例包含一个存储消息中的工作项的服务器,以及一个轮询该存储以查找工作项的过程。如果你扩展处理器,它可能不是理想的,但它不会丢失消息。

为了避免持续轮询,请合并服务器和处理器,在启动时进行无条件轮询,并在Handle中安排轮询任务。请注意此任务仅在没有其他轮询任务正在运行时进行轮询,否则可能会比常量轮询更糟糕。您可以使用信号量来控制它。

要向外扩展,您必须拥有更多服务器。您需要测量N个处理器轮询的成本是否大于以循环方式向N个服务器发送,对于某些N,要知道哪个方法实际上表现更好。在实践中,轮询对于低N来说足够好。

修改多个处理器的样本可能需要更少的部署和配置工作,您只需添加或删除处理器,而添加或删除服务器需要更改指向它们的所有位置(例如配置文件)中的enpoint。

另一种方法是将漫长的过程分解为步骤。 NServiceBus有传奇。这是一种通常针对已知或有限数量的步骤实施的方法。对于未知数量的步骤,它仍然是可行的,尽管有些人可能会认为它滥用了看似预期的传奇目的。

© www.soinside.com 2019 - 2024. All rights reserved.