I have multiple .csv files in one folder. Each .csv file holds trading data for a list of stocks. I want to pull a specific slice of data out of each .csv (in this example, the rows for the "BABA" ticker) and then merge those slices across multiple days. Because of the global interpreter lock, a standard for loop over 150 .csv files takes around 15 minutes.
Goal: speed up the for loop using multiprocessing.
Problem: when I use multiprocessing I get the error AttributeError: Can't pickle local object 'main.<locals>.compile'. The full traceback is at the bottom.
The code below is slow, but it works with the for loop:
def main():
    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles'  # multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA']  # select a certain section of data for the ticker from the .csv file
        # === make some changes to the dataframe ===
        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)
        # === append dataframes to list ===
        stock_list.append(df)

    # === using FOR LOOP ===
    for file in file_list:
        compile(file)

    # === combine dataframes from list into one dataframe and export ===
    stock_list_merged = pd.concat(stock_list)
    stock_list_merged.to_csv('scholes expanded.csv', index=False)

if __name__ == '__main__':
    main()
When I change the for loop to the following code to use multiprocessing, I get the error message.
def main():
    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles'  # multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA']  # select a certain section of data for the ticker from the .csv file
        # === make some changes to the dataframe ===
        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)
        # === append dataframes to list ===
        stock_list.append(df)

    # === using MULTIPROCESSING ===
    pool = Pool(processes=(multiprocessing.cpu_count() - 1))
    results = pool.map(compile, file_list)
    pool.close()
    pool.join()
    results_df = pd.concat(results)

    # === combine dataframes from list into one dataframe and export ===
    stock_list_merged = pd.concat(stock_list)
    stock_list_merged.to_csv('scholes expanded.csv', index=False)

if __name__ == '__main__':
    main()
Error traceback:

Traceback (most recent call last):
  File "/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py", line 54, in <module>
    main()
  File "/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py", line 41, in main
    results = pool.map(compile, file_list)
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'main.<locals>.compile'
I'm not sure whether you really do have your imports and helper functions nested inside def main(), or whether that is just a formatting issue in the question.
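For context: multiprocessing ships the worker function to each child process by pickling it, and pickle can only serialize a function that is importable by name at module level. A function defined inside another function is a "local object" and fails exactly as in the traceback. A minimal demonstration, independent of pandas:

```python
import pickle

def top_level(x):
    """A module-level function: pickle serializes this by its qualified name."""
    return x + 1

def make_nested():
    def nested(x):  # defined inside another function -> a "local object"
        return x + 1
    return nested

# A module-level function pickles fine.
data = pickle.dumps(top_level)
print(type(data))  # bytes

# A nested (local) function fails with the same error the question hits.
try:
    pickle.dumps(make_nested())
except AttributeError as err:
    print(err)  # Can't pickle local object 'make_nested.<locals>.nested'
```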
You need to structure the code like this:
import glob
import pandas as pd
import numpy as np
from multiprocessing import Pool

def compile(file):
    df = pd.read_csv(file)
    df = df.loc[df['UnderlyingSymbol'] == 'BABA']
    def market_delta():
        return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
    df['DMarket'] = market_delta().round(4)
    return df

def main():
    path = '/Users/DataFiles'
    file_list = glob.glob(path + '/*.csv')
    with Pool() as pool:
        results_df = pd.concat(pool.map(compile, file_list))
    print(results_df)
    results_df.to_csv('scholes expanded.csv', index=False)

if __name__ == '__main__':
    main()
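An equivalent alternative (my sketch, not part of the original answer) is `concurrent.futures.ProcessPoolExecutor`, which follows the same rule: the worker must live at module level. The demo below builds two throwaway CSV files with the stdlib `csv` module so it runs without the real data folder; the filenames, column set, and `extract_ticker_rows` helper are invented for illustration:

```python
import csv
import glob
import os
import tempfile
from concurrent.futures import ProcessPoolExecutor

def extract_ticker_rows(path, ticker="BABA"):
    """Read one CSV and keep only the rows for the given ticker (module-level, so it pickles)."""
    with open(path, newline="") as fh:
        return [row for row in csv.DictReader(fh) if row["UnderlyingSymbol"] == ticker]

def main():
    # Two small demo files standing in for the real data folder.
    folder = tempfile.mkdtemp()
    for day in ("day1", "day2"):
        with open(os.path.join(folder, day + ".csv"), "w", newline="") as fh:
            w = csv.writer(fh)
            w.writerow(["UnderlyingSymbol", "Delta"])
            w.writerow(["BABA", "0.5"])
            w.writerow(["AAPL", "0.3"])
    file_list = glob.glob(folder + "/*.csv")

    # One worker task per file; results come back in input order.
    with ProcessPoolExecutor() as pool:
        per_file = list(pool.map(extract_ticker_rows, file_list))

    merged = [row for rows in per_file for row in rows]
    print(len(merged))  # one matching row per demo file

if __name__ == "__main__":
    main()
```

With pandas, each worker would return a DataFrame as in the answer above and the parent process would `pd.concat` the mapped results.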