RabbitMQ Pika connection reset, (-1, ConnectionResetError(104, 'Connection reset by peer'))


I searched StackOverflow and am posting this question because none of the existing solutions worked for me, and my problem may be different from the others.

I am writing a script that fetches an article from a RabbitMQ queue, processes it to count words and extract keywords, and dumps the result into a DB. My script works fine, but after some time of execution I get this exception: (-1, "ConnectionResetError(104, 'Connection reset by peer')")

I have no idea why I am getting this. I have tried many of the solutions available on StackOverflow and none of them worked for me. I wrote my script and tried it in two different ways; both work fine, but after a while the same exception occurs.

Here is my first code:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    # Edit 4
    def pika_connect():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
        channel = connection.channel()
        print ("In pika connect")
        Logger.log_message('Setting up input queue consumer')
        channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
        channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

        Logger.log_message('Starting loop')
        channel.start_consuming()

    #########

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()


    # Edit 5 starting 10 threads to listen to pika 

    for th in range(qthreads):
        Logger.log_message('Starting thread: '+str(th))
        try:
            t = Thread(target=pika_connect, args=())
            t.start()
        except Exception as e:
            Logger.error_message("Exception in starting threads " + str(e))



try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))

Here is my second code:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    connection = pika.BlockingConnection(pika.ConnectionParameters(
             host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()
    print ("In app main")
    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
    channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

    Logger.log_message('Starting loop')

    try:
        channel.start_consuming()
    except Exception as e:
        Logger.error_message("Exception in start_consuming in main " + str(e))
        raise e


try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))

In my first code I used threads because I wanted to speed up the processing of articles. This is my callback function:

def on_message(ch, method, properties, message):
    Logger.log_message("Starting parsing new msg ")
    handle_message(message)

pika is not spending a lot of time processing a message, yet I am still facing the connection-reset problem. **Time taken to handle a message: 0.0005991458892822266**

EDIT: Full code

import os

abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools
from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'


class SpeedVars(object):
    vars = {}
    lock = None

    def __init__(self):
        self.lock = threading.Lock()

    def inc(self, var):
        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] += 1
            else:
                self.vars[var] = 1
        finally:
            self.lock.release()

    def dec(self, var):
        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] -= 1
            else:
                Logger.error_message('Cannot decrement ' + var + ', not tracked')
        finally:
            self.lock.release()

    def get(self, var):
        out = None
        self.lock.acquire()
        try:
            if var in self.vars:
                out = self.vars[var]
            else:
                Logger.error_message('Cannot get ' + var + ', not tracked')
        finally:
            self.lock.release()
        return out

    def get_all(self):
        out = None
        self.lock.acquire()
        try:
            out = self.vars.copy()
        finally:
            self.lock.release()
        return out


class SpeedTracker(threading.Thread):
    speedvars = None
    start_ts = None
    last_vars = {}

    def __init__(self, speedvars):
        super(SpeedTracker, self).__init__()
        self.start_ts = time.time()
        self.speedvars = speedvars
        Logger.log_message('Setting up speed tracker')

    def run(self):
        while True:
            time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
            prev = self.last_vars
            cur = self.speedvars.get_all()
            now = time.time()
            if len(prev) > 0:
                q = {}
                for key in cur:
                    qty = cur[key] - prev[key]
                    avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
                    overall_avg = cur[key] / (now - self.start_ts)
                    Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
                                       + ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
                                       + ', overall speed ' + '%0.2f' % overall_avg + '/sec')
                pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
                pending_avg = pending / (now - self.start_ts)
                Logger.log_message('Speed-tracking (pending): total ' + str(pending)
                                   + ', overall speed ' + '%0.2f' % pending_avg + '/sec')
            self.last_vars = cur


class ResultsSender(threading.Thread):
    channel = None
    results = None
    speedvars = None

    def __init__(self, results, speedvars):
        super(ResultsSender, self).__init__()
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=Config.AMQ_DAEMONS['base']['amq-host']))
        self.channel = connection.channel()
        Logger.log_message('Setting up output exchange')
        self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'],
                                      exchange_type='direct')
        self.results = results
        self.speedvars = speedvars

    def run(self):
        while True:
            item = self.results.get()
            self.channel.basic_publish(
                exchange=Config.AMQ_DAEMONS['consumer']['output'],
                routing_key='',
                body=item)
            self.speedvars.inc(SPD_SENT)


def parse_message(message):
    try:
        bodytxt = message.decode('UTF-8')
        body = json.loads(bodytxt)
        return body
    except Exception as e:
        Logger.error_message("Cannot parse message - " + str(e))
        raise e


def get_body_elements(body):
    try:
        artid = str(body.get('article_id'))
        article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
        date = article_dt.strftime(Config.DATE_FORMAT)
        article = "\n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
        return (artid, date, article)
    except Exception as e:
        Logger.error_message("Cannot retrieve article attributes " + str(e))
        raise e


def process_article(id, date, text):
    global results, speedvars
    try:
        Logger.log_message('Processing article ' + id)
        keywords = Pipeline.extract_keywords(text)
        send_data = {"id": id, "date": date, "keywords": keywords}
        results.put(pickle.dumps(send_data))
        # print('Queue Size:', results.qsize())
    except Exception as e:
        Logger.error_message("Problem processing article " + str(e))
        raise e


def ack_message(ch, delivery_tag):
    """Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        Logger.error_message("Channel is already closed, so we can't ACK this message")


def handle_message(connection, ch, delivery_tag, message):
    global speedvars
    start = time.time()
    thread_id = threading.get_ident()
    try:
        speedvars.inc(SPD_RECEIVED)
        body = parse_message(message)
        (id, date, text) = get_body_elements(body)
        words = len(text.split())
        if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
            process_article(id, date, text)
        else:
            Logger.log_message('Ignoring article, over word count limit')
            speedvars.inc(SPD_DISCARDED)
    except Exception as e:
        Logger.error_message("Could not process message - " + str(e))

    cb = functools.partial(ack_message, ch, delivery_tag)
    connection.add_callback_threadsafe(cb)
    Logger.log_message("Thread id: " + str(thread_id) + " Delivery tag: " + str(delivery_tag))
    Logger.log_message("Total time taken to handle message: " + str(time.time() - start))


# CALL BACK
## def on_message(ch, method, properties, message):
##     global executor
##     executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
    (connection, threads) = args
    delivery_tag = method.delivery_tag
    t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
    t.start()
    threads.append(t)

####################################################

@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
    global channel, results, speedvars

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    # Pika connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()

    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
    #channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

    channel.basic_qos(prefetch_count=1)
    threads = []
    on_message_callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

    Logger.log_message('Starting loop')
    ## channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Wait for all to complete
    for thread in threads:
        thread.join()

    connection.close()


app_main()

python-3.x multithreading rabbitmq pika python-pika

1 Answer

4 votes

Your handle_message method is blocking heartbeats, because all of your code, including Pika's I/O loop, runs on the same thread. Check out this example of how to run your work (handle_message) on a separate thread from Pika's I/O loop and then acknowledge the message correctly.
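For reference, here is a minimal sketch of that pattern, assuming pika 1.x (the basic_consume signature differs in older releases); the host and queue name ('localhost', 'articles') are placeholders rather than values from the question:

import functools
import threading

import pika


def do_work(connection, channel, delivery_tag, body):
    # Long-running work happens on this worker thread, so the connection's
    # I/O loop stays free to send and receive heartbeats.
    print("processing", body)
    # basic_ack must run on the connection's thread; hand it back to the
    # I/O loop with add_callback_threadsafe.
    cb = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    connection.add_callback_threadsafe(cb)


def on_message(channel, method, properties, body, args):
    connection, threads = args
    t = threading.Thread(target=do_work,
                         args=(connection, channel, method.delivery_tag, body))
    t.start()
    threads.append(t)


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='articles', durable=True)
channel.basic_qos(prefetch_count=1)

threads = []
callback = functools.partial(on_message, args=(connection, threads))
channel.basic_consume(queue='articles', on_message_callback=callback)

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

for t in threads:
    t.join()
connection.close()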


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.
