RabbitMQ StreamLostError with Pika when jobs are long

问题描述 投票:0回答:0

我有一个 python 服务,旨在运行长任务作业(有些可能需要 5 分钟)。

我运行长任务,最后我发送确认。

我发现工作时间越长,我就越有可能得到错误:

connection_lost: "StreamLostError: (\"Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')\",)"

这是我用来启动 RabbitMQ 连接的服务: bin/RabbitMqService.py

import pika
import json

from src.bin.LogService import LogService
logger = LogService.init()

from src.config.RabbitMqConfig import RabbitMqConfig

class RabbitMqService:
    """Wrapper around one blocking pika connection and a single channel.

    Instances own the connection and channel used for consuming. The
    ``acknowledge`` and ``send_output`` helpers are static: they operate on
    the channel handed to them, so they can be called as
    ``RabbitMqService.acknowledge(channel, tag)`` from a consumer callback.
    """

    def __init__(self) -> None:
        """Open the connection and channel described by ``RabbitMqConfig.endpoint``.

        When the endpoint is blank a warning is logged and the instance is
        left unconnected (``input_connection`` and ``channel`` are ``None``).
        """
        # Define the attributes up front so an unconfigured instance can be
        # detected with ``service.channel is None``.
        self.input_connection = None
        self.channel = None

        if not RabbitMqConfig.endpoint.strip():
            logger.warn('[HL] [AMQP] No endpoint configured')
            return

        # NOTE(review): per the pika documentation, BlockingConnection only
        # services heartbeats while its I/O loop runs; a consumer callback
        # that blocks for minutes can get the connection dropped by the
        # broker - confirm against the configured heartbeat timeout.
        self.input_connection = pika.BlockingConnection(pika.URLParameters(RabbitMqConfig.endpoint))
        self.channel = self.input_connection.channel()
        self.channel.basic_qos(prefetch_count=1)  # handling one message at a time

    def listen_input(self, queue, callback_function):
        """Register ``callback_function`` as the consumer callback for ``queue``."""
        self.channel.basic_consume(queue=queue, on_message_callback=callback_function)

    def start_consuming(self):
        """Start consuming on the channel for the registered consumers."""
        self.channel.start_consuming()

    @staticmethod
    def acknowledge(channel, delivery_tag):
        """Acknowledge the message identified by ``delivery_tag`` on ``channel``."""
        channel.basic_ack(delivery_tag=delivery_tag)
        logger.debug('[HL] [AMQP] Message acknowledged with delivery_tag %s', delivery_tag)

    @staticmethod
    def send_output(channel, output_exchange, message_body):
        """Publish ``message_body`` as JSON to ``output_exchange`` with an empty routing key."""
        logger.debug('[HL] [AMQP] Publishing message to output queue')
        channel.basic_publish(exchange=output_exchange, routing_key='', body=json.dumps(message_body))
        logger.info('[HL] [AMQP] Message published to queue %s', output_exchange)

这是主要文件(为帖子做了简化):

import argparse
import json
import sys

from src.bin.LogService import LogService
logger = LogService.init()

from datetime import datetime as dt

from src.bin.MinioService import MinioService
from src.bin.RabbitMqService import RabbitMqService
from src.config.MinioConfig import MinioConfig
from src.config.RabbitMqConfig import RabbitMqConfig

from src.lib.SmoothMeasures import run_smoothing
from src.lib.CountCollects import run_collects

from src.config.SentryConfig import SentryConfig
from src.bin.SentryService import SentryService


# Module-level MinIO client placeholder: assigned in the ``__main__`` block
# and read by process_input_message() to fetch the input blob.
minio_service = None

def smoothing_message_handler(channel, method_frame, properties, body):
    """Consumer callback for smoothing messages.

    Delegates to ``message_handler`` with the ``'smoothing_measures'`` type
    and the configured smoothing output exchange. When that exchange is
    blank, only a warning is logged and the message is not processed.
    """
    output_exchange = RabbitMqConfig.smoothing_output_exchange
    if output_exchange.strip():
        message_handler('smoothing_measures', channel, output_exchange, method_frame, properties, body)
        return None

    logger.warn('[HL] [AMQP] No output configured for smoothing')
    return None

def collect_message_handler(channel, method_frame, properties, body):
    """Consumer callback for collect messages.

    Delegates to ``message_handler`` with the ``'count_collects'`` type and
    the configured collect output exchange. When that exchange is blank,
    only a warning is logged and the message is not processed.
    """
    output_exchange = RabbitMqConfig.collect_output_exchange
    if output_exchange.strip():
        message_handler('count_collects', channel, output_exchange, method_frame, properties, body)
        return None

    logger.warn('[HL] [AMQP] No output configured for collects')
    return None

def get_blob_info(json_body):
    """Extract the blob location from an already-decoded message body.

    The extraction logic is elided (``[...]``) in this listing, so how
    ``path`` and ``bucket`` are derived from ``json_body`` is not shown.

    Returns:
        A dict with the keys ``'path'`` and ``'bucket'``.
    """
    [...]

    return {'path': path, 'bucket': bucket}

def message_handler(type, channel, output_exchange, method_frame, properties, body):
    """Process one incoming message, publish the result, then acknowledge it.

    Args:
        type: Job type forwarded to ``process_input_message``.
        channel: Channel the message was delivered on; also used to publish
            the result and to acknowledge.
        output_exchange: Exchange the result is published to.
        method_frame: Delivery frame; its ``delivery_tag`` identifies the
            message to acknowledge.
        properties: Message properties (unused here).
        body: Raw message body, expected to be JSON.

    Raises:
        ValueError: If the body cannot be decoded or lacks the expected
            properties. The message is acknowledged first so that it is not
            delivered again.
    """
    try:
        blob_info = get_blob_info(json.loads(body))
    except Exception as err:
        # `except Exception` (not a bare `except`) so KeyboardInterrupt and
        # SystemExit are not swallowed and turned into a ValueError.
        RabbitMqService.acknowledge(channel, method_frame.delivery_tag)  # avoid testing same message again
        raise ValueError("Received body from rabbitMq is not a JSON / have not the correct properties") from err

    # NOTE(review): this runs the long job inside the consumer callback. Per
    # the pika documentation a blocking callback prevents heartbeats from
    # being serviced; for multi-minute jobs consider running the work in a
    # thread and acknowledging via connection.add_callback_threadsafe.
    result = process_input_message(type, blob_info)
    RabbitMqService.send_output(channel, output_exchange, result)

    # Acknowledge only after the result has been published.
    RabbitMqService.acknowledge(channel, method_frame.delivery_tag)

def process_input_message(type: str, blob_info: dict):
    """Fetch the input blob, run the KPI job on it and describe the output blob.

    Args:
        type: Job type forwarded to ``run_kpi``.
        blob_info: Dict with the keys ``'path'`` and ``'bucket'``.

    Returns:
        A dict with ``'bucket'`` (same as the input bucket) and ``'path'``
        (the ``'output_blob_path'`` reported by ``run_kpi``).

    Raises:
        Exception: If ``'input'`` does not occur exactly once in the path.
    """
    path = blob_info['path']
    bucket = blob_info['bucket']

    # Arguments follow the placeholders' order: bucket first, then path.
    logger.debug('[HL] Handle message, bucketName: %s, blobPath: %s', bucket, path)

    if path.count('input') != 1:
        # Exception() does not interpolate extra arguments, so format here.
        raise Exception("Missing 'input' keyword in blob_path: %s" % path)

    input_blob_content = minio_service.get_blob(bucket, path)

    # Here is the long process
    res = run_kpi(type, input_blob_content, path)

    return {'bucket': bucket, 'path': res['output_blob_path']}

def run_kpi(type: str, input_blob_content: dict, path: str):
    """Run the KPI job selected by ``type`` (body elided in this listing).

    The caller, process_input_message(), treats this as the long-running
    step and reads ``'output_blob_path'`` from the returned value.
    """
    [...]

if __name__ == "__main__":
    SentryService.init(config=SentryConfig)

    logger.info("[HL] Parsing arguments ...")

    # Each CLI flag is a boolean switch that enables one queue listener.
    arg_parser = argparse.ArgumentParser(description='Description du script')
    for flag, dest, help_text in (
        ('--smoothing', 'smoothing_input_listener', 'Activer le listener pour le queue de smoothing'),
        ('--collect', 'collect_input_listener', 'Activer le listener pour le queue de collect'),
    ):
        arg_parser.add_argument(flag, dest=dest, action='store_true', help=help_text)
    args = arg_parser.parse_args()

    logger.info("[HL] Executing main with arguments %s ...", args)
    logger.info("[HL] Creating RabbitMq client and Minio Client ...")
    rabbit_smoothing_service = RabbitMqService()
    minio_service = MinioService(config=MinioConfig)

    # (args flag, RabbitMqConfig queue attribute, callback, log format),
    # registered in this order.
    listeners = (
        ('smoothing_input_listener', 'smoothing_input_queue', smoothing_message_handler,
         "[HL] Smoothing Client is listening the queue : %s"),
        ('collect_input_listener', 'collect_input_queue', collect_message_handler,
         "[HL] Collect Client is listening the queue : %s"),
    )

    try:
        for enabled_attr, queue_attr, handler, log_format in listeners:
            if not getattr(args, enabled_attr):
                continue
            # Look the queue name up only for enabled listeners.
            queue_name = getattr(RabbitMqConfig, queue_attr)
            rabbit_smoothing_service.listen_input(queue=queue_name, callback_function=handler)
            logger.info(log_format, queue_name)

        rabbit_smoothing_service.start_consuming()
    except KeyboardInterrupt:
        sys.exit(0)

我做错了什么/理解错了吗?

python rabbitmq jobs
© www.soinside.com 2019 - 2024. All rights reserved.