我正在尝试并行处理数据。我有一个存储数据的HDF5文件,每个传感器都有一个表格。传感器的数据独立于其他传感器。我的HDF5文件的结构如下:
/root
/sens_metadata
/sens1
/sens2
...
每个传感器表包含一个datetime[64]
索引,3列数据和2列分数表示数据的可信度。我的问题是,以下哪一种是更好的编程方式:
为每个子进程打开与HDF5文件的连接
def parfunc(sens_id):
with pd.HDFStore('data.h5', 'r') as store:
try:
df = store[sens_id]
except KeyError:
pass
else:
# Do work on the df
def main():
import multiprocessing as mp
maxproc = mp.cpu_count()
with pd.HDFStore('data.h5', 'r') as store:
sens_list = store['sens_metadata'].index.tolist()
with mp.Pool(maxproc, maxtaskperchild=100) as p:
ret = p.map(parfunc, sens_list)
读取主线程中的传感器,在每次迭代时重新初始化Pool
def parfunc(df):
# Do work on the df
def main():
import multiprocessing as mp
maxproc = mp.cpu_count()
i = 0
df_list, ret = [], []
with pd.HDFStore('data.h5', 'r') as store:
sens_list = store['sens_metadata'].index.tolist()
for sens in sens_list:
try:
df_list.extend([store[sens]])
except KeyError:
pass
else:
if i == maxproc:
with mp.Pool(maxproc) as p:
ret.extend(p.map(parfunc, df_list))
i, df_list = 0, []
i += 1
现在大约需要0.25秒来获取表的数据。但是,该表的大小只会增加,获取数据所需的时间越来越长。在单个过程中,处理数据表大约需要1分钟。
上述选项中的哪一种是更好的方法?还是还有另一种更好的方法?
无法传递HDF5缓冲区对象,因为无法对其进行腌制。 (对象是WeakValueDictionary
的子级。)
from functools import partial
def parfunc(hdf_buff, sens_id):
try:
df = hdf_buff[sens_id]
except KeyError:
pass
else:
# Do work on the df
def main():
import multiprocessing as mp
maxproc = mp.cpu_count()
with pd.HDFStore('data.h5', 'r') as store:
sens_list = store['sens_metadata'].index.tolist()
with mp.Pool(maxproc, maxtaskperchild=100) as p:
ret = pd.concat(p.map(partial(parfunc, hdf_buff=store), sens_list))
也许有点老,但是如果有人正在处理一些相关问题,它可能会有所帮助。我正在阅读有关您无法选择的选项的信息,因为在阅读此link之后,我遇到了同样的问题。我实现了这样的内容
HDF_LOCK = threading.Lock()
HDF_PATH = 'path'
@contextmanager
def locked_file():
with HDF_LOCK:
with h5py.File(HDF_PATH, 'r') as file:
yield file
def process_files():
with locked_file() as file:
timestamp = file.attrs.get('start_timestamp', 0)
dataset = file.get('series')
with concurrent.futures.ProcessPoolExecutor(10) as executor:
for group, res in ((group, executor.submit(process_groups, group,
timestamp)) for group in dataset):
print(res.result())
process_groups是我处理HDF5文件的函数。您可以在主函数中使用locked_file()函数来处理文件。