使用 RabbitMQ 优化和扩展 Celery 以执行 I/O 密集型任务

问题描述 投票:0回答:0

我有一个 celery 任务,它接受来自生产者的关键字并使用该关键字查询数据库 -

DB1
,然后将其插入到另一个数据库 -
DB2
。一个任务周期大约需要 15 秒。

部署

  1. RabbitMQ Broker 负责路由任务
  2. GCP 托管实例组是消费者(部署为容器)。每个节点大小为 30 Core CPU 64GB Mem
  3. DB1 - SQL Server(在外部管理,我具有只读访问权限,我执行原始 SQL 查询)
  4. DB2 - PostgresSQL(我使用 SQLAlchemy 作为 ORM 来管理它)

我想要实现的目标

分布式消费者应该能够在一秒钟内处理队列中的 10000 个任务。

我遇到的问题

  1. 两个消费者正在执行相同的任务。我能够使用以下配置修复它
app.conf.task_acks_late = False
  1. Postgresql 写入期间的连接问题
# postgres sqlalchemy config

# create database engine
engine = create_engine(DB_URL, poolclass=QueuePool,
                       pool_size=30000,
                       max_overflow=6000,
                       pool_timeout=120)

# creating db session
SessionInstance = sessionmaker(bind=engine)
session = SessionInstance()

Session = scoped_session(SessionInstance)

# write in tasks.py
from psql_config import Session

...
session = Session()
with session.begin_nested():
    db_manager = WriteRawDBManager(session)
    db_manager.update_record(
        record=record_result,
    )
    Session.close()
...

写入没有问题。

  1. 芹菜并发
celery -A tasks worker --pool threads --concurrency=15000

仅仅增加并发量似乎还不够有效。据我了解,这不是一个 CPU 密集型任务,因此即使有这么多工作线程,CPU 使用率也低于 10%。添加更多节点会浪费资源,因为它们的 CPU 使用率非常低。

这是一项每周任务,生产者将在 60 分钟内向队列添加 600 万条消息。我希望能够在 120 分钟或更短的时间内消耗和处理全部 600 万个数据。另外,请记住对 postgresql 数据库的写入(因为我遇到了连接阻塞问题)

如果有人指导我为此获得更好的资源,那就太好了。

python postgresql sqlalchemy rabbitmq celery
© www.soinside.com 2019 - 2024. All rights reserved.