Updating to MassTransit 5.3.0 causes "ScopedConsumeContextProvider has not been registered"

Question

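I am using MassTransit 5.2.0. I have to update because we are moving to AWS, which requires the amqps protocol to talk to RabbitMQ, and 5.2.0 does not support it. Eventually I will probably want to update to version 5.5.6. However, when I update the library I get this error: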

2023-09-14 16:01:09.0917|ERROR|MassTransit.Messages|R-FAULT rabbitmq://localhost/dqportal/dq_audits db270000-1fd6-00ff-3025-08dbb52b0be4 
DQ.Audits.IntegrationEvents.Contracts.IAuditEventOccurredIntegrationEvent DQ.Audits.IntegrationEvents.Consumers.AuditEventOccurredConsumer(00:00:00.0013801) 
No service for type 'MassTransit.Scoping.ScopedConsumeContextProvider' has been registered.
Exception:System.InvalidOperationException: No service for type 'MassTransit.Scoping.ScopedConsumeContextProvider' has been registered.
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
   at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.InternalScopeExtensions.UpdateScope(IServiceScope scope, ConsumeContext context)
   at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionConsumerScopeProvider.MassTransit.Scoping.IConsumerScopeProvider.GetScope[TConsumer,T](ConsumeContext`1 context)
   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
   at GreenPipes.Filters.TeeFilter`1.Send(TContext context, IPipe`1 next)
   at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(TInput context, IPipe`1 next, TOutput pipeContext)
   at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(TInput context, IPipe`1 next, TOutput pipeContext)
   at GreenPipes.Filters.DynamicFilter`1.Send(TInput context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

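This happens when I update from MassTransit 5.2.3 to 5.3.0, so 5.2.3 is the last version that works with my code and 5.3.0 is the first one that fails. I assume I have to change something in the MassTransit registration to get it running again, perhaps register ScopedConsumeContextProvider somehow, but I really don't know how.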

Here is my initialization code:


    public void ConfigureServices(IServiceCollection services)
    {
       // other registrations ...
       services.AddEventBus(Configuration);
    }

    public static class SetupEventBusTask
    {
        public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
        {
            services.Scan(scan => scan.FromAssemblyOf<AuditEventOccurredConsumer>()
                .AddClasses(c => c.Where(t => t.Name.EndsWith("Consumer"))).AsSelf().WithScopedLifetime());

            if (configuration.GetValue<bool>("Queue:Enabled"))
                configureEventBusWithQueue(services, configuration);
            else
                configureEventBusInMemory(services, configuration);

            services.AddSingleton<IBus>(s => s.GetRequiredService<IBusControl>());
            services.AddSingleton<IHostedService, BusService>();
            services.AddScoped<IIntegrationEventStore, IntegrationEventStore>();

            return services;
        }

        private static void configureEventBusWithQueue(IServiceCollection services, IConfiguration configuration)
        {
            services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                IRabbitMqHost host = cfg.Host(
                    configuration.GetValue<string>("Queue:HostName"),
                    configuration.GetValue<string>("Queue:VirtualHost"),
                    h => {
                        h.Username(configuration.GetValue<string>("Queue:Username"));
                        h.Password(configuration.GetValue<string>("Queue:Password"));
                    });

                cfg.ReceiveEndpoint(host, configuration.GetValue<string>("Queue:QueueName"), endpoint =>
                {
                    endpoint.Consumer<AuditEventOccurredConsumer>(provider);
                });

                //cfg.UseMessageScheduler(new Uri(configuration.GetValue<string>("Queue:SchedulerQueueUri")));
                cfg.UseNLog();
            }));
        }

        private static void configureEventBusInMemory(IServiceCollection services, IConfiguration configuration)
        {
            services.AddSingleton(provider => Bus.Factory.CreateUsingInMemory(cfg =>
            {
                cfg.UseNLog();
            }));
        }
    }

    public class BusService : IHostedService
    {
        private readonly IBusControl busControl;
        private readonly ILogger<BusService> logger;
        private readonly IConfiguration configuration;

        public BusService(
            IBusControl busControl,
            ILogger<BusService> logger,
            IConfiguration configuration)
        {
            this.busControl = busControl;
            this.logger = logger;
            this.configuration = configuration;
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            logger.LogInformation("Starting eventbus");

            string hostname = configuration.GetValue<string>("Queue:HostName");
            string virtualhost = configuration.GetValue<string>("Queue:VirtualHost");
            string username = configuration.GetValue<string>("Queue:Username");
            string queuename = configuration.GetValue<string>("Queue:QueueName");

            logger.LogInformation($"Configuring Event Bus with Values: {hostname}, {virtualhost}, {username}, {queuename}.");

            await busControl.StartAsync(cancellationToken);

            logger.LogInformation("Started eventbus");
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            logger.LogInformation("Stopping eventbus");
            await busControl.StopAsync(cancellationToken);
            logger.LogInformation("Stopped eventbus");
        }
    }

I have reduced the consumer to the simplest possible one, which only writes a single log entry. I don't think the problem lies there.
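The consumer itself is not shown here. For orientation only, a consumer of that shape might look roughly like the sketch below. The type names are taken from the log line above; the empty contract interface, the constructor, and the log message are assumptions made for illustration, not the actual code.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

// Placeholder for the real contract (DQ.Audits.IntegrationEvents.Contracts);
// its members are not shown in the question, so it is left empty here.
public interface IAuditEventOccurredIntegrationEvent
{
}

// Hypothetical reconstruction of a "log one line" consumer.
public class AuditEventOccurredConsumer : IConsumer<IAuditEventOccurredIntegrationEvent>
{
    private readonly ILogger<AuditEventOccurredConsumer> logger;

    // The logger is resolved from the container scope that MassTransit
    // creates for each consumed message.
    public AuditEventOccurredConsumer(ILogger<AuditEventOccurredConsumer> logger)
    {
        this.logger = logger;
    }

    public Task Consume(ConsumeContext<IAuditEventOccurredIntegrationEvent> context)
    {
        // Only writes a log entry; no other work is done.
        logger.LogInformation("Received audit event {MessageId}", context.MessageId);
        return Task.CompletedTask;
    }
}
```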

masstransit
1 answer

OK, I found the answer myself after posting the question! You have to add/register the consumer with MassTransit like this:

        private static void configureEventBusWithQueue(IServiceCollection services, IConfiguration configuration)
        {
            services.AddMassTransit(opt =>
            {
                opt.AddConsumer<AuditEventOccurredConsumer>();

            });
            // the other code ...
        }
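Put together with the method from the question, the change might look roughly like the sketch below. It assumes the rest of the original setup (the Scrutor scan, the `IBus` and `BusService` registrations, and `cfg.UseNLog()`) stays as it was; only the `AddMassTransit` call is new. The apparent reason it helps is that `AddMassTransit` registers the scoped services the container integration resolves in `UpdateScope`, as seen in the stack trace, but treat that explanation as an inference rather than a confirmed description of the library internals.

```csharp
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public static class SetupEventBusTask
{
    // AuditEventOccurredConsumer comes from the question's own project.
    private static void configureEventBusWithQueue(IServiceCollection services, IConfiguration configuration)
    {
        // New in this version of the method: let MassTransit register the
        // consumer and its own supporting services in the container.
        services.AddMassTransit(opt =>
        {
            opt.AddConsumer<AuditEventOccurredConsumer>();
        });

        // Unchanged from the question: build the bus and attach the consumer
        // to the receive endpoint through the service provider.
        services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(
                configuration.GetValue<string>("Queue:HostName"),
                configuration.GetValue<string>("Queue:VirtualHost"),
                h =>
                {
                    h.Username(configuration.GetValue<string>("Queue:Username"));
                    h.Password(configuration.GetValue<string>("Queue:Password"));
                });

            cfg.ReceiveEndpoint(host, configuration.GetValue<string>("Queue:QueueName"), endpoint =>
            {
                endpoint.Consumer<AuditEventOccurredConsumer>(provider);
            });
        }));
    }
}
```

Note that the Scrutor scan in `AddEventBus` already registers every `*Consumer` class as scoped, so the consumer ends up registered twice. That should be harmless, but the scan could probably be dropped once all consumers are added through `AddMassTransit`.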
