我有一种情况,我正在调用Azure Service Bus库的SubscriptionClient类的RegisterMessageHandler。基本上,我在基于服务矩阵环境的服务之一中从服务总线接收消息作为无状态服务时使用的是基于触发器的方法。因此,我不会立即关闭subscriptionClient对象,而是在Service的整个生命周期中保持打开状态,以便它继续接收来自Azure服务总线主题的消息。
并且当服务需要关闭(由于某些原因)时,我想处理被传递到Service Fabric服务中的取消令牌。
我的问题是,如何处理RegisterMessageHandler方法中的取消令牌,该方法在收到新消息时会被调用?我也想“优雅地”处理订阅客户端的关闭,即,如果消息已经在处理中,那么我希望该消息得到完全处理,然后我想关闭连接。下面是我正在使用的代码。
当前,我们正在采用以下方法:1.使用信号量锁定锁定消息的过程,并在finally块中释放该锁定。2.每当取消操作完成时,都调用cancellationToken.Register方法来处理取消令牌。释放注册方法中的锁定。
public class AzureServiceBusReceiver
{
private SubscriptionClient subscriptionClient;
private static Semaphore semaphoreLock;
public AzureServiceBusReceiver(ServiceBusReceiverSettings settings)
{
semaphoreLock = new Semaphore(1, 1);
subscriptionClient = new SubscriptionClient(
settings.ConnectionString, settings.TopicName, settings.SubscriptionName, ReceiveMode.PeekLock);
}
public void Receive(
CancellationToken cancellationToken)
{
var options = new MessageHandlerOptions(e =>
{
return Task.CompletedTask;
})
{
AutoComplete = false,
};
subscriptionClient.RegisterMessageHandler(
async (message, token) =>
{
semaphoreLock.WaitOne();
if (subscriptionClient.IsClosedOrClosing)
return;
CancellationToken combinedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, token).Token;
try
{
// message processing logic
}
catch (Exception ex)
{
await subscriptionClient.DeadLetterAsync(message.SystemProperties.LockToken);
}
finally
{
semaphoreLock.Release();
}
}, options);
cancellationToken.Register(() =>
{
semaphoreLock.WaitOne();
if (!subscriptionClient.IsClosedOrClosing)
subscriptionClient.CloseAsync().GetAwaiter().GetResult();
semaphoreLock.Release();
return;
});
}
}