Sending 1 million messages to a queue within 1 minute

Problem description

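I am using RabbitMQ to receive messages. I have 1,000,000 messages that need to be sent and queued within 1 minute. I am using Python's multiprocessing. My code takes more than 5 minutes to send them. Is it possible to send them within 1 minute on a single computer? Here is my code: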

import multiprocessing
from datetime import datetime
import pika
import time
import sys


class PyPikaTest:

    def publish(self, no_message, producer):
        # Each publisher process opens its own connection and channel.
        c = pika.BlockingConnection(pika.ConnectionParameters(port=5672, virtual_host="test"))

        channel = c.channel()
        channel.queue_declare(queue='letterbox')

        print("start: %s" % (time.ctime(time.time())))

        # range(1, n + 1) so that exactly no_message messages are published
        for i in range(1, int(no_message) + 1):
            sendtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-2]
            body = ('aa cccc ' + str(sendtime))
            _properties = pika.BasicProperties(
                content_type='application/json',
                content_encoding='utf-8',
                message_id=producer + "_message_no_" + str(i),
                timestamp=int(time.time())
            )

            # Publish through the default exchange straight to the queue
            channel.basic_publish(
                exchange='',
                routing_key='letterbox',
                properties=_properties,
                body=body
            )
        print("end: %s" % (time.ctime(time.time())))
        c.close()

    def thread_publish(self, no_publisher, no_message):

        jobs = []
        for i in range(int(no_publisher)):
            process = multiprocessing.Process(target=self.publish, args=(no_message, "test_publisher_no_" + str(i)))
            jobs.append(process)

        # Start the publisher processes
        for j in jobs:
            j.start()

        # Ensure all of the processes have finished
        for j in jobs:
            j.join()

        print("List processing complete.")


if __name__ == "__main__":

    print('starting .. %s' % time.ctime(time.time()))
    x = PyPikaTest()
    # argv[1]: number of publisher processes, argv[2]: messages per publisher
    x.thread_publish(sys.argv[1], sys.argv[2])
python multithreading rabbitmq python-multiprocessing pika
1 Answer

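Have you tried one of pika's non-blocking connections? I use AsyncioConnection. Publishing from just a single Python process, I can send 14k-17k messages per second, which is roughly 1 million messages in a minute. This is on a 2018 MacBook Pro i7, with RabbitMQ installed locally via Homebrew and Python 3.11.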

I created a script that sends 1 million messages in 61 seconds by splitting them into 10 batches of 100k asynchronous requests.

    # Excerpt from inside an async function; send_message, client_api and
    # logger are defined elsewhere in the script.
    total_request_time = 0
    batch_size = 100000
    number_of_batches = 10
    for i in range(number_of_batches):
        rr_start_time = time.time()
        # Schedule one task per message, then wait for the whole batch
        tasks = [asyncio.create_task(send_message(client_api)) for _ in range(batch_size)]
        _ = await asyncio.wait(tasks)
        rr_taken = time.time() - rr_start_time
        total_request_time += rr_taken
        logger.info("batch %s of %s calls took %s seconds and %s calls/s", i, batch_size, rr_taken, batch_size / rr_taken)

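In the interest of full disclosure, I was running 4 consumers. I find that RabbitMQ performs best on the publishing side when the queue does not grow too large.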
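
For anyone who wants to try the batching pattern from the snippet above without a broker, here is a minimal, self-contained sketch. The `send_message` here is a hypothetical stand-in that only yields to the event loop and counts the call; it is not the function from my script and does not talk to RabbitMQ, so the rates it prints say nothing about real publish throughput. `run_batches` is also just a name chosen for this sketch.

```python
import asyncio
import time


async def send_message(sink):
    """Hypothetical stand-in for a real publish call (assumption, no broker)."""
    await asyncio.sleep(0)  # yield to the event loop once
    sink.append(1)          # record that one "message" was sent


async def run_batches(number_of_batches, batch_size):
    """Send number_of_batches * batch_size messages, one batch at a time."""
    sink = []
    total_request_time = 0.0
    for i in range(number_of_batches):
        start = time.perf_counter()
        # One task per message; wait until the whole batch has completed
        tasks = [asyncio.create_task(send_message(sink)) for _ in range(batch_size)]
        await asyncio.wait(tasks)
        taken = time.perf_counter() - start
        total_request_time += taken
        rate = batch_size / max(taken, 1e-9)  # guard against a zero interval
        print(f"batch {i} of {batch_size} calls took {taken:.4f} s, {rate:.0f} calls/s")
    return len(sink), total_request_time


if __name__ == "__main__":
    sent, elapsed = asyncio.run(run_batches(3, 1000))
    print(f"{sent} total calls in {elapsed:.4f} s")
```

Batching keeps the number of pending tasks bounded at `batch_size` instead of creating all one million tasks at once.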

The full example code can be found here.

Here is my program's output.

I 2023-09-25 00:01:29,536 one_million_example       main                                 17  : Client connected
I 2023-09-25 00:01:35,772 one_million_example       main                                 41  : batch 0 of 100000 calls took 6.235662937164307 seconds and 16036.787268279675 calls/s
I 2023-09-25 00:01:41,478 one_million_example       main                                 41  : batch 1 of 100000 calls took 5.705104827880859 seconds and 17528.161710771692 calls/s
I 2023-09-25 00:01:47,438 one_million_example       main                                 41  : batch 2 of 100000 calls took 5.960079193115234 seconds and 16778.30054934751 calls/s
I 2023-09-25 00:01:53,282 one_million_example       main                                 41  : batch 3 of 100000 calls took 5.84413480758667 seconds and 17111.172704329678 calls/s
I 2023-09-25 00:01:59,617 one_million_example       main                                 41  : batch 4 of 100000 calls took 6.335207939147949 seconds and 15784.801534620101 calls/s
I 2023-09-25 00:02:05,506 one_million_example       main                                 41  : batch 5 of 100000 calls took 5.888730049133301 seconds and 16981.590116313437 calls/s
I 2023-09-25 00:02:12,120 one_million_example       main                                 41  : batch 6 of 100000 calls took 6.614094972610474 seconds and 15119.226502508423 calls/s
I 2023-09-25 00:02:18,071 one_million_example       main                                 41  : batch 7 of 100000 calls took 5.950514793395996 seconds and 16805.268698933756 calls/s
I 2023-09-25 00:02:24,473 one_million_example       main                                 41  : batch 8 of 100000 calls took 6.401550054550171 seconds and 15621.21660345697 calls/s
I 2023-09-25 00:02:30,549 one_million_example       main                                 41  : batch 9 of 100000 calls took 6.075830936431885 seconds and 16458.654140683902 calls/s
I 2023-09-25 00:02:30,549 one_million_example       main                                 44  : Request only performance: 1000000 total calls in 61.010910511016846 seconds @ 16390.511002444855 calls/s
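
As a quick sanity check on the numbers in the log above, the per-batch durations can be summed and compared with the rate needed to hit the 60-second target. This is only arithmetic on the logged values; the variable names are chosen for this sketch.

```python
# Per-batch durations in seconds, copied from the log output above
batch_seconds = [
    6.235662937164307, 5.705104827880859, 5.960079193115234,
    5.84413480758667, 6.335207939147949, 5.888730049133301,
    6.614094972610474, 5.950514793395996, 6.401550054550171,
    6.075830936431885,
]
batch_size = 100_000

total_calls = batch_size * len(batch_seconds)
total_seconds = sum(batch_seconds)
average_rate = total_calls / total_seconds  # calls per second actually achieved
required_rate = total_calls / 60            # calls per second needed for 1 minute

print(f"total: {total_calls} calls in {total_seconds:.2f} s")
print(f"average rate:  {average_rate:.0f} calls/s")
print(f"required rate: {required_rate:.0f} calls/s")
print(f"speed-up still needed: {(required_rate / average_rate - 1) * 100:.1f}%")
```

The sum reproduces the 61.01 seconds and roughly 16,390 calls/s reported on the last log line, against the roughly 16,667 calls/s that a strict 60-second budget requires, so this run misses the target by a little under 2%.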