具有辅助角色的Azure Service Bus主题和订阅

问题描述 投票:0回答:3

所以我最近需要使用Service Bus Topic and Subscriptions,我已经关注了很多文章和教程。我已经能够成功实现Microsoft的Get started with Service Bus topics,并成功使用Visual Studio 2017's Worker Role模板来访问数据库。

但是,我对如何正确地“结合”这两者感到困惑。虽然Get started with Service Bus topics文章展示了如何创建2个应用程序,一个发送,一个接收然后退出,Worker Role模板似乎无休止地循环使用await Task.Delay(10000);

我不确定如何正确地“啮合”两者。从本质上讲,我希望我的Worker Role能够保持活力并永久地收听它的订阅(或直到它明显退出)。

任何指导都会很棒!

P.S。:如果你感兴趣的话,我已经在StackExchange - Software Engineering问了一个关于我应该用于我的案例场景的正确技术的相关问题。

更新#1(2018/08/09)

基于Arunprabhu's answer,这里有一些代码,说明我如何使用MessageVisual Studio 2017模板基于我阅读和接收的文章发送Worker Role with Service Bus Queue

发送(基于Get started with Service Bus topics

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

namespace TopicsSender {
    internal static class Program {
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private static ITopicClient _topicClient;

        private static void Main(string[] args) {
            MainAsync().GetAwaiter().GetResult();
        }

        private static async Task MainAsync() {
            const int numberOfMessages = 10;
            _topicClient = new TopicClient(ServiceBusConnectionString, TopicName);

            Console.WriteLine("======================================================");
            Console.WriteLine("Press ENTER key to exit after sending all the messages.");
            Console.WriteLine("======================================================");

            // Send messages.
            await SendMessagesAsync(numberOfMessages);

            Console.ReadKey();

            await _topicClient.CloseAsync();
        }

        private static async Task SendMessagesAsync(int numberOfMessagesToSend) {
            try {
                for (var i = 0; i < numberOfMessagesToSend; i++) {
                    // Create a new message to send to the topic
                    var messageBody = $"Message {i}";
                    var message = new Message(Encoding.UTF8.GetBytes(messageBody));

                    // Write the body of the message to the console
                    Console.WriteLine($"Sending message: {messageBody}");

                    // Send the message to the topic
                    await _topicClient.SendAsync(message);
                }
            } catch (Exception exception) {
                Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
            }
        }
    }
}

接收(基于Worker Role with Service Bus Queue模板)

using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1 {
    public class WorkerRole : RoleEntryPoint {
        // The name of your queue
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private const string SubscriptionName = "test-sub1";

        // QueueClient is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        private SubscriptionClient _client;
        private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

        public override void Run() {
            Trace.WriteLine("Starting processing of messages");

            // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
            _client.OnMessage((receivedMessage) => {
                try {
                    // Process the message
                    Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
                    var message = receivedMessage.GetBody<byte[]>();
                    Trace.WriteLine($"Received message: SequenceNumber:{receivedMessage.SequenceNumber} Body:{message.ToString()}");
                } catch (Exception e) {
                    // Handle any message processing specific exceptions here
                    Trace.Write(e.ToString());
                }
            });

            _completedEvent.WaitOne();
        }

        public override bool OnStart() {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            // Initialize the connection to Service Bus Queue
            _client = SubscriptionClient.CreateFromConnectionString(ServiceBusConnectionString, TopicName, SubscriptionName);
            return base.OnStart();
        }

        public override void OnStop() {
            // Close the connection to Service Bus Queue
            _client.Close();
            _completedEvent.Set();
            base.OnStop();
        }
    }
}

更新#2(2018/08/10)

在得到Arunprabhu的一些建议并知道我使用不同的库后,下面是我目前的解决方案,其中包含来自多个来源的部分。我有什么东西可以忽略,还有那些肩负在那里等等吗?目前收到的错误可能是针对另一个问题或已经回答过,所以在进一步研究之前不想发布它。

using System;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1 {
    public class WorkerRole : RoleEntryPoint {
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
        private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);

        // The name of your queue
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private const string SubscriptionName = "test-sub1";

        // _client is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        private SubscriptionClient _client;

        public override void Run() {
            Trace.WriteLine("Starting processing of messages");

            try {
                this.RunAsync(this._cancellationTokenSource.Token).Wait();
            } catch (Exception e) {
                Trace.WriteLine("Exception");
                Trace.WriteLine(e.ToString());
            } finally {
                Trace.WriteLine("Finally...");
                this._runCompleteEvent.Set();
            }
        }

        public override bool OnStart() {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            var result = base.OnStart();

            Trace.WriteLine("WorkerRole has been started");

            return result;
        }

        public override void OnStop() {
            // Close the connection to Service Bus Queue
            this._cancellationTokenSource.Cancel();
            this._runCompleteEvent.WaitOne();

            base.OnStop();
        }

        private async Task RunAsync(CancellationToken cancellationToken) {
            // Configure the client
            RegisterOnMessageHandlerAndReceiveMessages(ServiceBusConnectionString, TopicName, SubscriptionName);

            _runCompleteEvent.WaitOne();

            Trace.WriteLine("Closing");
            await _client.CloseAsync();
        }

        private void RegisterOnMessageHandlerAndReceiveMessages(string connectionString, string topicName, string subscriptionName) {
            _client = new SubscriptionClient(connectionString, topicName, subscriptionName);

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) {
                // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
                // Set it according to how many messages the application wants to process in parallel.
                MaxConcurrentCalls = 1,

                // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
                AutoComplete = false,
            };

            _client.RegisterMessageHandler(ProcessMessageAsync, messageHandlerOptions);
        }

        private async Task ProcessMessageAsync(Message message, CancellationToken token) {
            try {
                // Process the message
                Trace.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
                await _client.CompleteAsync(message.SystemProperties.LockToken);
            } catch (Exception e) {
                // Handle any message processing specific exceptions here
                Trace.Write(e.ToString());
                await _client.AbandonAsync(message.SystemProperties.LockToken);
            }
        }

        private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) {
            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
            Console.WriteLine("Exception context for troubleshooting:");
            Console.WriteLine($"- Endpoint: {context.Endpoint}");
            Console.WriteLine($"- Entity Path: {context.EntityPath}");
            Console.WriteLine($"- Executing Action: {context.Action}");
            return Task.CompletedTask;
        }
    }
}
c# azure azureservicebus azure-worker-roles azure-servicebus-topics
3个回答
2
投票

考虑到更新的问题更新#1(2018/08/09)的复杂性,我提供了一个单独的答案。

发件人和收件人正在使用不同的库。

发射器 - Microsoft.Azure.ServiceBus

接收器 - WindowsAzure.ServiceBus

Microsoft.Azure.ServiceBus将消息对象作为消息,其中WindowsAzure.ServiceBus具有BrokeredMessage。

在Microsoft.Azure.ServiceBus中有一个方法RegisterMessageHandler,这是WindowsAzure.ServiceBus中client.OnMessage()的替代方法。通过使用它,侦听器将消息作为Message对象接收。该库支持您期望的异步编程。

请参考here获取两个库的样本。


1
投票

如果您使用的是Visual Studio,则可以使用默认模板通过Service Bus Queue创建Azure Cloud Service和Worker Role。在那里,您需要在WorkerRole.cs中使用SubscriptionClient更改QueueClient。

然后,worker角色将保持活动状态,侦听主题订阅中的消息。

你可以找到样品here。您应该在Cloud Service中使用Service Bus Queue创建Worker角色

enter image description here


0
投票

我想知道你是否有任何具体的理由选择工作者角色而不是网络工作?如果没有,您可以使用具有ServiceBusTrigger属性的Web作业,它将使您的代码更简单。 more info here...

© www.soinside.com 2019 - 2024. All rights reserved.