我有一个 celery 任务,它接受来自生产者的关键字并使用该关键字查询数据库 -
DB1
,然后将其插入到另一个数据库 - DB2
。一个任务周期大约需要 15 秒。
部署
我想要实现的目标
分布式消费者应该能够在一秒钟内处理队列中的 10000 个任务。
我遇到的问题
app.conf.task_acks_late = False
# postgres sqlalchemy config
# create database engine
engine = create_engine(DB_URL, poolclass=QueuePool,
pool_size=30000,
max_overflow=6000,
pool_timeout=120)
# creating db session
SessionInstance = sessionmaker(bind=engine)
session = SessionInstance()
Session = scoped_session(SessionInstance)
# write in tasks.py
from psql_config import Session
...
session = Session()
with session.begin_nested():
db_manager = WriteRawDBManager(session)
db_manager.update_record(
record=record_result,
)
Session.close()
...
写入没有问题。
celery -A tasks worker --pool threads --concurrency=15000
仅仅增加并发量似乎还不够有效。据我了解,这不是一个 CPU 密集型任务,因此即使有这么多工作线程,CPU 使用率也低于 10%。添加更多节点会浪费资源,因为它们的 CPU 使用率非常低。
这是一项每周任务,生产者将在 60 分钟内向队列添加 600 万条消息。我希望能够在 120 分钟或更短的时间内消耗和处理全部 600 万个数据。另外,请记住对 postgresql 数据库的写入(因为我遇到了连接阻塞问题)
如果有人指导我为此获得更好的资源,那就太好了。