我有一项服务将消息发布到“ta-emails”队列(SQS) 我有一个正在运行的 Worker,它必须消耗这些消息。
public class AdvisorRegisteredConsumer: IConsumer<AdvisorRegistered>
{
private readonly ILogger<AdvisorRegisteredConsumer> _logger;
public AdvisorRegisteredConsumer(
ILogger<AdvisorRegisteredConsumer> logger
)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<AdvisorRegistered> context)
{
_logger.LogInformation("Consuming advisor id: {AdvisorId}", context.Message.AdvisorId);
}
}
Worker 的 Program.cs 中我的 MassTransit 配置
services.AddMassTransit(x =>
{
x.UsingAmazonSqs((context, cfg) =>
{
AmazonSQSConfig _sqsConfig = new AmazonSQSConfig
{
ServiceURL = configuration.GetValue<string>("AWS:SQS:Endpoint"),
AuthenticationRegion = configuration.GetValue<string>("AWS:Region")
};
cfg.Host(configuration.GetValue<string>("AWS:Region"), h =>
{
h.Config(_sqsConfig);
h.AccessKey(configuration.GetValue<string>("AWS:SQS:AccessKey"));
h.SecretKey(configuration.GetValue<string>("AWS:SQS:SecretAccessKey"));
});
cfg.ReceiveEndpoint("ta-emails", e =>
{
e.ConfigureConsumer<AdvisorRegisteredConsumer>(context);
});
});
});
这是appsettings.json配置节点
"AWS": {
"Region": "ru-central1",
"SQS": {
"Endpoint": "https://message-queue.api.cloud.yandex.net",
"AccessKey": "REPLACED_MY_KEY_WITH_THIS",
"SecretAccessKey": "REPLACED_MY_SECRET_KEY_WITH_THIS"
}
},
工人本身
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
public Worker(ILogger<Worker> logger)
{
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.UtcNow);
await Task.Delay(30000, cancellationToken);
}
}
}
发布服务和消费工作线程在 DI 中具有相同的(复制/粘贴)MassTransit 服务配置。
但是,消息已由服务成功发布,但从未由消费者在运行的工作线程中调度。
我认为我忘记添加与消费者配置或大众运输配置相关的内容。
您要做的第一件事是绝对确保您的发布者(或发送者,如果您使用
IBus.Send
而不是 IBus.Publish
)在完全相同的类型(包括命名空间)上运行。这就是 MassTransit 将消息与消费者匹配的方式。
借助 Amazon Simple Queue Service (SQS),MassTransit 可配置发布到您的队列的 Amazon Simple Notification Service (SNS) 主题。如果您/您的公司决定手动创建/配置 SNS 主题和 SQS 队列,而不是向您的应用程序/MassTransit 授予这样做的权限,那么您应该确保为原始消息传递配置您的主题,否则,您的队列收到的消息被包装在一个 SNS 信封中,MassTransit 不知道如何处理。
如果您像这样创建发布、接收和使用观察者:
public class MyMassTransitObserver : IReceiveObserver, IPublishObserver, IConsumeObserver
{
// ...
public Task PreReceive(ReceiveContext context)
{
_logger.LogInformation("PreReceive: msg}", context.Body.GetString();
}
// do simple logging for all the other interface methods
}
...
services.AddMassTransit(configurator =>
{
configurator.AddPublishObserver<MyMassTransitObserver>();
configurator.AddReceiveObserver<MyMassTransitObserver>();
configurator.AddConsumeObserver<MyMassTransitObserver>();
// ...
}
然后,当您发布事件时,您应该会看到以下模式被记录到您的控制台
在
PreReceive
中记录实际消息非常有用,因为它会向您显示 messageType
是否是您所期望的,以及您的消息是否意外包装在 SNS 信封中。
如果您的消息配置错误,则
PreConsume
...PostConsume
将无法运行。
如果您没有看到
PreReceive
...PostReceive
,那么消息可能根本没有传递到您的 SQS 队列,因此您需要确保它订阅了正确的主题。
如果您没有看到
PrePublish
...PostPublish
,那么您调用 IBus.Publish
的逻辑不会被调用,因此您可能需要使用调试器单步调试代码。