读取多处理HDF5文件,更好,多个连接或重复生成

问题描述 投票:3回答:1

我正在尝试并行处理数据。我有一个存储数据的HDF5文件,每个传感器都有一个表格。传感器的数据独立于其他传感器。我的HDF5文件的结构如下:

/root
    /sens_metadata
    /sens1
    /sens2
    ...

每个传感器表包含一个datetime[64]索引,3列数据和2列分数表示数据的可信度。我的问题是,以下哪一种是更好的编程方式:

选项1

为每个子进程打开与HDF5文件的连接

def parfunc(sens_id):
    with pd.HDFStore('data.h5', 'r') as store:
        try:
            df = store[sens_id]
        except KeyError:
            pass
        else:
            # Do work on the df

def main():
    import multiprocessing as mp
    maxproc = mp.cpu_count()
    with pd.HDFStore('data.h5', 'r') as store:
        sens_list = store['sens_metadata'].index.tolist()
    with mp.Pool(maxproc, maxtaskperchild=100) as p:
        ret = p.map(parfunc, sens_list)

选项2

读取主线程中的传感器,在每次迭代时重新初始化Pool

def parfunc(df):
    # Do work on the df

def main():
    import multiprocessing as mp
    maxproc = mp.cpu_count()
    i = 0
    df_list, ret = [], []
    with pd.HDFStore('data.h5', 'r') as store:
        sens_list = store['sens_metadata'].index.tolist()
        for sens in sens_list:
            try:
                df_list.extend([store[sens]])
            except KeyError:
                pass
            else:
                if i == maxproc:
                    with mp.Pool(maxproc) as p:
                        ret.extend(p.map(parfunc, df_list))
                    i, df_list = 0, []
                i += 1

现在大约需要0.25秒来获取表的数据。但是,该表的大小只会增加,获取数据所需的时间越来越长。在单个过程中,处理数据表大约需要1分钟。

上述选项中的哪一种是更好的方法?还是还有另一种更好的方法?


不可能的选项:

无法传递HDF5缓冲区对象,因为无法对其进行腌制。 (对象是WeakValueDictionary的子级。)

from functools import partial

def parfunc(hdf_buff, sens_id):
    try:
        df = hdf_buff[sens_id]
    except KeyError:
        pass
    else:
        # Do work on the df

def main():
    import multiprocessing as mp
    maxproc = mp.cpu_count()
    with pd.HDFStore('data.h5', 'r') as store:
        sens_list = store['sens_metadata'].index.tolist()
        with mp.Pool(maxproc, maxtaskperchild=100) as p:
            ret = pd.concat(p.map(partial(parfunc, hdf_buff=store), sens_list))
python-3.x pandas multiprocessing hdf5
1个回答
0
投票

也许有点老,但是如果有人正在处理一些相关问题,它可能会有所帮助。我正在阅读有关您无法选择的选项的信息,因为在阅读此link之后,我遇到了同样的问题。我实现了这样的内容

HDF_LOCK = threading.Lock()
HDF_PATH = 'path'

@contextmanager
def locked_file():
  with HDF_LOCK:
    with h5py.File(HDF_PATH, 'r') as file:
        yield file

def process_files():
    with locked_file() as file:
        timestamp = file.attrs.get('start_timestamp', 0)
        dataset = file.get('series')

        with concurrent.futures.ProcessPoolExecutor(10) as executor:
            for group, res in ((group, executor.submit(process_groups, group,
                 timestamp)) for group in dataset):
                print(res.result())

process_groups是我处理HDF5文件的函数。您可以在主函数中使用locked_file()函数来处理文件。

© www.soinside.com 2019 - 2024. All rights reserved.