我正在尝试减少在数据框内的每一行(最多200万行)上应用cantools库中的复杂功能所花费的时间:
Timestamp Type ID Data
0 16T122109957 0 522 b'0006'
1 16T122109960 0 281 b'0000ce52d2290000'
2 16T122109960 0 279 b'0000000000000000'
3 16T122109960 0 304 b'0000'
4 16T122109961 0 277 b'400000'
使用上述数据帧和读入的dbc文件。dbc文件是有关如何编码/解码数据的一组规则。
使用DataFrame可能最多需要10分钟:
df['decoded'] = df.apply(lambda x: dbc.decode_message(df['ID'][x], df['Data']))
将两列放入列表中,然后遍历列表只需要大约一分钟即可完成,但是当新数组保存到数据帧中时,会出现错误ValueError: array is too big
。预期如此,因为它很大。
示例循环代码:
id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts)
我尝试了python向量化,这显然是最快的,并遇到了我似乎无法修复的错误TypeError: 'Series' objects are mutable, thus they cannot be hashed
。例如:
Data['dict'] = dbc.decode_message(df['ID'], df['Data'])
还有其他方法可以加快申请过程,还是应该尝试进行矢量化?
最小示例:
import cantools
import pandas as pd
df = pd.read_csv('file.log', skiprows=11, sep=';')
dbc = cantools.database.load_file('file.dbc')
# option 1 SLOW
df['decoded'] = df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']))
# option 2 Faster...
id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts) #< -- causes error for being to big
#option 3
df['dict'] = dbc.decode_message(df['ID'], df['Data']) #< --Error
只要cantools
库不支持在Series
或DataFrame
对象上工作,矢量化将不起作用。因此,使用apply
是唯一的方法。
由于dbc转换逐行工作,没有任何行间依赖性,因此您应该能够并行化它。
您需要
def decode(df):
dbc.decode_message(df['ID'], df['data'])
return df
import pandas as pd
import numpy as np
import multiprocessing as mp
def parallelApply(df, func, numChunks=4):
df_split = np.array_split(df, numChunks)
pool = mp.Pool(numChunks)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
df = parallelApply(df, decode)
parallelApply
所做的是将数据帧分成numChunks
块,并创建具有这么多条目的多处理池。然后在单独的进程中将函数
func
(在您的情况下为decode
应用于每个块。[
decode
返回它已更新的数据框块,pd.concat
将再次合并它们。
还有一个名为pandarallel的非常方便的库,它将为您完成此操作,但是在Windows上运行时,您将不得不使用WSL。:
pip install pandarallel
通话后
from pandarallel import pandarallel pandarallel.initialize()
您只需将呼叫转换为
df.apply(...)
到
df.parallel_apply(func)
该库将启动多个进程,并让每个进程处理一部分数据。