我是MassTransit和Azure服务总线的新手。我正在尝试使用一种架构,在该架构中,RabbitMq 或 Azure 服务总线被用于 .NET Core 3.1 API。我的 RabbitMq 部分可以使用,但刚刚开始使用 Azure 服务总线。我有一个 API,它将接收传入的有效载荷并将其发布到队列中。当我尝试通过 Azure 服务总线方法发布时,我得到一个错误信息 "SubCode=40000。因为命名空间'servicehubqa'使用的是'Basic'层,所以不能对Topic类型进行操作。
我正在尝试使用队列的方法,并希望在发布消息时创建队列。目前,服务总线使用的是Basic定价层,因为文档中说我可以在这个级别玩队列。我不确定是否需要手动创建队列(我不得不使用RabbitMq来做这种方法,因为如果没有消费者存在,就不会创建队列)。如果什么都不指定,topic是默认的方法吗?我如何指定队列与topic?
我的代码如下。
启动--配置服务
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton(Configuration);
services.AddScoped<IMassTransitRabbitMqTransport, MassTransitRabbitMqTransport>();
services.AddScoped<IMassTransitAzureServiceBusTransport, MassTransitAzureServiceBusTransport>();
var messageProvider = ConfigProvider.GetConfig("MessageService", "Messaging_Service");
switch (messageProvider)
{
case "AzureServiceBus":
services.AddScoped<IMessagingService, MassTransitAzureServiceBusMessagingService>();
break;
case "RabbitMq":
services.AddScoped<IMessagingService, MassTransitRabbitMqMessagingService>();
break;
default:
throw new ArgumentException("Invalid message service");
};
services.AddControllers();
}
控制器
public class ListenerController : ControllerBase
{
readonly ILogger<ListenerController> logger;
readonly IMessagingService messenger;
public ListenerController(
ILogger<ListenerController> logger,
IMessagingService messenger)
{
this.logger = logger;
this.messenger = messenger;
}
[HttpPost]
public async Task<IActionResult> Post()
{
var payload = new
{
...
};
await messenger.Publish(payload);
return Ok();
}
}
IMessagingService
public interface IMessagingService
{
Task Publish(object payload);
}
IMassTransitTransport
public interface IMassTransitTransport
{
IBusControl BusControl { get; }
}
public interface IMassTransitRabbitMqTransport : IMassTransitTransport { }
public interface IMassTransitAzureServiceBusTransport : IMassTransitTransport { }
MassTransitAzureServiceBusTransport(大众交通服务巴士运输)
public sealed class MassTransitAzureServiceBusTransport : IMassTransitAzureServiceBusTransport
{
public IBusControl BusControl { get; }
public MassTransitAzureServiceBusTransport()
{
BusControl = ConfigureBus();
BusControl.StartAsync();
}
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingAzureServiceBus(config => {
var host = config.Host(ConfigProvider.GetConfig("AzureServiceBus", "AzureServiceBus_ConnStr"), host => { });
});
}
}
MassTransitAzureServiceBusMessaging服务
public class MassTransitAzureServiceBusMessagingService : IMessagingService
{
readonly IMassTransitAzureServiceBusTransport massTransitTransport;
public MassTransitAzureServiceBusMessagingService(IMassTransitAzureServiceBusTransport massTransitTransport)
{
//transport bus config already happens in massTransitTransport constructor
this.massTransitTransport = massTransitTransport;
}
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
//IRegisterCommandUpdateStatus is an interface specifying the properties needed
await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}
}
Azure 服务总线基本层不允许使用主题。因此,你将无法使用发布。也就是说,MassTransit并不能真正与基本层一起工作,尽管过去的尝试可能已经成功。
MassTransit文档确实指出,如果你想使用一个主题(即同时发布到多个订阅的能力),你使用publish.如果你想发送消息到队列(消息被路由到一个特定的位置),你使用send并提供正确的信息。
主题需要标准定价,队列可以使用基本定价。
有了这些信息,MassTransitAzureServiceBusMessagingService将被修改如下。
基本定价 - 队列
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
var queueUri = new Uri(massTransitTransport.BusControl.Address, "registration.updatestatus");
var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(queueUri);
await endpoint.Send<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}
标准价格 - 主题订阅
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}