从多个进程和线程读写SQL数据库

问题描述 投票:0回答:1

我正在构建一个应用程序,它可以持续地从API中获取多个城市的天气数据,并使用Python中的多个进程和线程将这些数据读写到SQL(MySQL)数据库。我对这个比较陌生。

假设有两个进程。每个进程产生N个线程,每个城市一个线程。每个线程对不同的城市执行相同的逻辑。

进程1中的线程1每隔60秒从API中抓取纽约的数据,并将其写入SQL表X;进程2中的线程2每隔60秒抓取洛杉矶的数据,并将其写入SQL表X。

表X具有以下属性。Record Index (Int), City (Varchar), Processed (Boolean), Some Data (Int). 当进程1中的线程向表X写入数据时,该 Processed 属性被标记为 False 默认情况下。

转到流程2...流程2中的线程1抓取未处理的数据(所有记录中的 Processed=False进程2中的线程2每60秒从表X中抓取洛杉矶的未处理数据,复制到表Y中,并在表X中标记该数据为已处理数据。

我使用SQLAlchemy对数据库进行读写。我在每个进程中通过以下方式初始化一个新引擎。

engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{database}')

我通过执行SQL语句

with engine.begin() as connection:
     connection.execute(sql_statement)

我的问题是,我需要阻止进程1在表X中添加新的纽约数据(例如),直到进程2完成: 1)将未处理的纽约数据复制到表Y中,2)将复制的数据标记为已处理的数据。True 表X中。否则,有可能在进程1中向表X中添加新的数据,并在进程2中标记为已处理,而实际上并没有被复制到表Y中。

我相信我可以使用锁(multiprocessing.Lock())来解决这个问题,但除了会阻止所有进程处理纽约的线程外,还会阻止所有进程处理洛杉矶(以及任何其他城市)的线程。

我可以切换架构,让每个进程负责处理不同的城市,每个线程运行不同的逻辑(数据采集、数据处理)。本质上,这将与现在的情况相反。但是,我需要做的很多处理都是与CPU绑定的,所以我相信这样做可能会降低效率。

使用多处理锁是解决这个问题的最好方法吗?还有其他的选择吗?

python mysql multithreading sqlalchemy multiprocessing
1个回答
1
投票

我可以快速想到的一些东西,只是为了给你一些想法。

  1. 一个文件锁(在一个已知的位置,其名称可以从你操作的上下文中得出)。有一些库为你提供文件锁基元(filelock)。
  2. 在DB中实现一个你想要的粒度的锁定基元。这个基元可以是一个整数。两个进程必须同步进行。有可能两个进程同时读取基元,递增基元并尝试将其写回,但只有一个进程会成功写入--你必须处理这个问题

我有一个问题。你提到你在做CPU绑定的任务。这种情况下,线程模型在每个进程内的效果如何?

© www.soinside.com 2019 - 2024. All rights reserved.