从并行函数返回数据帧字典?

问题描述 投票:0回答:3

Python 和 GitHub/stackoverflow 新手,第一次尝试使用 joblib 和多处理来加快我在 Python 中的工作流程。

我定义了一个空的

OrderedDict
来存储由函数(my_function)生成的
DataFrame
。该函数接受单独
DataFrame
列的元素,执行一些操作,并且应该返回(希望已填充的)
OrderedDict
和另一个
DataFrame

请允许我提供一些伪代码来解释这一点:

from joblib import Parallel, delayed
from collections import OrderedDict
from tqdm import tqdm

import pandas as pd
import multiprocessing

my_dict = OrderedDict()
my_df = DataFrameofvalues

def my_function(k):

  my_dict[k] = someoperationswithpandasresultinginDataFrames
  
  my_df = someooperationswithpandas
  
  return my_dict, my_df
  
num_cores = multiprocessing.cpu_count()
inputs = tqdm(my_df['my_column'])

if __name__ == '__main__':
  my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k for k in inputs)

这会导致以下错误:

File "<ipython-input-52-df771b916ba5>", line 8, in <module>
    my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k) for k in inputs)

ValueError: too many values to unpack (expected 2)

我想我忽略了一些小事,但我就是找不到它。有人可以看一下并帮助我吗?

我在网上找不到太多关于如何准确地计算出我的函数试图解包多少个值的信息(我猜测输入中的元素数量?)或者它是否给了我所有的

DataFrame
应该一次性进入
OrderedDict

根据进一步的故障排除进行编辑:

我想我知道问题出在哪里:该函数正在迭代输入并简单地生成数据帧,然后它无法将其组合成它所期望的

dict
。我通过设置
inputs = tqdm(my_df.loc[0:1, 'my_column'])
解决了这个问题。当我这样做时它可以工作,但如果我将其设置为
inputs = tqdm(my_df.loc[0:2, 'my_column'])
,则无法解压。但到目前为止还没有解决方案。

python function dataframe dictionary
3个回答
1
投票

我相信这与语法有关。您没有以正确的方式为您的函数提供参数。您可以尝试将最后一行分成更小的部分,以找出哪部分损坏。

此外,这不是正确的列表理解:

(delayed(my_function)(k) for k in inputs)

也许你想要这个:

[delayed(my_function(k)) for k in inputs]

希望这对您有帮助。祝你好运!


1
投票

弄清楚如何获得我想要的东西并认为我会分享。

以下伪代码片段:

if __name__ == '__main__':
  my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k for k in inputs)

实际上给了我一个数据框列表。我把它改为:

if __name__ == '__main__':
  my_list = Parallel(n_jobs = num_cores)(delayed(my_function)(k for k in inputs)

for i in range(len(my_list)):
   if len(results[i]) > 0:
        my_list[i] = my_list[i].reset_index(drop = True)            
        my_dict[str(my_list[i].loc[0,'col1'])] = my_list[i]

现在返回数据帧的字典。不完全是我最初想要的,但就我的目的而言,甚至更好。


0
投票

TL;博士:

#Python3

from multiprocessing import Process, Manager
from collections import OrderedDict


def update_dict(my_dict, key):
    # Insert your DataFrame calculations here!
    my_dict[key] = {'1st df': 'result_df_1',
                    '2nd df': 'result_df_2'}
    return


if __name__ == "__main__":
    # whatever your inputs are
    inputs = [x for x in range(4)]

    manager = Manager()
    global_dict = manager.dict()
    job = [Process(target=update_dict, args=(global_dict, _input)) for _input in inputs]
    _ = [p.start() for p in job]
    _ = [p.join() for p in job]

    [print(f"{x}") for x in global_dict.items()]
    
    # N.B assumes numeric (sortable) keys:
    # dictionary sorted by key -- OrderedDict(sorted(d.items()) also works
    ordered_global_dict = OrderedDict(sorted(global_dict.items(), key=lambda t: t[0]))
    print(ordered_global_dict.items())

    # accessing the results dataframe from the dict
    results_df_01 = ordered_global_dict[0]['results_df_1']
    results_df_02 = ordered_global_dict[0]['results_df_2']

#output:
odict_items([(0, {'1st df': 'result_df_1', '2nd df': 'result_df_2'}), (1, {'1st df': 'result_df_1', '2nd df': 'result_df_2'}), (2, {'1st df': 'result_df_1', '2nd df': 'result_df_2'}), (3, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})])


说明

问得好,尽管你想要实现的目标有点模糊。例如,您从每个进程返回一个

my_df
,但错误地将输出数据帧的 all 分配给单个变量:
my_dict, my_df = Parallel(...

根据我的理解,我会回答,就好像您需要

my_function
来更新以下形式的全局字典:
{key: {secondary_key: <dataframe>}}

让我们一点一点地过一遍。 这是我从 S.O 上的一个相关问题中找到的答案,我的答案基于:

(1)启动每个进程,以及(2)必须在多个进程之间复制 pandas.DataFrame (等),您会产生大量开销。如果您只需要并行填充一个字典,我建议使用共享内存字典。如果没有密钥会被覆盖,那么这很容易,您不必担心锁。

这是他们提供的解决方案:

>>> from multiprocess import Process, Manager
>>> 
>>> def f(d, x):
...   d[x] = x**2
... 
>>> manager = Manager()
>>> d = manager.dict()
>>> job = [Process(target=f, args=(d, i)) for i in range(5)]
>>> _ = [p.start() for p in job]
>>> _ = [p.join() for p in job]
>>> print d
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16}

通过少量修改,我们可以轻松地重新调整此代码的用途以供您使用。

def f(d, x):
    d[x] = x**2

变成(使用你的变量名称)。

def update_dict(my_dict, key):
    # Insert your DataFrame calculations here!
    my_dict[key] = {'1st df': 'result_df_1',
                    '2nd df': 'result_df_2'}
    return
 

剩下的代码变成:

inputs = [x for x in range(4)]

manager = Manager()
global_dict = manager.dict()
job = [Process(target=update_dict, args=(global_dict, _input)) for _input in inputs]
_ = [p.start() for p in job]
_ = [p.join() for p in job]
[print(f"{x}") for x in global_dict.items()]

打印为:

#python3.9
(1, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})
(0, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})
(2, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})
(3, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})

请注意,这里的字典是无序的。最后一步是我们为您订购词典。假设您的键是整数,您可以使用:

from collections import OrderedDict
# dictionary sorted by key -- OrderedDict(sorted(d.items()) also works
ordered_global_dict = OrderedDict(sorted(global_dict.items())
© www.soinside.com 2019 - 2024. All rights reserved.