MassTransit consumer never receives messages


I am building a demo application by following the MassTransit documentation for using RabbitMQ and Autofac in an ASP.NET Core application.

My Program code:

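using System.IO;
using Autofac.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace MessageDemo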
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
                .UseServiceProviderFactory(new AutofacServiceProviderFactory())
                .ConfigureWebHostDefaults(webHostBuilder =>
                {
                    webHostBuilder
                        .UseContentRoot(Directory.GetCurrentDirectory())
                        .UseIISIntegration()
                        .UseStartup<Startup>();
                })
                .Build();
            host.Run();
        }
    }
}

My Startup class:

    public class Startup
    {
        public Startup(IWebHostEnvironment env)
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(env.ContentRootPath)
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
                .AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true)
                .AddEnvironmentVariables();
            this.Configuration = builder.Build();
        }

        public IConfiguration Configuration { get; }
        public ILifetimeScope AutofacContainer { get; set; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddOptions();
            services.AddControllers();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            this.AutofacContainer = app.ApplicationServices.GetAutofacRoot();

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }

        // ConfigureContainer is where you can register things directly
        public void ConfigureContainer(ContainerBuilder builder)
        {

            builder.RegisterType<DemoContent>().As<IDemoContent>();
            builder.RegisterType<WeatherForecast>().As<IWeatherForecast>();

            builder.AddMassTransit(x =>
            {
                x.AddConsumer<DemoConsumer>();

                x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
                {

                    cfg.Host("rabbitmq://my_container_ip/", host =>
                    {
                        host.Username("devuser");
                        host.Password("devuser");
                    });


                    cfg.ReceiveEndpoint("submit-data", ec =>
                    {
                        // Configure a single consumer
                        ec.ConfigureConsumer<DemoConsumer>(context);
                    });

                }));
            });
        }
    }

My consumer:

    public class DemoConsumer : IConsumer<IDemoContent>
    {
        public async Task Consume(ConsumeContext<IDemoContent> context)
        {
            Debug.WriteLine($"Write content: {context.Message.Data}");
            await Console.Out.WriteLineAsync($"Write content: {context.Message.Data}");
        }
    }

To test, I trigger a publish by hitting a controller endpoint. The publish endpoint (_endpoint) is injected by the container.

        // GET: api/Demo
        [HttpGet]
        public async void Get()
        {
            await _endpoint.Publish<IDemoContent>(new
            {
                Data = "Some random content"
            }, new CancellationToken());
        }

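All of this appears to work: there are no error messages, a demo unit test I added using InMemoryTestHarness passes, and my RabbitMQ instance registers the published messages on the management UI "Overview" page.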

I get a publisher confirmation in the RabbitMQ management UI, but the messages show up as Unroutable (drop).

asp.net-core rabbitmq masstransit consumer
1 Answer

The bus is never started because you did not add the hosted service.

services.AddMassTransitHostedService();

It is right there in the code snippet in the documentation.
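
To show where that call goes, here is a minimal sketch of the question's ConfigureServices method with the hosted service added. It assumes the MassTransit.AspNetCore package is referenced, since that is the package that provided this extension method in the MassTransit versions matching the question's API. The rest of Startup, including ConfigureContainer, stays as posted.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Only ConfigureServices is shown; the other Startup members from the
// question are unchanged.
public void ConfigureServices(IServiceCollection services)
{
    services.AddOptions();
    services.AddControllers();

    // Registers an IHostedService that starts the bus when the host starts
    // and stops it when the host shuts down.
    // Assumption: the MassTransit.AspNetCore package is installed.
    services.AddMassTransitHostedService();
}
```

Until the bus is started, the "submit-data" receive endpoint is not connected to the broker, so its queue and exchange bindings have most likely not been created yet. That would be consistent with what the question reports: the publish is confirmed by RabbitMQ, but the message has nowhere to be routed and is dropped.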
