I'd like to know whether there is a tool or library that can move messages between queues. At the moment I'm doing something like this:
public static void ProcessQueueMessage([QueueTrigger("myqueue-poison")] string message, TextWriter log)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("myqueue");
    queue.CreateIfNotExists();
    var messageData = JsonConvert.SerializeObject(data, new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() });
    queue.AddMessage(new CloudQueueMessage(messageData));
}
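As of version 1.4.1 (2018-09-11), Microsoft Azure Storage Explorer cannot move messages from one Azure queue to another.
I blogged a simple solution for transferring poison messages back to the originating queue and thought it might save someone a few minutes. Obviously, you still need to fix the error that caused the messages to end up in the poison queue in the first place!
You need to add a NuGet package reference to Microsoft.NET.Sdk.Functions: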
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

void Main()
{
    const string queuename = "MyQueueName";
    string storageAccountString = "xxxxxx";
    RetryPoisonMesssages(storageAccountString, queuename);
}

private static int RetryPoisonMesssages(string storageAccountString, string queuename)
{
    CloudQueue targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    CloudQueue poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");
    int count = 0;
    while (true)
    {
        var msg = poisonqueue.GetMessage();
        if (msg == null)
            break;
        poisonqueue.DeleteMessage(msg);
        targetqueue.AddMessage(msg);
        count++;
    }
    return count;
}

private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference(queuename);
    return queue;
}
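As of version 1.15.0 (2020), Azure Storage Explorer can now do this. https://github.com/microsoft/AzureStorageExplorer/issues/1064
Essentially, Azure Storage does not support moving messages from one queue to another. You have to do it yourself.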
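One way to move messages from one queue to another is to dequeue them from the source queue (by calling GetMessages), read each message's content, and then create a new message with that content in the target queue. Once the new message has been created, delete the original from the source queue. This can be done with the storage client library.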
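A minimal Python sketch of that receive / re-create / delete pattern. The method names `receive_messages`, `send_message` and `delete_message` follow the v12 `azure-storage-queue` `QueueClient`; the `InMemoryQueue` class and the `move_messages` helper are hypothetical stand-ins of mine so the snippet runs without a storage account.

```python
from collections import deque
from dataclasses import dataclass
from itertools import count


@dataclass
class FakeMessage:
    id: str
    content: str


class InMemoryQueue:
    """Hypothetical stand-in exposing only the three calls the sketch needs."""

    def __init__(self, contents=()):
        self._ids = count(1)
        self._messages = deque()
        for content in contents:
            self.send_message(content)

    def send_message(self, content):
        self._messages.append(FakeMessage(str(next(self._ids)), content))

    def receive_messages(self):
        # Return a snapshot so deleting while iterating is safe.
        return list(self._messages)

    def delete_message(self, message):
        self._messages = deque(m for m in self._messages if m.id != message.id)

    def __len__(self):
        return len(self._messages)


def move_messages(source, target):
    """Copy each message's content to target, then delete it from source."""
    moved = 0
    for message in source.receive_messages():
        # Build the new message from the content only, not from the received object.
        target.send_message(message.content)
        # Delete only after the send succeeded, so a failure cannot lose the message.
        source.delete_message(message)
        moved += 1
    return moved


if __name__ == "__main__":
    poison = InMemoryQueue(["a", "b", "c"])
    main_queue = InMemoryQueue()
    print(move_messages(poison, main_queue), "messages moved")
```

With real clients you would pass two `QueueClient` objects instead of the stand-ins. Sending before deleting means a crash in between can duplicate a message but not drop one, which is usually the safer trade-off.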
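Cerebrata Azure Management Studio (a paid product with a 15-day free trial) has this feature.
As of version 1.4.1 (2018-09-11), Microsoft Azure Storage Explorer does not support moving queue messages.
Here is an updated version of Mitch's answer that uses the latest Microsoft.Azure.Storage.Queue package. Just create a new .NET console application, add the above package to it, and replace the contents of Program.cs with the following: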
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using System.Threading.Tasks;

namespace PoisonMessageDequeuer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            const string queuename = "MyQueueName";
            string storageAccountString = "xxx";
            await RetryPoisonMesssages(storageAccountString, queuename);
        }

        private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename)
        {
            var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
            var poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");
            var count = 0;
            while (true)
            {
                var msg = await poisonqueue.GetMessageAsync();
                if (msg == null)
                    break;
                await poisonqueue.DeleteMessageAsync(msg);
                await targetqueue.AddMessageAsync(msg);
                count++;
            }
            return count;
        }

        private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
        {
            var storageAccount = CloudStorageAccount.Parse(storageAccountString);
            var queueClient = storageAccount.CreateCloudQueueClient();
            var queue = queueClient.GetQueueReference(queuename);
            return queue;
        }
    }
}
It's still slow if you are handling more than 1000 messages, so I recommend looking into the batch APIs for higher volumes.
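The answer doesn't say which batch API it means. One reading is receiving several messages per call: the queue service returns at most 32 messages per receive request, while each send and delete is still one call per message. Here is a rough Python sketch of draining a queue in batches and counting receive round trips. `drain_in_batches` and `make_backlog_reader` are hypothetical names of mine, and the list-backed reader stands in for a real batched receive call.

```python
MAX_BATCH = 32  # the queue service returns at most 32 messages per receive call


def drain_in_batches(receive_batch, move_one, batch_size=MAX_BATCH):
    """Call receive_batch(batch_size) until it returns nothing.

    Returns (messages moved, receive round trips made).
    """
    if not 1 <= batch_size <= MAX_BATCH:
        raise ValueError("batch_size must be between 1 and 32")
    moved = 0
    round_trips = 0
    while True:
        batch = receive_batch(batch_size)
        round_trips += 1
        if not batch:
            break
        for message in batch:
            move_one(message)  # e.g. send to the target queue, then delete from the source
            moved += 1
    return moved, round_trips


def make_backlog_reader(backlog):
    """Hypothetical stand-in for a batched receive call, backed by a plain list."""
    def receive_batch(n):
        batch = backlog[:n]
        del backlog[:n]
        return batch
    return receive_batch


if __name__ == "__main__":
    moved_to = []
    reader = make_backlog_reader([f"msg-{i}" for i in range(1000)])
    print(drain_in_batches(reader, moved_to.append))
```

For a backlog of 1000 messages this makes 33 receive calls (32 non-empty batches plus the final empty one) instead of 1001 when fetching one message at a time. The per-message send and delete calls remain, so the gain is limited to the receive side.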
Here's a Python script you may find useful. You need to install azure-storage-queue:
from azure.storage.queue import QueueService  # legacy azure-storage-queue (before v12)

queueService = QueueService(connection_string = "YOUR CONNECTION STRING")
for queue in queueService.list_queues():
    if "poison" in queue.name:
        print(queue.name)
        targetQueueName = queue.name.replace("-poison", "")
        while queueService.peek_messages(queue.name):
            for message in queueService.get_messages(queue.name, 32):
                print(".", end="", flush=True)
                queueService.put_message(targetQueueName, message.content)
                queueService.delete_message(queue.name, message.id, message.pop_receipt)
I had to do this again and took the time to update my snippet to the new storage SDK. For details, see the post at https://www.bokio.se/engineering-blog/how-to-re-run-the-poison-queue-in-azure-webjobs/.
Here is the code I used:
using Azure.Storage.Queues;
using System;
using System.Threading.Tasks;

namespace AzureQueueTransfer
{
    internal class Program
    {
        // Needs Read, Update & Process (full URL, can be created in Storage Explorer)
        private const string sourceQueueSAS = "";
        // Needs Add (full URL, can be created in Storage Explorer)
        private const string targetQueueSAS = "";

        private static async Task Main(string[] args)
        {
            var sourceQueue = new QueueClient(new Uri(sourceQueueSAS));
            var targetQueue = new QueueClient(new Uri(targetQueueSAS));
            var queuedAny = true;
            while (queuedAny)
            {
                // Pause between batches so we don't build up too much backlog and new messages can be processed at higher priority than old ones
                await Task.Delay(30000);
                queuedAny = false;
                foreach (var message in (await sourceQueue.ReceiveMessagesAsync(maxMessages: 32)).Value)
                {
                    queuedAny = true;
                    var res = await targetQueue.SendMessageAsync(message.Body);
                    Console.WriteLine($"Transferred: {message.MessageId}");
                    // Delete from the source only after the send has succeeded
                    await sourceQueue.DeleteMessageAsync(message.MessageId, message.PopReceipt);
                }
                Console.WriteLine("Finished batch");
            }
        }
    }
}
For anyone coming here looking for a Node equivalent of @MitchWheat's answer, here is one using an Azure Function.
import AzureStorage from 'azure-storage'
import { Context, HttpRequest } from '@azure/functions'
import util from 'util'

const queueService = AzureStorage.createQueueService()
queueService.messageEncoder = new AzureStorage.QueueMessageEncoder.TextBase64QueueMessageEncoder()

const deleteMessage = util.promisify(queueService.deleteMessage).bind(queueService)
const createMessage = util.promisify(queueService.createMessage).bind(queueService)
const getMessage = util.promisify(queueService.getMessage).bind(queueService)

export async function run (context: Context, req: HttpRequest): Promise<void> {
  try {
    const poisonQueue = (req.query.queue || (req.body && req.body.queue));
    // Strip only the trailing "-poison" so queue names that contain hyphens still resolve correctly
    const targetQueue = poisonQueue.replace(/-poison$/, '')
    let count = 0

    while (true) {
      const message = await getMessage(poisonQueue)
      if (!message) { break; }
      if (message.messageText && message.messageId && message.popReceipt) {
        await createMessage(targetQueue, message.messageText)
        await deleteMessage(poisonQueue, message.messageId, message.popReceipt)
        count++ // count only the messages that were actually replayed
      }
    }

    context.res = {
      body: `Replayed ${count} messages from ${poisonQueue} on ${targetQueue}`
    };
  } catch (e) {
    context.log.error(e)
    context.res = { status: 500 }
  }
}
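To use the function, you need to provide connection information for the storage account that holds your storage queues. This is supplied through environment variables: either AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or AZURE_STORAGE_CONNECTION_STRING. For more information, see the Azure Storage SDK documentation.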
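To make the two configuration options concrete, here is a small Python sketch that resolves them from the environment. The helper name `resolve_storage_settings` and the returned dictionary shape are hypothetical. The order used here (connection string first, then account plus key) is an assumption of this sketch, so check the SDK documentation for the actual precedence.

```python
import os


def resolve_storage_settings(env=None):
    """Return the storage settings found in the environment.

    Accepts either AZURE_STORAGE_CONNECTION_STRING, or the pair
    AZURE_STORAGE_ACCOUNT + AZURE_STORAGE_ACCESS_KEY.
    """
    env = os.environ if env is None else env

    connection_string = env.get("AZURE_STORAGE_CONNECTION_STRING")
    if connection_string:
        return {"connection_string": connection_string}

    account = env.get("AZURE_STORAGE_ACCOUNT")
    access_key = env.get("AZURE_STORAGE_ACCESS_KEY")
    if account and access_key:
        return {"account": account, "access_key": access_key}

    raise RuntimeError(
        "Set AZURE_STORAGE_CONNECTION_STRING, or both "
        "AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY"
    )
```

Failing fast with a clear message is handy here, because the function above returns a bare 500 when anything goes wrong, including missing credentials.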
I also wrote a few lines about it in this Medium article.
Updated Python based on Jon Canning's answer:
from azure.storage.queue import QueueServiceClient

queueService = QueueServiceClient.from_connection_string(conn_str="DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net")
for queue in queueService.list_queues():
    if "poison" in queue.name:
        print(queue.name)
        targetQueueName = queue.name.replace("-poison", "")
        queue = queueService.get_queue_client(queue=queue.name)
        targetQueue = queueService.get_queue_client(queue=targetQueueName)
        while queue.peek_messages():
            messages = queue.receive_messages()
            for msg in messages:
                targetQueue.send_message(msg.content)
                queue.delete_message(msg)
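As Mikael Eliasson pointed out, the code in IGx89's answer is broken: AddMessageAsync overwrites some information on the message, and DeleteMessageAsync then returns a 404. The better solution is to copy the values into a new message for AddMessageAsync.
Here is an enhanced version of RetryPoisonMesssages that can process only a specified list of messages (rather than every message in the queue) and can copy messages instead of moving them. It also logs the success or failure of each message.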
/// <param name="storageAccountString"></param>
/// <param name="queuename"></param>
/// <param name="idsToMove">If not null, only messages with the listed IDs will be moved/copied</param>
/// <param name="deleteFromPoisonQueue">If false, messages will be copied; if true, they will be moved.
/// Warning: if the queue is big, keeping deleteFromPoisonQueue=false can cause the same row
/// from the poison queue to be copied more than once (the reason is not found yet)</param>
/// <returns></returns>
private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename, string[] idsToMove=null, bool deleteFromPoisonQueue=false)
{
    var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    var poisonQueueName = queuename + "-poison";
    var poisonqueue = GetCloudQueueRef(storageAccountString, poisonQueueName);
    var count = 0;
    while (true)
    {
        var msg = await poisonqueue.GetMessageAsync();
        if (msg == null)
        {
            Console.WriteLine("No more messages in a queue " + poisonQueueName);
            break;
        }
        string action = "";
        try
        {
            // idsToMove.Contains needs "using System.Linq;"
            if (idsToMove == null || idsToMove.Contains(msg.Id))
            {
                var msgToAdd = msg;
                if (deleteFromPoisonQueue)
                {
                    // AddMessageAsync overwrites some info on the message, after which DeleteMessageAsync gives a 404.
                    // The better solution is to copy the values into a new message for AddMessageAsync.
                    msgToAdd = new CloudQueueMessage(msg.AsBytes);
                }
                action = "adding";
                await targetqueue.AddMessageAsync(msgToAdd);
                Console.WriteLine(action + " message ID " + msg.Id);
                if (deleteFromPoisonQueue)
                {
                    action = "deleting";
                    await poisonqueue.DeleteMessageAsync(msg);
                    // Log the delete only when one actually happened
                    Console.WriteLine(action + " message ID " + msg.Id);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error encountered when " + action + " " + ex.Message + " at message ID " + msg.Id);
        }
        count++;
    }
    return count;
}
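For comparison, the same selective move/copy idea as a rough Python sketch. `InMemoryQueue` and `retry_poison_messages` are hypothetical names of mine; the queue class is a stand-in that exposes the `receive_messages` / `send_message` / `delete_message` calls a v12 `QueueClient` offers. On the duplicate-copy warning above: a plausible explanation, not confirmed by the author, is that messages that are received but never deleted become visible again once their visibility timeout expires, so a long-running loop sees them twice. The sketch avoids that by iterating over a single snapshot.

```python
from types import SimpleNamespace


class InMemoryQueue:
    """Hypothetical stand-in for a queue client; not part of any Azure SDK."""

    def __init__(self, contents=()):
        self._next_id = 1
        self.messages = []
        for content in contents:
            self.send_message(content)

    def send_message(self, content):
        self.messages.append(SimpleNamespace(id=f"m{self._next_id}", content=content))
        self._next_id += 1

    def receive_messages(self):
        return list(self.messages)  # one snapshot, so no message is seen twice

    def delete_message(self, message):
        self.messages = [m for m in self.messages if m.id != message.id]


def retry_poison_messages(poison, target, ids_to_move=None, delete_from_poison=False):
    """Copy (or move) selected messages and report per-message success or failure."""
    wanted = None if ids_to_move is None else set(ids_to_move)
    results = []
    for message in poison.receive_messages():
        if wanted is not None and message.id not in wanted:
            continue  # not in the requested list, leave it alone
        try:
            # Always build the new message from the content, never reuse the received object.
            target.send_message(message.content)
            if delete_from_poison:
                poison.delete_message(message)
            results.append((message.id, "ok"))
        except Exception as exc:
            results.append((message.id, f"failed: {exc}"))
    return results
```

Returning the per-message results instead of printing them makes it easy to log or retry only the failures.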
I've found this to be a very useful tool for operations people:
https://azure.microsoft.com/en-us/products/storage/storage-explorer/