当 Pod 关闭或重新启动时,公共交通取消消费者作业

问题描述 投票:0回答:1

我在 .Net 应用程序上遇到以下公共交通问题。

我有一个类实现 IConsumer<> 接口并使用来自 RabbitMQ 队列的消息。该应用程序是在 Kubernetes 集群上作为 pod 运行的,它可以水平扩展,因此如果需要,我们可以同时运行该应用程序的多个实例。

我还将服务注入:

builder.Services.AddMassTransit()

我在其中配置消费者、RabbitMQ 主机和接收端点。

当 Pod 出于某种原因关闭/重新启动时,并且因为从 Pod 收到 SIGTERM 信号到完全关闭之间存在实际延迟,我希望能够:

  • 通知我从消费者调用的方法任务已被取消;
  • 停止所有其他刚刚开始处理消息的消费者实际调用该方法;

这是我现在所拥有的,它不起作用主要是因为上下文中的取消令牌没有改变状态:

public async Task Consume(ConsumeContext<NotificationMessage> context)
{
  try
  {
    if (context.CancellationToken.IsCancellationRequested)
    {
        throw new OperationCanceledException();
    }

    // call method that also receives the Cancellation Token
  }
  catch (Exception ex)
      when (ex is OperationCanceledException || ex is TaskCanceledException)
  {
    // log it

    throw;
  }
}

如果您有任何想法,请告诉我如何实现这种行为。 谢谢!

c# rabbitmq .net-6.0 masstransit
1个回答
1
投票

您可以在 MassTransit 中配置 主机选项,以便当通用主机终止时,总线及其使用者也可以被取消。

ConsumerStopTimeout
设置为低于
StopTimeout
的值将通过
CancellationToken
上的
ConsumeContext
属性向消费者发出信号,表明公交车正在停止。

此外,请务必设置 .NET 主机停止超时,这也记录在上面的链接中。

© www.soinside.com 2019 - 2024. All rights reserved.