我正在使用 MassTransit 5.2.0。我必须更新,因为我们要使用 AWS,并且它需要 amqps 协议才能使用rabbit mq,而 5.2.0 不支持该协议。我最终可能想更新到版本 5.5.6。当我更新库时,我如何收到此错误:
2023-09-14 16:01:09.0917|ERROR|MassTransit.Messages|R-FAULT rabbitmq://localhost/dqportal/dq_audits db270000-1fd6-00ff-3025-08dbb52b0be4
DQ.Audits.IntegrationEvents.Contracts.IAuditEventOccurredIntegrationEvent DQ.Audits.IntegrationEvents.Consumers.AuditEventOccurredConsumer(00:00:00.0013801)
No service for type 'MassTransit.Scoping.ScopedConsumeContextProvider' has been registered.
Exception:System.InvalidOperationException: No service for type 'MassTransit.Scoping.ScopedConsumeContextProvider' has been registered.
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.InternalScopeExtensions.UpdateScope(IServiceScope scope, ConsumeContext context)
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionConsumerScopeProvider.MassTransit.Scoping.IConsumerScopeProvider.GetScope[TConsumer,T](ConsumeContext`1 context)
at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at GreenPipes.Filters.TeeFilter`1.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(TInput context, IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(TInput context, IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.DynamicFilter`1.Send(TInput context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
当我从 MassTransit 5.2.3 -> 5.3.0 更新时就会发生这种情况。所以 5.2.3 是最后一个使用我的代码的版本。 5.3.0 是第一个失败的版本。 我想我必须在 MassTransit 的注册中做一些事情才能使其再次运行。以某种方式注册 ScopedConsumeContextProvider,但我真的不知道。
这是我的初始化代码:
public void ConfigureServices(IServiceCollection services)
{
// other registrations ...
services.AddEventBus(Configuration);
}
public static class SetupEventBusTask
{
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
services.Scan(scan => scan.FromAssemblyOf<AuditEventOccurredConsumer>()
.AddClasses(c => c.Where(t => t.Name.EndsWith("Consumer"))).AsSelf().WithScopedLifetime());
if (configuration.GetValue<bool>("Queue:Enabled"))
configureEventBusWithQueue(services, configuration);
else
configureEventBusInMemory(services, configuration);
services.AddSingleton<IBus>(s => s.GetRequiredService<IBusControl>());
services.AddSingleton<IHostedService, BusService>();
services.AddScoped<IIntegrationEventStore, IntegrationEventStore>();
return services;
}
private static void configureEventBusWithQueue(IServiceCollection services, IConfiguration configuration)
{
services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(
configuration.GetValue<string>("Queue:HostName"),
configuration.GetValue<string>("Queue:VirtualHost"),
h => {
h.Username(configuration.GetValue<string>("Queue:Username"));
h.Password(configuration.GetValue<string>("Queue:Password"));
});
cfg.ReceiveEndpoint(host, configuration.GetValue<string>("Queue:QueueName"), endpoint =>
{
endpoint.Consumer<AuditEventOccurredConsumer>(provider);
});
//cfg.UseMessageScheduler(new Uri(configuration.GetValue<string>("Queue:SchedulerQueueUri")));
cfg.UseNLog();
}));
}
private static void configureEventBusInMemory(IServiceCollection services, IConfiguration configuration)
{
services.AddSingleton(provider => Bus.Factory.CreateUsingInMemory(cfg =>
{
cfg.UseNLog();
}));
}
}
public class BusService : IHostedService
{
private readonly IBusControl busControl;
private readonly ILogger<BusService> logger;
private readonly IConfiguration configuration;
public BusService(
IBusControl busControl,
ILogger<BusService> logger,
IConfiguration configuration)
{
this.busControl = busControl;
this.logger = logger;
this.configuration = configuration;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Starting eventbus");
string hostname = configuration.GetValue<string>("Queue:HostName");
string virtualhost = configuration.GetValue<string>("Queue:VirtualHost");
string username = configuration.GetValue<string>("Queue:Username");
string queuename = configuration.GetValue<string>("Queue:QueueName");
logger.LogInformation($"Configuring Event Bus with Values: {hostname}, {virtualhost}, {username}, {queuename}.");
await busControl.StartAsync(cancellationToken);
logger.LogInformation("Started eventbus");
}
public async Task StopAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Stopping eventbus");
await busControl.StopAsync(cancellationToken);
logger.LogInformation("Stopped eventbus");
}
}
消费者代码我已经简化为最简单的消费者,只写一条日志。我不认为问题出在这里。
好吧,提出问题后我自己找到了答案!您必须像这样将消费者添加/注册到 MassTransit:
private static void configureEventBusWithQueue(IServiceCollection services, IConfiguration configuration)
{
services.AddMassTransit(opt =>
{
opt.AddConsumer<AuditEventOccurredConsumer>();
});
// the other code ...
}