在后端动态注册 MassTransit 客户端

问题描述 投票:0回答:1

我们希望实现由 Azure 服务总线和 MassTransit 支持的消息总线。我正在开发一些控制台应用程序来模拟事物。我创建了一个

Backend
应用程序来模仿服务器,以及一个
Client
应用程序来模仿连接到后端的一个或多个前端。

根据我在网上阅读的内容,后端如下所示:

using System;
using System.Threading.Tasks;
using MassTransit.Demo.Contract;
using MassTransit.Demo.Contract.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace MassTransit.Demo.Backend
{
    public class Program
    {
        private static readonly string BusEndpointName = $"{Environment.MachineName}-{DateTime.Now.Second}-Bus";

        public static async Task Main(string[] args)
        {
            Console.Title = "Sender Window";
            Console.WriteLine("Starting MassTransit publisher...");
            await CreateHostBuilder(args).Build().RunAsync();
            //Cleanup queues, topics, and subs via MS Azure Libs since MT does not do this?
            await AzureManagementHelper.RemoveQueueAndSubscription(BusEndpointName);
        }

        public static IHostBuilder CreateHostBuilder(string[] args)
        {
            return Host.CreateDefaultBuilder(args).ConfigureServices((_, services) =>
            {
                services.AddMassTransit(busRegistrationConfigurator =>
                {
                    busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
                    busRegistrationConfigurator.AddHostedService<BackendPublishService>();
                    busRegistrationConfigurator.AddHostedService<RequestResponseService>();

                    busRegistrationConfigurator.UsingAzureServiceBus((busRegistrationContext, serviceBusBusFactoryConfigurator) =>
                    {
                        serviceBusBusFactoryConfigurator.Host(ConnectionStrings.AzureServiceBusConnectionString);
                        serviceBusBusFactoryConfigurator.ConfigureEndpoints(busRegistrationContext);

                        //Specify the request-response temporary bus name
                        serviceBusBusFactoryConfigurator.OverrideDefaultBusEndpointQueueName(BusEndpointName);

                        //Specify the Message Type Topic names explicitly instead of using assumed format
                        serviceBusBusFactoryConfigurator.Message<BackendBroadcastMessage>(
                            messageTopologyConfigurator => messageTopologyConfigurator
                                .SetEntityName(ConnectionStrings.TopicNameBackendBroadcast));

                        serviceBusBusFactoryConfigurator.Message<RequestMessage>(
                            messageTopologyConfigurator => messageTopologyConfigurator
                                .SetEntityName(ConnectionStrings.TopicNameRequestResponse));
                    });
                });
            });
        }
    }
}

客户相似。一切都有效。但对于下一个要求,我有点不确定正确的方法是什么。

假设后端首先运行(并且始终运行)。然后,客户端上线并需要向后端“注册”。这会在后端和前端之间创建一对一的连接,以发送其他客户端不应看到的目标消息。应用程序启动并配置后,MT 中如何完成此操作?我读过的所有文档都讨论了上面代码中的启动配置。但我希望它是动态的,这样当客户上下移动时,他们可以优雅地注册和注销自己。

我开始沿着这条路走下去

RequestResponseService
:

using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit.Demo.Contract.Messages;
using Microsoft.Extensions.Hosting;

namespace MassTransit.Demo.Backend
{
    /// <summary>
    /// Sends request messages at regular intervals and awaits responses.
    /// </summary>
    public sealed class RequestResponseService : BackgroundService
    {
        private readonly IBus _bus;

        public RequestResponseService(IBus bus)
        {
            _bus = bus;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var count = 0;
            while (!stoppingToken.IsCancellationRequested)
            {
                var message = new RequestMessage($"Yo {count++}");
                Console.WriteLine($"({Environment.CurrentManagedThreadId}) Requesting response to: {message.MessageText}");

                //Await but avoid context switchback so it does not block other message services
                var response = await _bus
                    .Request<RequestMessage, ResponseMessage>(message, stoppingToken)
                    .ConfigureAwait(false);

                Console.WriteLine($"({Environment.CurrentManagedThreadId}) Response received: {response.Message.MessageText}");
                await Task.Delay(4000, stoppingToken);
            }
        }
    }
}

但这将是一种 FIFO 场景,第一个接收请求并响应的客户端获胜。其他人会做出回应,而这些消息将成为死信或其他什么。看看

IBus
,我看到了创建
IClientFactory
的选项,但它似乎需要 URI 和端点。

c# azure azureservicebus masstransit
1个回答
0
投票

要实现客户端与 MassTransit 后端的动态注册和注销以进行一对一通信,您可以实现服务发现机制并使用 MassTransit 的动态消费者注册功能。 这样,客户上线时可以向后台注册,离线时可以注销。

当客户端通过发送

ClientRegistrationMessage
向后端注册时,后端会为该客户端创建一个动态命名的主题并订阅它。后端发送到该主题的消息只会被注册的客户端接收。

这种方式允许客户端动态注册和注销自己,后端可以向特定客户端发送有针对性的消息

代码

后端/Program.cs

using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace MassTransitDemo.Backend
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            Console.Title = "Backend";
            Console.WriteLine("Starting MassTransit backend...");

            await CreateHostBuilder(args).Build().RunAsync();
        }

        public static IHostBuilder CreateHostBuilder(string[] args)
        {
            return Host.CreateDefaultBuilder(args).ConfigureServices((_, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.UsingAzureServiceBus((context, cfg) =>
                    {
                        var azureServiceBusConnectionString = "YOUR_AZURE_SERVICE_BUS_CONNECTION_STRING";
                        cfg.Host(azureServiceBusConnectionString);

                        // Register the client registration consumer
                        cfg.ReceiveEndpoint("client-registration-queue", e =>
                        {
                            e.Consumer<ClientRegistrationConsumer>();
                        });

                        // Register other message consumers as needed
                    });
                });
                services.AddHostedService<MassTransitService>();
            });
        }
    }

后端/处理程序/ClientRegistrationConsumer.cs

using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;

namespace MassTransitDemo.Backend.Handlers
{
    public class ClientRegistrationConsumer : IConsumer<ClientRegistrationMessage>
    {
        private readonly IBusControl _busControl;

        public ClientRegistrationConsumer(IBusControl busControl)
        {
            _busControl = busControl;
        }

        public async Task Consume(ConsumeContext<ClientRegistrationMessage> context)
        {
            var clientId = context.Message.ClientId;

            // Create a dynamically named topic for this client
            var topicName = $"client-topic-{clientId}";

            // Create a subscription for this client on the topic
            await _busControl.ConnectSendEndpoint(new Uri(topicName)).ConfigureAwait(false);

            // Perform registration logic here
            // For simplicity, just print the registration message
            Console.WriteLine($"Client '{clientId}' registered for message types: {string.Join(", ", context.Message.MessageTypes)}");
        }
    }
}

客户端/程序.cs

using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace MassTransitDemo.Client
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            Console.Title = "Client";
            Console.WriteLine("Starting MassTransit client...");

            await CreateHostBuilder(args).Build().RunAsync();
        }

        public static IHostBuilder CreateHostBuilder(string[] args)
        {
            return Host.CreateDefaultBuilder(args).ConfigureServices((_, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.UsingAzureServiceBus((context, cfg) =>
                    {
                        var azureServiceBusConnectionString = "YOUR_AZURE_SERVICE_BUS_CONNECTION_STRING";
                        cfg.Host(azureServiceBusConnectionString);

                        // Register other message consumers as needed
                    });
                });
                services.AddHostedService<ClientService>();
            });
        }
    }
}

客户端/处理程序/MessageHandlers.cs

using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;

namespace MassTransitDemo.Client.Handlers
{
    public class TargetedMessageConsumer : IConsumer<TargetedMessage>
    {
        public async Task Consume(ConsumeContext<TargetedMessage> context)
        {
            // Handle the targeted message received by the client
            var message = context.Message;
            Console.WriteLine($"Client received targeted message: {message.Content}");
        }
    }
}

客户端/处理程序/RegistrationHandlers.cs

    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using MassTransitDemo.Client.Messages;
    
    namespace MassTransitDemo.Client.Handlers
    {
        public class ClientRegistrationConsumer : IConsumer<ClientRegistrationMessage>
        {
            private readonly IBusControl _busControl;
    
            public ClientRegistrationConsumer(IBusControl busControl)
            {
                _busControl = busControl;
            }
    
            public async Task Consume(ConsumeContext<ClientRegistrationMessage> context)
            {
                var clientId = context.Message.ClientId;
    
                // Create a dynamically named topic for this client
                var topicName = $"client-topic-{clientId}";
    
                // Create a subscription for this client on the topic
                await _busControl.ConnectSendEndpoint(new Uri(topicName)).ConfigureAwait(false);
    
                // Perform registration logic here
                // For simplicity, just print the registration message
                Console.WriteLine($"Client '{clientId}' registered for message types: {string.Join(", ", context.Message.MessageTypes)}");
            }
        }
    }
**Client/Handlers/TargetedMessageHandlers.cs**
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;

namespace MassTransitDemo.Client.Handlers
{
    public class TargetedMessageConsumer : IConsumer<TargetedMessage>
    {
        public async Task Consume(ConsumeContext<TargetedMessage> context)
        {
            // Handle the targeted message received by the client
            var message = context.Message;
            Console.WriteLine($"Client received targeted message: {message.Content}");
        }
    }
}

客户端/处理程序/TargetedMessageHandlers.cs

using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransitDemo.Client.Messages;

namespace MassTransitDemo.Client.Handlers
{
    public class TargetedMessageConsumer : IConsumer<TargetedMessage>
    {
        public async Task Consume(ConsumeContext<TargetedMessage> context)
        {
            // Handle the targeted message received by the client
            var message = context.Message;
            Console.WriteLine($"Client received targeted message: {message.Content}");
        }
    }
}

客户端/消息/ClientRegistrationMessage.cs

`using System.Collections.Generic;

namespace MassTransitDemo.Client.Messages
{
    public class ClientRegistrationMessage
    {
        public string ClientId { get; set; }
        public List<string> MessageTypes { get; set; }
    }
}` 

客户端/消息/TargetedMessage.cs

namespace MassTransitDemo.Client.Messages
{
    public class TargetedMessage
    {
        public string Content { get; set; }
    }
}

这里,后端和客户端应用程序都设置了消息处理程序,客户端可以通过发送

ClientRegistrationMessage

动态向后端注册自己

结果 客户 enter image description here

后端 enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.