MassTransit Consumer 不消费 Worker 中的消息

问题描述 投票:0回答:1

我有一项服务将消息发布到“ta-emails”队列(SQS) 我有一个正在运行的 Worker,它必须消耗这些消息。

public class AdvisorRegisteredConsumer: IConsumer<AdvisorRegistered>
{
    private readonly ILogger<AdvisorRegisteredConsumer> _logger;

    public AdvisorRegisteredConsumer(
        ILogger<AdvisorRegisteredConsumer> logger
        )
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<AdvisorRegistered> context)
    {
        _logger.LogInformation("Consuming advisor id: {AdvisorId}", context.Message.AdvisorId);
    }
}

Worker 的 Program.cs 中我的 MassTransit 配置

        services.AddMassTransit(x =>
        {
            x.UsingAmazonSqs((context, cfg) =>
            {
                AmazonSQSConfig _sqsConfig = new AmazonSQSConfig
                {
                    ServiceURL = configuration.GetValue<string>("AWS:SQS:Endpoint"),
                    AuthenticationRegion = configuration.GetValue<string>("AWS:Region")
                };

                cfg.Host(configuration.GetValue<string>("AWS:Region"), h =>
                {
                    h.Config(_sqsConfig);
                    h.AccessKey(configuration.GetValue<string>("AWS:SQS:AccessKey"));
                    h.SecretKey(configuration.GetValue<string>("AWS:SQS:SecretAccessKey"));
                });
                
                cfg.ReceiveEndpoint("ta-emails", e =>
                {
                    e.ConfigureConsumer<AdvisorRegisteredConsumer>(context);
                });
            });
        });

这是appsettings.json配置节点

 "AWS": {
        "Region": "ru-central1",
        "SQS": {
          "Endpoint": "https://message-queue.api.cloud.yandex.net",
          "AccessKey": "REPLACED_MY_KEY_WITH_THIS",
          "SecretAccessKey": "REPLACED_MY_SECRET_KEY_WITH_THIS"
        }
      },

工人本身

public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;

    public Worker(ILogger<Worker> logger)
    {
        _logger = logger;
    }
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.UtcNow);

            await Task.Delay(30000, cancellationToken);
        }
    }
}

发布服务和消费工作线程在 DI 中具有相同的(复制/粘贴)MassTransit 服务配置。

但是,消息已由服务成功发布,但从未由消费者在运行的工作线程中调度。

我认为我忘记添加与消费者配置或大众运输配置相关的内容。

.net-core amazon-sqs masstransit
1个回答
0
投票

您要做的第一件事是绝对确保您的发布者(或发送者,如果您使用

IBus.Send
而不是
IBus.Publish
)在完全相同的类型(包括命名空间)上运行。这就是 MassTransit 将消息与消费者匹配的方式。

借助 Amazon Simple Queue Service (SQS),MassTransit 可配置发布到您的队列的 Amazon Simple Notification Service (SNS) 主题。如果您/您的公司决定手动创建/配置 SNS 主题和 SQS 队列,而不是向您的应用程序/MassTransit 授予这样做的权限,那么您应该确保为原始消息传递配置您的主题,否则,您的队列收到的消息被包装在一个 SNS 信封中,MassTransit 不知道如何处理。

如果您像这样创建发布、接收和使用观察者

public class MyMassTransitObserver : IReceiveObserver, IPublishObserver, IConsumeObserver
{
 // ...
 public Task PreReceive(ReceiveContext context)
 {
    _logger.LogInformation("PreReceive: msg}", context.Body.GetString();
 }
 // do simple logging for all the other interface methods
}

...
services.AddMassTransit(configurator =>
{
    configurator.AddPublishObserver<MyMassTransitObserver>();
    configurator.AddReceiveObserver<MyMassTransitObserver>();
    configurator.AddConsumeObserver<MyMassTransitObserver>();
    // ...
}

然后,当您发布事件时,您应该会看到以下模式被记录到您的控制台

  • 预发布
  • 发布后
  • 预接收
  • 消费前
  • (你的消费者运行)
  • 消费后
  • 接收后

PreReceive
中记录实际消息非常有用,因为它会向您显示
messageType
是否是您所期望的,以及您的消息是否意外包装在 SNS 信封中。

如果您的消息配置错误,则

PreConsume
...
PostConsume
将无法运行。

如果您没有看到

PreReceive
...
PostReceive
,那么消息可能根本没有传递到您的 SQS 队列,因此您需要确保它订阅了正确的主题。

如果您没有看到

PrePublish
...
PostPublish
,那么您调用
IBus.Publish
的逻辑不会被调用,因此您可能需要使用调试器单步调试代码。

© www.soinside.com 2019 - 2024. All rights reserved.