我们希望实现由 Azure 服务总线和 MassTransit 支持的消息总线。我正在开发一些控制台应用程序来模拟事物。我创建了一个
Backend
应用程序来模仿服务器,以及一个 Client
应用程序来模仿连接到后端的一个或多个前端。
根据我在网上阅读的内容,后端如下所示:
using System;
using System.Threading.Tasks;
using MassTransit.Demo.Contract;
using MassTransit.Demo.Contract.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace MassTransit.Demo.Backend
{
public class Program
{
private static readonly string BusEndpointName = $"{Environment.MachineName}-{DateTime.Now.Second}-Bus";
public static async Task Main(string[] args)
{
Console.Title = "Sender Window";
Console.WriteLine("Starting MassTransit publisher...");
await CreateHostBuilder(args).Build().RunAsync();
//Cleanup queues, topics, and subs via MS Azure Libs since MT does not do this?
await AzureManagementHelper.RemoveQueueAndSubscription(BusEndpointName);
}
public static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder(args).ConfigureServices((_, services) =>
{
services.AddMassTransit(busRegistrationConfigurator =>
{
busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
busRegistrationConfigurator.AddHostedService<BackendPublishService>();
busRegistrationConfigurator.AddHostedService<RequestResponseService>();
busRegistrationConfigurator.UsingAzureServiceBus((busRegistrationContext, serviceBusBusFactoryConfigurator) =>
{
serviceBusBusFactoryConfigurator.Host(ConnectionStrings.AzureServiceBusConnectionString);
serviceBusBusFactoryConfigurator.ConfigureEndpoints(busRegistrationContext);
//Specify the request-response temporary bus name
serviceBusBusFactoryConfigurator.OverrideDefaultBusEndpointQueueName(BusEndpointName);
//Specify the Message Type Topic names explicitly instead of using assumed format
serviceBusBusFactoryConfigurator.Message<BackendBroadcastMessage>(
messageTopologyConfigurator => messageTopologyConfigurator
.SetEntityName(ConnectionStrings.TopicNameBackendBroadcast));
serviceBusBusFactoryConfigurator.Message<RequestMessage>(
messageTopologyConfigurator => messageTopologyConfigurator
.SetEntityName(ConnectionStrings.TopicNameRequestResponse));
});
});
});
}
}
}
客户相似。一切都有效。但对于下一个要求,我有点不确定正确的方法是什么。
假设后端首先运行(并且始终运行)。然后,客户端上线并需要向后端“注册”。这会在后端和前端之间创建一对一的连接,以发送其他客户端不应看到的目标消息。应用程序启动并配置后,MT 中如何完成此操作?我读过的所有文档都讨论了上面代码中的启动配置。但我希望它是动态的,这样当客户上下移动时,他们可以优雅地注册和注销自己。
我开始沿着这条路走下去
RequestResponseService
:
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit.Demo.Contract.Messages;
using Microsoft.Extensions.Hosting;
namespace MassTransit.Demo.Backend
{
/// <summary>
/// Sends request messages at regular intervals and awaits responses.
/// </summary>
public sealed class RequestResponseService : BackgroundService
{
private readonly IBus _bus;
public RequestResponseService(IBus bus)
{
_bus = bus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var count = 0;
while (!stoppingToken.IsCancellationRequested)
{
var message = new RequestMessage($"Yo {count++}");
Console.WriteLine($"({Environment.CurrentManagedThreadId}) Requesting response to: {message.MessageText}");
//Await but avoid context switchback so it does not block other message services
var response = await _bus
.Request<RequestMessage, ResponseMessage>(message, stoppingToken)
.ConfigureAwait(false);
Console.WriteLine($"({Environment.CurrentManagedThreadId}) Response received: {response.Message.MessageText}");
await Task.Delay(4000, stoppingToken);
}
}
}
}
但这将是一种 FIFO 场景,第一个接收请求并响应的客户端获胜。其他人会做出回应,而这些消息将成为死信或其他什么。看看
IBus
,我看到了创建 IClientFactory
的选项,但它似乎需要 URI 和端点。
要实现客户端与 MassTransit 后端的动态注册和注销以进行一对一通信,您可以实现服务发现机制并使用 MassTransit 的动态消费者注册功能。 这样,客户上线时可以向后台注册,离线时可以注销。
当客户端通过发送
ClientRegistrationMessage
向后端注册时,后端会为该客户端创建一个动态命名的主题并订阅它。后端发送到该主题的消息只会被注册的客户端接收。
这种方式允许客户端动态注册和注销自己,后端可以向特定客户端发送有针对性的消息
代码
后端/Program.cs
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace MassTransitDemo.Backend
{
public class Program
{
public static async Task Main(string[] args)
{
Console.Title = "Backend";
Console.WriteLine("Starting MassTransit backend...");
await CreateHostBuilder(args).Build().RunAsync();
}
public static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder(args).ConfigureServices((_, services) =>
{
services.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
var azureServiceBusConnectionString = "YOUR_AZURE_SERVICE_BUS_CONNECTION_STRING";
cfg.Host(azureServiceBusConnectionString);
// Register the client registration consumer
cfg.ReceiveEndpoint("client-registration-queue", e =>
{
e.Consumer<ClientRegistrationConsumer>();
});
// Register other message consumers as needed
});
});
services.AddHostedService<MassTransitService>();
});
}
}
后端/处理程序/ClientRegistrationConsumer.cs
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;
namespace MassTransitDemo.Backend.Handlers
{
public class ClientRegistrationConsumer : IConsumer<ClientRegistrationMessage>
{
private readonly IBusControl _busControl;
public ClientRegistrationConsumer(IBusControl busControl)
{
_busControl = busControl;
}
public async Task Consume(ConsumeContext<ClientRegistrationMessage> context)
{
var clientId = context.Message.ClientId;
// Create a dynamically named topic for this client
var topicName = $"client-topic-{clientId}";
// Create a subscription for this client on the topic
await _busControl.ConnectSendEndpoint(new Uri(topicName)).ConfigureAwait(false);
// Perform registration logic here
// For simplicity, just print the registration message
Console.WriteLine($"Client '{clientId}' registered for message types: {string.Join(", ", context.Message.MessageTypes)}");
}
}
}
客户端/程序.cs
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace MassTransitDemo.Client
{
public class Program
{
public static async Task Main(string[] args)
{
Console.Title = "Client";
Console.WriteLine("Starting MassTransit client...");
await CreateHostBuilder(args).Build().RunAsync();
}
public static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder(args).ConfigureServices((_, services) =>
{
services.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
var azureServiceBusConnectionString = "YOUR_AZURE_SERVICE_BUS_CONNECTION_STRING";
cfg.Host(azureServiceBusConnectionString);
// Register other message consumers as needed
});
});
services.AddHostedService<ClientService>();
});
}
}
}
客户端/处理程序/MessageHandlers.cs
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;
namespace MassTransitDemo.Client.Handlers
{
public class TargetedMessageConsumer : IConsumer<TargetedMessage>
{
public async Task Consume(ConsumeContext<TargetedMessage> context)
{
// Handle the targeted message received by the client
var message = context.Message;
Console.WriteLine($"Client received targeted message: {message.Content}");
}
}
}
客户端/处理程序/RegistrationHandlers.cs
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;
namespace MassTransitDemo.Client.Handlers
{
public class ClientRegistrationConsumer : IConsumer<ClientRegistrationMessage>
{
private readonly IBusControl _busControl;
public ClientRegistrationConsumer(IBusControl busControl)
{
_busControl = busControl;
}
public async Task Consume(ConsumeContext<ClientRegistrationMessage> context)
{
var clientId = context.Message.ClientId;
// Create a dynamically named topic for this client
var topicName = $"client-topic-{clientId}";
// Create a subscription for this client on the topic
await _busControl.ConnectSendEndpoint(new Uri(topicName)).ConfigureAwait(false);
// Perform registration logic here
// For simplicity, just print the registration message
Console.WriteLine($"Client '{clientId}' registered for message types: {string.Join(", ", context.Message.MessageTypes)}");
}
}
}
**Client/Handlers/TargetedMessageHandlers.cs**
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;
namespace MassTransitDemo.Client.Handlers
{
public class TargetedMessageConsumer : IConsumer<TargetedMessage>
{
public async Task Consume(ConsumeContext<TargetedMessage> context)
{
// Handle the targeted message received by the client
var message = context.Message;
Console.WriteLine($"Client received targeted message: {message.Content}");
}
}
}
客户端/处理程序/TargetedMessageHandlers.cs
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;
namespace MassTransitDemo.Client.Handlers
{
public class TargetedMessageConsumer : IConsumer<TargetedMessage>
{
public async Task Consume(ConsumeContext<TargetedMessage> context)
{
// Handle the targeted message received by the client
var message = context.Message;
Console.WriteLine($"Client received targeted message: {message.Content}");
}
}
}
客户端/消息/ClientRegistrationMessage.cs
`using System.Collections.Generic;
namespace MassTransitDemo.Client.Messages
{
public class ClientRegistrationMessage
{
public string ClientId { get; set; }
public List<string> MessageTypes { get; set; }
}
}`
客户端/消息/TargetedMessage.cs
namespace MassTransitDemo.Client.Messages
{
public class TargetedMessage
{
public string Content { get; set; }
}
}
这里,后端和客户端应用程序都设置了消息处理程序,客户端可以通过发送
ClientRegistrationMessage
动态向后端注册自己
结果 客户
后端