我正在尝试使用 cythonise 函数来替换极大的
pandas.DataFrame
(100,000,000 行)上的过滤操作。目前,它将 df
列 (Z, T, M
) 中的值与单个值 (Z1, Z2, T1, T2, M1, M2
) 进行比较,但最终将对这些值的一维数组执行 rowise:
# Python
# df: pandas.DataFrame
# Z, T, M: columns of df, float64 values
# I: column of df, uint32 values
# Z1, Z2, T1, T2, M1, M2: float64 values
res = df.query(
f"{Z1} < Z < {Z2} and {T1} < T < {T2} and {M1} < M < {M2}",
engine="numexpr",
)[["T", "I", "M"]]
df.query(..., engine="numexpr")
的速度已经比 df.loc[(df['Z'] > Z1) & (df['Z'] < Z2) & ...]
有所提高。我什至使用装饰器进一步改进了它@jit(nopython=False, cache=True, parallel=True)
。
我现在正在尝试重构它以与 cython 一起使用。然而,经过多轮错误修复和细化后,我无法使用以下命令来构建它(在与
module.pyx
、setup.py
和 pyproject.toml
相同的目录中的终端中执行):
python setup.py build_ext --inplace
我现在遇到以下无法解决的错误:
Error compiling Cython file: Invalid types for '>' (float64_t[::1], float64_t)
这是我第一次使用 cythonising 代码,我已经阅读了以下有用的文档:
#!/usr/bin/env python
import numpy
from Cython.Build import build_ext, cythonize
from setuptools import Extension, setup
extensions = [Extension("module", ["module.pyx"])]
setup(
ext_modules=cythonize(extensions),
include_dirs=[numpy.get_include()],
cmdclass={"build_ext": build_ext},
)
[build-system]
requires = ["setuptools", "wheel", "Cython"]
#!/usr/bin/env python
# cython: infer_types=True
import numpy as np
DTYPE = np.dtype(
[('T', np.float64), ('I', np.uint32), ('M', np.float64)]
)
cimport cython
cimport numpy as np
ctypedef packed struct result_struct:
np.float64_t T
np.uint32_t I
np.float64_t M
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef filter_function(
np.ndarray[(
np.uint32_t,
np.uint32_t,
np.uint32_t,
np.uint32_t,
np.float64_t,
np.float64_t,
np.float64_t,
np.float64_t
), ndim=2] df,
np.float64_t Z1,
np.float64_t Z2,
np.float64_t T1,
np.float64_t T2,
np.float64_t M1,
np.float64_t M2
):
# Types
cdef np.uint32_t[:, ::1] I = df[3]
cdef np.float64_t[:, ::1] Z = df[4]
cdef np.float64_t[:, ::1] M = df[5]
cdef np.float64_t[:, ::1] T = df[7]
cdef Py_ssize_t i, n
# Output array
n = df.shape[0]
cdef np.ndarray[result_struct, ndim=2] result = np.zeros((n, 3), dtype=DTYPE)
cdef np.float64_t[:, ::1] result_T = result['T']
cdef np.uint32_t[:, ::1] result_I = result['I']
cdef np.float64_t[:, ::1] result_M = result['M']
# Filter
for i in range(n):
if (Z[i] > Z1) and (Z[i] < Z2) and (T[i] > T1) and (T[i] < T2) and (M[i] > M1) and (M[i] < M2):
result_T[i] = T[i]
result_I[i] = I[i].astype(np.uint32)
result_M[i] = M[i]
# Remove rows with 0
result = result[(result != 0).all(axis=1)]
return result
我感谢您提供的任何帮助。
如果有更好的方法来实现对值/一维值数组进行过滤的任务,欢迎提出建议。
另外,我想检查我对内存视图和连续数据的使用是否正确。我是否需要使用列
df
创建 'I', 'Z', 'M', 'T'
的子集,以便 'T'
是连续的,或者如果它是 df
的内存视图,那没关系(在作为参数提供之前使用 df.to_numpy()
)到filter_function()
)。
我无法使用结构化数组方法进行编译。然而,我并没有花很多时间在这上面,因为我的理解是,从结构化数组转换为 DataFrame 需要复制数组,因为 Pandas 不支持 NumPy 的结构化数组。相反,我将每一列视为一个单独的数组。
此代码对 DataFrame 进行了一些操作,因此它确实与 Python 进行了一些交互,但我将其保留在循环之外。它最终比朴素的 Pandas 方法快了大约 3 倍。
我可以通过以下更改使其正常工作:
模块.pyx:
import numpy as np
import pandas as pd
cimport numpy as cnp
cimport cython
cnp.import_array()
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef filter_df_cy_no_pd(df, double Z1, double Z2, double T1, double T2, double M1, double M2):
# Get NumPy array for each column
cdef cnp.float64_t[::1] Z = df['Z'].values
cdef cnp.float64_t[::1] T = df['T'].values
cdef cnp.float64_t[::1] M = df['M'].values
cdef cnp.int32_t[::1] I = df['I'].values
cdef cnp.float64_t[::1] T_out = np.empty_like(T)
cdef cnp.int32_t[::1] I_out = np.empty_like(I)
cdef cnp.float64_t[::1] M_out = np.empty_like(M)
cdef Py_ssize_t n = Z.shape[0]
cdef cython.bint in_range = 0
cdef Py_ssize_t out_idx = 0
for i in range(n):
in_range = (
(Z1 < Z[i]) & (Z[i] < Z2) &
(T1 < T[i]) & (T[i] < T2) &
(M1 < M[i]) & (M[i] < M2)
)
if in_range:
T_out[out_idx] = T[i]
I_out[out_idx] = I[i]
M_out[out_idx] = M[i]
out_idx += 1
T_out_arr = np.asarray(T_out)[:out_idx]
I_out_arr = np.asarray(I_out)[:out_idx]
M_out_arr = np.asarray(M_out)[:out_idx]
return pd.DataFrame({
'T': T_out_arr,
'I': I_out_arr,
'M': M_out_arr,
}, copy=False)
设置.py
#!/usr/bin/env python
import numpy
from Cython.Build import build_ext, cythonize
from setuptools import Extension, setup
extensions = [
Extension(
"module",
["module.pyx"],
define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")],
extra_compile_args=["-O3"],
)
]
setup(
ext_modules=cythonize(extensions, language_level=3),
include_dirs=[numpy.get_include()],
cmdclass={"build_ext": build_ext},
)
测试代码示例:
import module
import numpy as np
import pandas as pd
import time
N = 1000000*100
df = pd.DataFrame({
"Z": (np.random.rand(N) * 10).astype('float64'),
"T": (np.random.rand(N) * 10).astype('float64'),
"M": (np.random.rand(N) * 10).astype('float64'),
"I": (np.random.rand(N) * 10).astype('int32'),
})
t0 = time.time()
df_filt = module.filter_df_cy_no_pd(df, 1, 9, 1, 9, 1, 9)
t = time.time()
print(f"Duration: {t - t0:.3f}")
print(df_filt)