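I'm trying to solve a big numerical problem that involves lots of subproblems. I'm using Python's multiprocessing module (specifically Pool.map) to split different independent subproblems onto different cores. Each subproblem involves computing lots of sub-subproblems. I'm trying to memoize these results efficiently by storing them to a file if they have not yet been computed by any process; otherwise I skip the computation and just read the results from the file.
I'm having concurrency issues with the files. Different processes sometimes check whether a sub-subproblem has been computed yet (by looking for the file where its results would be stored), see that it hasn't, run the computation, and then try to write the results to the same file at the same time. How do I avoid write collisions like this?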
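For reference, the check-then-compute pattern described above might look like the minimal sketch below. It assumes one JSON cache file per sub-subproblem, named after a hypothetical `key`, inside a hypothetical `cache_dir`. The comments mark the window in which two processes can both miss the cache and then write the same file.

```python
import json
import os


def cached_compute(key, compute, cache_dir="cache"):
    """Naive file memoization (hypothetical helper): NOT safe when several
    processes share cache_dir."""
    os.makedirs(cache_dir, exist_ok=True)
    path = os.path.join(cache_dir, f"{key}.json")
    if os.path.exists(path):              # 1. check
        with open(path) as f:             #    may read a half-written file
            return json.load(f)
    # Race window: another process can pass the same check right now,
    # compute the same value, and open the same path for writing.
    result = compute(key)                 # 2. compute
    with open(path, "w") as f:            # 3. write, possibly interleaved
        json.dump(result, f)              #    with another process's write
    return result
```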
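@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read-only access. This eliminates collisions. Here is an example that uses apply_async, but it works with map too: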
import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.perf_counter()  # time.clock() was removed in Python 3.8
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.perf_counter() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''
    with open(fn, 'w') as f:
        while True:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    # must use a Manager queue here: a plain mp.Queue cannot be passed to pool workers
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    # fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
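The answer notes that the same pattern works with `map`. A minimal sketch of that variant follows, assuming the queue is bound to the worker with `functools.partial` so that `pool.map` only has to supply the per-task argument. The `worker` body, the output path `results.txt` and the task count are placeholders. The listener still runs as one pool task, so the pool needs at least two processes.

```python
import functools
import multiprocessing as mp


def worker(arg, q):
    # placeholder for the real computation
    res = f"Process{arg}: {arg * arg}"
    q.put(res)  # hand the line to the single writer
    return res


def listener(q, path):
    # the only function that ever opens the output file for writing
    with open(path, "w") as f:
        while True:
            m = q.get()
            if m == "kill":
                break
            f.write(str(m) + "\n")
            f.flush()


def main(path="results.txt", n=20):
    with mp.Manager() as manager:
        q = manager.Queue()  # a Manager queue proxy can be sent to pool workers
        with mp.Pool(mp.cpu_count() + 2) as pool:
            watcher = pool.apply_async(listener, (q, path))
            # partial fixes q, so map only passes the task argument
            results = pool.map(functools.partial(worker, q=q), range(n))
            q.put("kill")   # all workers are done; stop the listener
            watcher.get()   # wait until the listener has closed the file
    return results


if __name__ == "__main__":
    main()
```

`main()` returns the workers' results in input order. By the time it returns, the listener has already closed the file.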
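It seems to me that you need to use a Manager to temporarily save your results to a list and then write the results from the list to a file. Also, use starmap to pass both the object you want to process and the managed list. The first step is to build the parameters to pass to starmap, which include the managed list.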
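from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd

def worker(row, param):
    # do something here and then append it to row
    x = param**2
    # append a one-row DataFrame so the results can later be combined with pd.concat
    row.append(pd.DataFrame({'param': [param], 'x': [x]}))

if __name__ == '__main__':
    pool_parameter = list(range(10))  # list of objects to process (example values)
    with Manager() as mgr:
        row = mgr.list([])
        # build list of parameters to send to starmap
        params = []
        for param in pool_parameter:
            params.append([row, param])
        with Pool() as p:
            p.starmap(worker, params)
        # copy into a plain list before the Manager (and its proxy) shuts down
        row = row[:]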
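From this point on, you need to decide what to do with the list. If you have plenty of RAM and a huge dataset, feel free to concatenate with pandas. You can then very easily save the result as a CSV or a pickle.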
df = pd.concat(row, ignore_index=True)
df.to_pickle('data.pickle')
df.to_csv('data.csv')
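If each result is small, a managed list may not be necessary: `Pool.starmap` already sends every worker's return value back to the parent in input order, and the parent can then be the only process that writes. A minimal sketch of that alternative (not part of the answer above) follows, using the standard `csv` module instead of pandas. The `scale` argument, the column names and the file name are hypothetical.

```python
import csv
from multiprocessing import Pool


def worker(param, scale):
    # placeholder computation; `scale` is a hypothetical second argument
    # the return value is sent back to the parent by the pool
    return {"param": param, "result": scale * param ** 2}


def write_csv(rows, path):
    # called only in the parent, so there is exactly one writer
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["param", "result"])
        writer.writeheader()
        writer.writerows(rows)


if __name__ == "__main__":
    params = [(p, 1) for p in range(10)]
    with Pool() as pool:
        rows = pool.starmap(worker, params)  # results arrive in input order
    write_csv(rows, "data.csv")
```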
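In response to the comment saying this runs on a cluster, a simple option that doesn't rely on inter-process communication is to lock the memoization file using fcntl from the Python standard library.
This works on macOS, and I expect it to work on most UNIX systems, although it needs to be tested on your specific network storage implementation: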
safe.py
import fcntl
import time

def myprint(*args):
    print(time.ctime(), *args)

def main():
    with open("safe.txt", "r+") as file:
        myprint("locking")
        # this will block (unless LOCK_EX | LOCK_NB is used)
        fcntl.lockf(file, fcntl.LOCK_EX)
        lines = file.readlines()
        # make race conditions more likely
        time.sleep(1)
        # "1" or one more than the previous entry
        newval = int(lines[-1]) + 1 if lines else 1
        print(newval)
        file.write(str(newval) + "\n")
        file.flush()
        myprint("unlocking")
        # lockf expects LOCK_UN (not F_UNLCK) to release the lock
        fcntl.lockf(file, fcntl.LOCK_UN)

if __name__ == '__main__':
    main()
You can check that it works locally by running the following in a terminal:
touch safe.txt  # this needs to already exist
for x in 1 2 3 4 5
do
    python safe.py &
done
wait  # let all the background processes finish first
cat safe.txt  # should have 1-5 inside
If you combine this with multiprocessing, each process probably needs its own file descriptor (so call open() separately in each process).
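A minimal sketch of how the lock could be applied to the memoization problem in the question, assuming a POSIX system, one cache file per sub-subproblem, and JSON-serializable results. Each call opens the cache file itself, so every process gets its own file descriptor. The exclusive `lockf` lock is held across the check, the computation and the write, so at most one process computes any given entry; processes that need the same entry wait and then read the stored result, while different entries are locked independently. The names `memoized`, `slow_square`, `solve` and the `cache` directory are hypothetical.

```python
import fcntl
import json
import os
from multiprocessing import Pool

CACHE_DIR = "cache"  # hypothetical location of the memoization files


def memoized(key, compute, cache_dir=CACHE_DIR):
    os.makedirs(cache_dir, exist_ok=True)
    path = os.path.join(cache_dir, f"{key}.json")
    # "a+" creates the file if it is missing without truncating an existing one;
    # opening it here gives the calling process its own file descriptor
    with open(path, "a+") as f:
        fcntl.lockf(f, fcntl.LOCK_EX)  # blocks while another process holds it
        try:
            f.seek(0)
            data = f.read()
            if data:                   # another process already stored it
                return json.loads(data)
            result = compute(key)
            f.write(json.dumps(result))
            f.flush()
            return result
        finally:
            fcntl.lockf(f, fcntl.LOCK_UN)


def slow_square(k):
    return k * k  # hypothetical stand-in for an expensive sub-subproblem


def solve(sub):
    # each (hypothetical) subproblem shares sub-subproblems with its neighbours
    return sum(memoized(k, slow_square) for k in range(sub, sub + 3))


if __name__ == "__main__":
    with Pool() as pool:
        print(pool.map(solve, range(5)))  # [5, 14, 29, 50, 77] on a fresh cache
```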
I figured I'd also post a solution to a simpler problem, since this page comes up whenever I search for my problem.
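I based this loosely on @MikeHunter's solution above. The reason I needed something slightly different is that the arrays I want to write at the end of each process are fairly large, so putting them in a queue, taking them out again and writing them from a separate process would mean a lot of pickling and unpickling of very large arrays. This doesn't handle the OP's actual problem of checking many subproblems and sub-subproblems, but it does address the question's title!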
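To put a number on that cost: whatever is passed through a `multiprocessing` queue is pickled by the sender and unpickled by the receiver, so the whole array buffer is serialized and copied. A quick check with a hypothetical 1000 × 1000 float64 array:

```python
import pickle

import numpy as np

arr = np.zeros((1000, 1000))     # hypothetical array: 1000 * 1000 * 8 bytes of float64
payload = pickle.dumps(arr)      # the same kind of serialization a queue performs
print(arr.nbytes, len(payload))  # the payload is the full buffer plus a small header
```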
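Instead of sharing the arrays, I put a "0" in the queue and have the processes take the "0". If there is nothing to take, a process idles until there is. If there is something, the process starts writing, and once it has finished writing it puts something back in the queue. Importantly, given the potentially long write, q.get() has its timeout set to None by default. If the writes take a long time and the timeout were set shorter, q.get() would raise queue.Empty and this would of course fail.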
from multiprocessing import Process, Queue
import h5py
import numpy as np
from time import sleep, time

def func(i, q, filename, subfilename):
    # Reading from the subfile
    with h5py.File(subfilename, 'r') as ds:
        array = ds['array'][:]

    # Fresh generator per process, so forked children don't share one random state
    sleeptime = np.random.default_rng().random() * 4 + 1
    sleep(sleeptime)

    # Print array loaded to compare to output in the summary file
    print(i, f'{sleeptime:.3f}', array)

    # If anything is put in the queue it means that a process can start writing
    q.get()
    with h5py.File(filename, 'r+') as ds:
        ds['array'][i, :] = array

    # Indicate to the other processes that we are done writing to the summary
    # file
    q.put(0)

if __name__ == '__main__':
    N = 10
    Nsample = 5
    subfilenames = [f'sub_{i:>02d}.h5' for i in range(N)]
    for i in range(N):
        with h5py.File(subfilenames[i], 'w') as ds:
            ds.create_dataset(
                'array', data=np.random.randint(0, 5, size=(Nsample,)), dtype='f')

    filename = 'test.h5'
    with h5py.File(filename, 'w') as ds:
        ds.create_dataset('array', (N, Nsample), dtype='f')

    # Create a queue to communicate the writing status between the processes
    q = Queue()
    # Put a 0 in the queue to indicate that a worker can start writing
    q.put(0)

    # Start the timer
    t0 = time()

    # Distribute the work to the workers
    processes = []
    print(" T sleeptime array", flush=True)
    print("-----------------------", flush=True)
    for i in range(N):
        p = Process(target=func, args=(
            i, q, filename, subfilenames[i]))
        p.start()
        processes.append(p)

    # Wait for the workers to finish
    for p in processes:
        p.join()

    # Print time taken
    print(f'Total time taken: {time()-t0:.2f} s')
If you save the script as hello.py, you can run it and sort the output like this:
python hello.py | sort
This should produce something like this:
T sleeptime array
-----------------------
0 4.336 [4. 1. 1. 0. 2.]
1 2.363 [2. 1. 1. 1. 3.]
2 2.741 [1. 2. 2. 4. 3.]
3 1.078 [1. 4. 4. 3. 0.]
4 1.327 [4. 4. 4. 4. 1.]
5 4.174 [1. 3. 1. 0. 4.]
6 2.095 [4. 1. 0. 3. 0.]
7 1.091 [3. 4. 4. 0. 4.]
8 1.601 [4. 3. 3. 1. 4.]
9 4.550 [3. 3. 3. 4. 0.]
Total time taken: 4.94 s
Check this against the HDF5 file that was written:
h5dump test.h5
which should give something like this:
HDF5 "test.h5" {
GROUP "/" {
   DATASET "array" {
      DATATYPE H5T_IEEE_F32LE
      DATASPACE SIMPLE { ( 10, 5 ) / ( 10, 5 ) }
      DATA {
      (0,0): 4, 1, 1, 0, 2,
      (1,0): 2, 1, 1, 1, 3,
      (2,0): 1, 2, 2, 4, 3,
      (3,0): 1, 4, 4, 3, 0,
      (4,0): 4, 4, 4, 4, 1,
      (5,0): 1, 3, 1, 0, 4,
      (6,0): 4, 1, 0, 3, 0,
      (7,0): 3, 4, 4, 0, 4,
      (8,0): 4, 3, 3, 1, 4,
      (9,0): 3, 3, 3, 4, 0
      }
   }
}
}
Note that there are better ways to do this with mpi4py, but I needed users not to have to worry about MPI.
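A queue holding a single token acts as a mutex. For comparison, here is a minimal sketch of the same idea using `multiprocessing.Lock`, which is a swapped-in primitive rather than what the answer uses. So that it runs without h5py, it appends plain-text rows to a hypothetical `summary.txt`. Because the processes finish in arbitrary order, the rows may appear in any order.

```python
from multiprocessing import Lock, Process


def func(i, lock, filename):
    row = [i * j for j in range(5)]  # hypothetical stand-in for the per-process array
    # only the process currently holding the lock may open and write the file
    with lock:
        with open(filename, "a") as f:
            f.write(f"{i} " + " ".join(map(str, row)) + "\n")


if __name__ == "__main__":
    filename = "summary.txt"
    open(filename, "w").close()  # start from an empty summary file
    lock = Lock()
    processes = [Process(target=func, args=(i, lock, filename)) for i in range(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```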