如何在SQS中解码celery消息

问题描述 投票:0回答:2

sqs 中的一些 celery 任务永远处于待处理状态,我想在删除之前阅读这些消息(任务)。 在进入 sqs 控制台时,我可以看到我尝试使用它进行解码的编码消息

value = base64.b64decode(value.encode('utf-8')).decode('utf-8')

这给了我带键的字典转储

['body', 'headers', 'content-type', 'properties', 'content-encoding']

在这个字典中,主体看起来像是编码的 我尝试用相同的方式解码它

value = base64.b64decode(value.encode('utf-8')).decode('utf-8')

但它给出错误说 UnicodeDecodeError:“utf8”编解码器无法解码位置 1 中的字节 0x87:无效的起始字节

我错过了什么吗? 如何解码此消息?有什么办法可以破解吗

django encoding celery amazon-sqs decoding
2个回答
4
投票

看来“Celery”使用“pickle.dump”将任务的有效负载转换为字节,然后编码为base64。进行相反的操作,我们再次获得有效负载。

import base64
import boto3
import pickle

queue_name = 'your-queue-name'
sqsr = boto3.resource('sqs')
queue = sqsr.get_queue_by_name(QueueName=queue_name)

for message in queue.receive_messages(MaxNumberOfMessages=10):
    print(f'{message.message_id} >>> {message.receipt_handle}'
          f' >>> {message.body} >>> {message.message_attributes}')
    body_dict = json.loads(base64.b64decode(message.body))
    celery_payload = pickle.loads(base64.b64decode(body_dict.get('body')))
    print(celery_payload)

0
投票

我重构了以下几点,使其在出现以下错误时起作用:

如果出现错误“_pickle.UnpicklingError: invalid load key, '['”

  1. 添加上面缺少的库“import json”

  2. 为 boto3.resource 函数调用提供凭证

  3. 无需解开 paylaod,因为 Celery 似乎不使用 pickle.dump 而仅使用 base64.encode

    导入boto3 进口泡菜 导入 json

    queue_name = '你的队列名称' sqs = boto3.resource('sqs',region_name="your_region_name", aws_access_key_id="your_access_key", aws_secret_access_key="your_secret_key") 队列 = sqsr.get_queue_by_name(QueueName=queue_name)

    对于queue.receive_messages中的消息(MaxNumberOfMessages=10): print(f'{message.message_id} >>> {message.receipt_handle}' f' >>> {message.body} >>> {message.message_attributes}') body_dict = json.loads(base64.b64decode(message.body)) celery_payload = base64.b64decode(body_dict.get('body')) 打印(celery_payload)

© www.soinside.com 2019 - 2024. All rights reserved.