注册IConsumer 使用Autofac在Masstransit中依赖

问题描述 投票:1回答:2

我有2个系统,一个用于发布消息,另一个用于消费它们。两者都使用Masstransit(使用RabbitMQ)并使用ASP.Net web api 2和OWIN(以及Autofac作为IoC容器)实现。如果我的消费者没有依赖关系,一切正常,但是当我将依赖注入到我的使用者中时,从不执行Consume方法(在初始化期间没有抛出错误)。

这是相关的发布商代码:

//Startup.cs
public class Startup
{
    public void Configuration(IAppBuilder app)
    {
        HttpConfiguration config = new HttpConfiguration();

        IContainer container = null;
        var builder = new ContainerBuilder();

        builder.Register(context =>
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings =>
                {
                    settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]);
                    settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]);
                });
            });

            return busControl;
        })
        .As<IBusControl>()
        .As<IBus>()
        .SingleInstance();

        // Register Web API controllers
        builder.RegisterApiControllers(Assembly.GetExecutingAssembly());

        // Resolve dependencies
        container = builder.Build();
        config.DependencyResolver = AutofacWebApiDependencyResolver(container);

        WebApiConfig.Register(config);
        SwaggerConfig.Register(config);
        app.UseCors(CorsOptions.AllowAll);

        // Register the Autofac middleware FIRST.
        app.UseAutofacMiddleware(container);
        app.UseWebApi(config);

        // Starts MassTransit Service bus, and registers stopping of bus on app dispose
        var bus = container.Resolve<IBusControl>();
        var busHandle = bus.StartAsync();
        var properties = new AppProperties(app.Properties);
        if (properties.OnAppDisposing != CancellationToken.None)
        {
            properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30)));
        }
    }
}

// Controller
public IHttpActionResult Post()
{
    _bus.Publish<IFooMessage>(new
    {
        Foo = "Foo"
    });

    return Ok();
}

这是相关的消费者代码:

// Startup.cs
public class Startup
{
    public void Configuration(IAppBuilder app)
    {
        HttpConfiguration config = new HttpConfiguration();

        IContainer container = null;
        var builder = new ContainerBuilder();

        builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest();
        builder.RegisterModule<BusModule>();
        builder.RegisterModule<ConsumersModule>();

        // Register Web API controllers
        builder.RegisterApiControllers(Assembly.GetExecutingAssembly());

        // Resolve dependencies
        container = builder.Build();
        config.DependencyResolver = new AutofacWebApiDependencyResolver(container);

        WebApiConfig.Register(config);
        SwaggerConfig.Register(config);
        app.UseCors(CorsOptions.AllowAll);

        // Register the Autofac middleware FIRST.
        app.UseAutofacMiddleware(container);
        app.UseWebApi(config);

        // Starts MassTransit Service bus, and registers stopping of bus on app dispose
        var bus = container.Resolve<IBusControl>();
        var busHandle = bus.StartAsync();
        var properties = new AppProperties(app.Properties);
        if (properties.OnAppDisposing != CancellationToken.None)
        {
            properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30)));
        }
    }
}

// BusModule.cs
public class BusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.Register(context =>
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings =>
                {
                    settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]);
                    settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]);
                });
                cfg.ReceiveEndpoint(rabbitMqHost, "IP.AgilePoint.queue", ec =>
                {
                    ec.LoadFrom(context);
                });
            });

            return busControl;
        })
        .SingleInstance()
        .As<IBusControl>()
        .As<IBus>();
    }
}

// ConsumerModule.cs
public class ConsumersModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterType<FooConsumer>();
    }
}

// FooConsumer.cs
public class FooConsumer : IConsumer<IFooMessage>
{
    private IFooService _service;

    public FooConsumer(IFooService service)
    {
        _service = service;
    }

    public Task Consume(ConsumeContext<IFooMessage> context)
    {
        IFooMessage @event = context.Message;

        _service.DoStuff(@event.Foo);

        return Task.FromResult(context.Message);
    }
}

注意我的FooConsumer在IFooService上有一个依赖项(构造函数)。我跟着Masstransit documentation,但我不能让它工作。我究竟做错了什么?

框架版本:

  • .Net Framework 4.6.1
  • Autofac:3.5.2
  • Masstransit:3.5.7

更新:

代码可以找到in this Github repository

c# rabbitmq owin autofac masstransit
2个回答
3
投票

我建议查看特定于Autofac的文档,它是通过扩展库完全支持的容器。

http://masstransit-project.com/MassTransit/usage/containers/autofac.html

包裹:https://www.nuget.org/packages/masstransit.autofac


0
投票

最后我发现了我做错了什么。出于某种原因,我无法在RabbitMQ Management中看到我的队列。当我能够看到错误队列时,我注意到de follow错误:

RabbitMQ Error

我以这种方式注册我的IFooService:

builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest();

InstancePerRequest()导致错误。如果我用builder.RegisterType<FooService>().As<IFooService>()注册服务一切正常。我认为这是因为我的总线实例是单身(注册为SingleIsntace())。使用web api项目来托管我的公共汽车/消费者这一事实让我感到困惑。

无论如何,感谢@Chris Patterson和@Alexey Zimarev指出我正确的实施方向(使用MT扩展来注册消费者)。

© www.soinside.com 2019 - 2024. All rights reserved.