所以我最近需要使用Service Bus Topic and Subscriptions
,我已经关注了很多文章和教程。我已经能够成功实现Microsoft的Get started with Service Bus topics,并成功使用Visual Studio 2017's
Worker Role
模板来访问数据库。
但是,我对如何正确地“结合”这两者感到困惑。虽然Get started with Service Bus topics文章展示了如何创建2个应用程序,一个发送,一个接收然后退出,Worker Role
模板似乎无休止地循环使用await Task.Delay(10000);
。
我不确定如何正确地“啮合”两者。从本质上讲,我希望我的Worker Role
能够保持活力并永久地收听它的订阅(或直到它明显退出)。
任何指导都会很棒!
P.S。:如果你感兴趣的话,我已经在StackExchange - Software Engineering问了一个关于我应该用于我的案例场景的正确技术的相关问题。
更新#1(2018/08/09)
基于Arunprabhu's answer,这里有一些代码,说明我如何使用Message
的Visual Studio 2017
模板基于我阅读和接收的文章发送Worker Role with Service Bus Queue
。
发送(基于Get started with Service Bus topics)
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
namespace TopicsSender {
internal static class Program {
private const string ServiceBusConnectionString = "<your_connection_string>";
private const string TopicName = "test-topic";
private static ITopicClient _topicClient;
private static void Main(string[] args) {
MainAsync().GetAwaiter().GetResult();
}
private static async Task MainAsync() {
const int numberOfMessages = 10;
_topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
Console.WriteLine("======================================================");
Console.WriteLine("Press ENTER key to exit after sending all the messages.");
Console.WriteLine("======================================================");
// Send messages.
await SendMessagesAsync(numberOfMessages);
Console.ReadKey();
await _topicClient.CloseAsync();
}
private static async Task SendMessagesAsync(int numberOfMessagesToSend) {
try {
for (var i = 0; i < numberOfMessagesToSend; i++) {
// Create a new message to send to the topic
var messageBody = $"Message {i}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
// Write the body of the message to the console
Console.WriteLine($"Sending message: {messageBody}");
// Send the message to the topic
await _topicClient.SendAsync(message);
}
} catch (Exception exception) {
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
}
}
}
}
接收(基于Worker Role with Service Bus Queue
模板)
using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace WorkerRoleWithSBQueue1 {
public class WorkerRole : RoleEntryPoint {
// The name of your queue
private const string ServiceBusConnectionString = "<your_connection_string>";
private const string TopicName = "test-topic";
private const string SubscriptionName = "test-sub1";
// QueueClient is thread-safe. Recommended that you cache
// rather than recreating it on every request
private SubscriptionClient _client;
private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);
public override void Run() {
Trace.WriteLine("Starting processing of messages");
// Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
_client.OnMessage((receivedMessage) => {
try {
// Process the message
Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
var message = receivedMessage.GetBody<byte[]>();
Trace.WriteLine($"Received message: SequenceNumber:{receivedMessage.SequenceNumber} Body:{message.ToString()}");
} catch (Exception e) {
// Handle any message processing specific exceptions here
Trace.Write(e.ToString());
}
});
_completedEvent.WaitOne();
}
public override bool OnStart() {
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// Initialize the connection to Service Bus Queue
_client = SubscriptionClient.CreateFromConnectionString(ServiceBusConnectionString, TopicName, SubscriptionName);
return base.OnStart();
}
public override void OnStop() {
// Close the connection to Service Bus Queue
_client.Close();
_completedEvent.Set();
base.OnStop();
}
}
}
更新#2(2018/08/10)
在得到Arunprabhu的一些建议并知道我使用不同的库后,下面是我目前的解决方案,其中包含来自多个来源的部分。我有什么东西可以忽略,还有那些肩负在那里等等吗?目前收到的错误可能是针对另一个问题或已经回答过,所以在进一步研究之前不想发布它。
using System;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace WorkerRoleWithSBQueue1 {
public class WorkerRole : RoleEntryPoint {
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);
// The name of your queue
private const string ServiceBusConnectionString = "<your_connection_string>";
private const string TopicName = "test-topic";
private const string SubscriptionName = "test-sub1";
// _client is thread-safe. Recommended that you cache
// rather than recreating it on every request
private SubscriptionClient _client;
public override void Run() {
Trace.WriteLine("Starting processing of messages");
try {
this.RunAsync(this._cancellationTokenSource.Token).Wait();
} catch (Exception e) {
Trace.WriteLine("Exception");
Trace.WriteLine(e.ToString());
} finally {
Trace.WriteLine("Finally...");
this._runCompleteEvent.Set();
}
}
public override bool OnStart() {
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
var result = base.OnStart();
Trace.WriteLine("WorkerRole has been started");
return result;
}
public override void OnStop() {
// Close the connection to Service Bus Queue
this._cancellationTokenSource.Cancel();
this._runCompleteEvent.WaitOne();
base.OnStop();
}
private async Task RunAsync(CancellationToken cancellationToken) {
// Configure the client
RegisterOnMessageHandlerAndReceiveMessages(ServiceBusConnectionString, TopicName, SubscriptionName);
_runCompleteEvent.WaitOne();
Trace.WriteLine("Closing");
await _client.CloseAsync();
}
private void RegisterOnMessageHandlerAndReceiveMessages(string connectionString, string topicName, string subscriptionName) {
_client = new SubscriptionClient(connectionString, topicName, subscriptionName);
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) {
// Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,
// Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
// False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
AutoComplete = false,
};
_client.RegisterMessageHandler(ProcessMessageAsync, messageHandlerOptions);
}
private async Task ProcessMessageAsync(Message message, CancellationToken token) {
try {
// Process the message
Trace.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
await _client.CompleteAsync(message.SystemProperties.LockToken);
} catch (Exception e) {
// Handle any message processing specific exceptions here
Trace.Write(e.ToString());
await _client.AbandonAsync(message.SystemProperties.LockToken);
}
}
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) {
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
}
考虑到更新的问题更新#1(2018/08/09)的复杂性,我提供了一个单独的答案。
发件人和收件人正在使用不同的库。
发射器 - Microsoft.Azure.ServiceBus
接收器 - WindowsAzure.ServiceBus
Microsoft.Azure.ServiceBus将消息对象作为消息,其中WindowsAzure.ServiceBus具有BrokeredMessage。
在Microsoft.Azure.ServiceBus中有一个方法RegisterMessageHandler,这是WindowsAzure.ServiceBus中client.OnMessage()的替代方法。通过使用它,侦听器将消息作为Message对象接收。该库支持您期望的异步编程。
请参考here获取两个库的样本。
如果您使用的是Visual Studio,则可以使用默认模板通过Service Bus Queue创建Azure Cloud Service和Worker Role。在那里,您需要在WorkerRole.cs中使用SubscriptionClient更改QueueClient。
然后,worker角色将保持活动状态,侦听主题订阅中的消息。
你可以找到样品here。您应该在Cloud Service中使用Service Bus Queue创建Worker角色
我想知道你是否有任何具体的理由选择工作者角色而不是网络工作?如果没有,您可以使用具有ServiceBusTrigger属性的Web作业,它将使您的代码更简单。 more info here...