Updating a pandas DataFrame between processes

Problem description

I have a (more or less) minimal multiprocessing test example in which the expected output is a shared pandas DataFrame. However, the shared DataFrame is never updated. In my example, ten text files are first created for testing purposes, each containing a single integer that corresponds to its filename. The worker function is given each of the ten file paths plus a namespace for sharing the DataFrame; it then analyzes each file and enters the "result" into the appropriate cell of the DataFrame (which, for testing purposes, is the sum of the integer value given in the file and each of the constants in the list called "constants").

Any ideas on how to get the DataFrame to update after each task and how to get the variable sharing to work? Am I making a simple mistake? Several posts suggest this way of sharing a DataFrame, but they generally have a simple structure, and something about my structure makes the sharing fail. For example, I tried to follow the approach given here: How to share pandas DataFrame object between processes?

from multiprocessing import Manager
import multiprocessing as mp
import pandas as pd
import os

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

ct = 1

for filename in test_filenames:
    with open(test_folder + '\\' + filename + '.txt', 'w') as f:
        f.write(str(ct))
    f.close()    

    ct += 1

def worker_function(file_paths, ns):

    dataframe = ns.df

    for file_path in file_paths:

        with open(file_path) as f:
            value = int(f.readline())
        f.close()

        filename = file_path.split( '\\' )[-1]    
        for constant in constants:
            result = value + constant 
            dataframe.at[constant, filename] = result

    ns.df = dataframe

def run_parallel(file_paths, number_procs, ns):    
    procs = []
    for i in range(number_procs):
        paths_load = file_paths[i::number_procs]
        proc = mp.Process(target=worker_function, args=(paths_load, ns))
        procs.append(proc)
        procs[i].start()
    for p in procs:
        p.join()

if __name__ == '__main__':        
    num_procs = 4
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=files, index=constants)   
    mgr = Manager()
    ns = mgr.Namespace()
    ns.df = output_df

    run_parallel(file_paths, num_procs, ns)

    output_df = ns.df
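(For reference: this failure can be reproduced without pandas. ns.df hands back a pickled copy from the manager process, so the .at[...] updates only touch each worker's private copy, and the final ns.df = dataframe reassignment ships that whole copy back, with the last worker to finish overwriting the columns the others filled in. A minimal sketch of the copy behaviour, using hypothetical names:)

from multiprocessing import Manager, Process

def mutate_in_place(ns):
    # ns.value hands back a *copy* from the manager process; appending to
    # that copy never reaches the server, so the update is silently lost
    ns.value.append(1)

if __name__ == '__main__':
    mgr = Manager()
    ns = mgr.Namespace()
    ns.value = []
    p = Process(target=mutate_in_place, args=(ns,))
    p.start()
    p.join()
    print(ns.value)  # prints [] -- the append did not propagate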

*** I edited the title to reflect the solution, which no longer uses a namespace. I took the accepted answer and reworked it (below) to use as little code as possible and to not handle exceptions. You can import ProcessPoolExecutor instead if you want multiprocessing.

from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import os

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

ct = 1

for filename in test_filenames:
    with open(test_folder + '\\' + filename + '.txt', 'w') as f:
        f.write(str(ct))

    ct += 1

def worker_function(file_path):

    with open(file_path) as f:
        value = int(f.readline())

    result_list = []
    filename = file_path.split( '\\' )[-1]    
    result_list.append(filename)
    for constant in constants:
        result = value + constant
        result_list.append(result)

    return result_list

if __name__ == '__main__':

    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=constants, index=files)

    with ThreadPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}

        for future in as_completed(pool):
            worker_result = future.result()
            output_df.loc[worker_result[0]] = worker_result[1:]
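If you do want processes instead of threads, the swap is essentially one line, since ProcessPoolExecutor exposes the same interface. A minimal sketch, assuming worker_function, file_paths, and output_df are defined as above (the worker must live at module top level so child processes can pickle it):

from concurrent.futures import ProcessPoolExecutor, as_completed

if __name__ == '__main__':
    # identical structure to the thread version; only the executor class changes
    with ProcessPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}
        for future in as_completed(pool):
            worker_result = future.result()
            output_df.loc[worker_result[0]] = worker_result[1:]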
python pandas multiprocessing

1 Answer

The concurrent.futures module is helpful for workflows that are CPU- or I/O-bound through an embarrassingly parallel data lookup or processing step.

For your case, it should look like the following. I'm not on Windows, so I haven't tried to recreate the filenames to test it, but I hope the structure gives you a sense of the pattern. Note that I'm using multiple threads rather than processes, because the worker function is primarily doing I/O rather than computation.

from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import pandas as pd

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

#ct = 1

def file_counter(ct=1):
    for filename in test_filenames:
        with open(test_folder + '\\' + filename + '.txt', 'w') as f:
            f.write(str(ct))
        # no need to use f.close() with a context manager
        ct += 1

def worker_function(file_path):
    result_list = []
    with open(file_path) as f:
        value = int(f.readline())
    # no need to use f.close() with a context manager
    filename = file_path.split( '\\' )[-1]    
    for constant in constants:
        result = value + constant
        result_list.append((constant, filename, result))
    return result_list


if __name__ == '__main__':
    file_counter() # keep execution below the if...main
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    dataframe_collection = []

    # for I/O you should prefer threads over processes
    with ThreadPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}

        for future in as_completed(pool):
            try:
                # result() re-raises any exception raised inside the worker,
                # so catch it here rather than testing the return value
                worker_result = future.result()
            except Exception:  # choose your own exception types
                # handle the exception
                pass
            else:
                # each worker returns (constant, filename, result) tuples,
                # so build a long-format frame with matching column names
                output_df = pd.DataFrame(data=worker_result,
                                         columns=['constant', 'filename', 'result'])
                dataframe_collection.append(output_df)

    # now concatenate all the DataFrames
    single_df = pd.concat(dataframe_collection)
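If you then want the result in the same constants-by-filenames grid as the question's original DataFrame, a pivot gets you there (a short follow-up sketch, assuming the long-format column names used above):

    # reshape long-format rows into one cell per (constant, filename) pair
    wide_df = single_df.pivot(index='constant', columns='filename', values='result')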