Python Pandas 多处理泡菜错误

问题描述 投票:0回答:1

我在一个文件夹中有多个 .csv 文件。每个 .csv 文件都有来自股票列表的交易数据。我想从每个 .csv 中获取特定的数据部分(在本例中来自“BABA”代码),然后合并多天的部分。由于全局解释器锁定,在 150 个 .csv 文件上使用标准 For 循环需要大约 15 分钟的时间。

目标:使用多进程加速 For 循环

问题:使用多处理时,我收到错误:AttributeError: Can't pickle local object 'main.locals.compile' 底部错误的完整追溯。

下面的代码很慢,但可以使用 for 循环:

def main():

    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles' #multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA'] #select a certain section of data for ticker from .csv file

#=== make some changes to dataframe ===

        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)

#=== append dataframes to list ===

        stock_list.append(df)

#=== using FOR LOOP ===

    for file in file_list:
        compile(file)

#=== combine dataframes from list into one dataframe and export

stock_list_merged = pd.concat(stock_list)
stock_list_merged.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__':
    main()

当我将 for 循环更改为此代码以进行多处理时,我收到错误消息。

def main():

    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles' #multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA'] #select a certain section of data for ticker from .csv file

#=== make some changes to dataframe ===

        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)

#=== append dataframes to list ===

        stock_list.append(df)

#=== using MULTIPROCESSING ===

    pool = Pool(processes = (multiprocessing.cpu_count()-1))
    results = pool.map(compile, file_list)
    pool.close()
    pool.join()
    results_df = pd.concat(results)

#=== combine dataframes from list into one dataframe and export

stock_list_merged = pd.concat(stock_list)
stock_list_merged.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__':
    main()

错误回溯: 追溯(最近一次通话): 文件“/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py”,第 54 行,位于 主要的() 文件“/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py”,第 41 行,在 main 结果 = pool.map(编译,file_list) 地图中的文件“/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py”,第 364 行 返回 self._map_async(func, iterable, mapstar, chunksize).get() 文件“/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py”,第 771 行,在 get 提高自我价值 文件“/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py”,第 537 行,在 _handle_tasks 放(任务) 发送文件“/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/connection.py”,第 211 行 self._send_bytes(_ForkingPickler.dumps(obj)) 转储中的文件“/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/reduction.py”,第 51 行 cls(buf, 协议).dump(obj) AttributeError:无法腌制本地对象'main.locals.compile'

python pandas pickle python-multiprocessing
1个回答
0
投票

我不确定你是否真的有你的导入和其他功能嵌套在里面

def main()
- 或者如果这是代码格式问题?

您需要将代码格式化为:

import glob
import pandas as pd
import numpy as np
from   multiprocessing import Pool

def compile(file):
    df = pd.read_csv(file)
    df = df.loc[df['UnderlyingSymbol'] == 'BABA'] 

    def market_delta():
        return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
    df['DMarket'] = market_delta().round(4)

    return df

def main():
    path = '/Users/DataFiles'
    file_list = glob.glob(path + '/*.csv')

    with Pool() as pool:
        results_df = pd.concat(pool.map(compile, file_list))

    print(results_df)
    results_df.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__': 
    main()
© www.soinside.com 2019 - 2024. All rights reserved.