使用MassTransit发布到Azure服务总线会导致错误。

问题描述 投票:0回答:2

我是MassTransit和Azure服务总线的新手。我正在尝试使用一种架构,在该架构中,RabbitMq 或 Azure 服务总线被用于 .NET Core 3.1 API。我的 RabbitMq 部分可以使用,但刚刚开始使用 Azure 服务总线。我有一个 API,它将接收传入的有效载荷并将其发布到队列中。当我尝试通过 Azure 服务总线方法发布时,我得到一个错误信息 "SubCode=40000。因为命名空间'servicehubqa'使用的是'Basic'层,所以不能对Topic类型进行操作。

我正在尝试使用队列的方法,并希望在发布消息时创建队列。目前,服务总线使用的是Basic定价层,因为文档中说我可以在这个级别玩队列。我不确定是否需要手动创建队列(我不得不使用RabbitMq来做这种方法,因为如果没有消费者存在,就不会创建队列)。如果什么都不指定,topic是默认的方法吗?我如何指定队列与topic?

我的代码如下。

启动--配置服务

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton(Configuration);

        services.AddScoped<IMassTransitRabbitMqTransport, MassTransitRabbitMqTransport>();
        services.AddScoped<IMassTransitAzureServiceBusTransport, MassTransitAzureServiceBusTransport>();

        var messageProvider = ConfigProvider.GetConfig("MessageService", "Messaging_Service");
        switch (messageProvider)
        {
            case "AzureServiceBus":
                services.AddScoped<IMessagingService, MassTransitAzureServiceBusMessagingService>();
                break;
            case "RabbitMq":
                services.AddScoped<IMessagingService, MassTransitRabbitMqMessagingService>();
                break;
            default:
                throw new ArgumentException("Invalid message service");
        };

        services.AddControllers();
    }

控制器

public class ListenerController : ControllerBase
{
    readonly ILogger<ListenerController> logger;
    readonly IMessagingService messenger;

    public ListenerController(
        ILogger<ListenerController> logger,
        IMessagingService messenger)
    {
        this.logger = logger;
        this.messenger = messenger;
    }

    [HttpPost]
    public async Task<IActionResult> Post()
    {
        var payload = new
        {
            ...
        };

        await messenger.Publish(payload);

        return Ok();
    }
}

IMessagingService

public interface IMessagingService
{
    Task Publish(object payload);
}

IMassTransitTransport

public interface IMassTransitTransport
{
    IBusControl BusControl { get; }
}

public interface IMassTransitRabbitMqTransport : IMassTransitTransport { }

public interface IMassTransitAzureServiceBusTransport : IMassTransitTransport { }

MassTransitAzureServiceBusTransport(大众交通服务巴士运输)

public sealed class MassTransitAzureServiceBusTransport : IMassTransitAzureServiceBusTransport
{
    public IBusControl BusControl { get; }

    public MassTransitAzureServiceBusTransport()
    {
        BusControl = ConfigureBus();
        BusControl.StartAsync();
    }

    IBusControl ConfigureBus()
    {
        return Bus.Factory.CreateUsingAzureServiceBus(config => {
            var host = config.Host(ConfigProvider.GetConfig("AzureServiceBus", "AzureServiceBus_ConnStr"), host => { });
        });
    }
}

MassTransitAzureServiceBusMessaging服务

public class MassTransitAzureServiceBusMessagingService : IMessagingService
{
    readonly IMassTransitAzureServiceBusTransport massTransitTransport;

    public MassTransitAzureServiceBusMessagingService(IMassTransitAzureServiceBusTransport massTransitTransport)
    {
        //transport bus config already happens in massTransitTransport constructor
        this.massTransitTransport = massTransitTransport;
    }

    public async Task Publish(object payload)
    {
        var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        var cmd = JObject.Parse(jsn)["Command"];

        switch (cmd.ToString())
        {
            case "UPDATESTATUS":
                //IRegisterCommandUpdateStatus is an interface specifying the properties needed
                await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
                break;
            default: break;
        }
    }
}
c# azureservicebus masstransit asp.net-core-3.1 azure-servicebus-queues
2个回答
0
投票

Azure 服务总线基本层不允许使用主题。因此,你将无法使用发布。也就是说,MassTransit并不能真正与基本层一起工作,尽管过去的尝试可能已经成功。


0
投票

MassTransit文档确实指出,如果你想使用一个主题(即同时发布到多个订阅的能力),你使用publish.如果你想发送消息到队列(消息被路由到一个特定的位置),你使用send并提供正确的信息。

主题需要标准定价,队列可以使用基本定价。

有了这些信息,MassTransitAzureServiceBusMessagingService将被修改如下。

基本定价 - 队列

    public async Task Publish(object payload)
    {
        var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        var cmd = JObject.Parse(jsn)["Command"];

        switch (cmd.ToString())
        {
            case "UPDATESTATUS":
                var queueUri = new Uri(massTransitTransport.BusControl.Address, "registration.updatestatus");
                var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(queueUri);
                await endpoint.Send<IRegisterCommandUpdateStatus>(payload);
                break;
            default: break;
        }
    }

标准价格 - 主题订阅

    public async Task Publish(object payload)
    {
        var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        var cmd = JObject.Parse(jsn)["Command"];

        switch (cmd.ToString())
        {
            case "UPDATESTATUS":
                await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
                break;
            default: break;
        }
    }
© www.soinside.com 2019 - 2024. All rights reserved.