我正在构建一个应用程序,它可以持续地从API中获取多个城市的天气数据,并使用Python中的多个进程和线程将这些数据读写到SQL(MySQL)数据库。我对这个比较陌生。
假设有两个进程。每个进程产生N个线程,每个城市一个线程。每个线程对不同的城市执行相同的逻辑。
进程1中的线程1每隔60秒从API中抓取纽约的数据,并将其写入SQL表X;进程2中的线程2每隔60秒抓取洛杉矶的数据,并将其写入SQL表X。
表X具有以下属性。Record Index (Int), City (Varchar), Processed (Boolean), Some Data (Int)
. 当进程1中的线程向表X写入数据时,该 Processed
属性被标记为 False
默认情况下。
转到流程2...流程2中的线程1抓取未处理的数据(所有记录中的 Processed=False
进程2中的线程2每60秒从表X中抓取洛杉矶的未处理数据,复制到表Y中,并在表X中标记该数据为已处理数据。
我使用SQLAlchemy对数据库进行读写。我在每个进程中通过以下方式初始化一个新引擎。
engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{database}')
我通过执行SQL语句
with engine.begin() as connection:
connection.execute(sql_statement)
我的问题是,我需要阻止进程1在表X中添加新的纽约数据(例如),直到进程2完成: 1)将未处理的纽约数据复制到表Y中,2)将复制的数据标记为已处理的数据。True
表X中。否则,有可能在进程1中向表X中添加新的数据,并在进程2中标记为已处理,而实际上并没有被复制到表Y中。
我相信我可以使用锁(multiprocessing.Lock())来解决这个问题,但除了会阻止所有进程处理纽约的线程外,还会阻止所有进程处理洛杉矶(以及任何其他城市)的线程。
我可以切换架构,让每个进程负责处理不同的城市,每个线程运行不同的逻辑(数据采集、数据处理)。本质上,这将与现在的情况相反。但是,我需要做的很多处理都是与CPU绑定的,所以我相信这样做可能会降低效率。
使用多处理锁是解决这个问题的最好方法吗?还有其他的选择吗?
我可以快速想到的一些东西,只是为了给你一些想法。
我有一个问题。你提到你在做CPU绑定的任务。这种情况下,线程模型在每个进程内的效果如何?