我在一个函数中使用
quad_vec
,该函数包含无法用分析方法求解的积分。我需要将这个方程拟合到数据点。
但是,即使使用固定值进行评估也需要几秒钟,总拟合时间也会增长到 7 分钟。
有没有办法加速计算?我知道
quad_vec
有 workers
关键字,但是当我尝试使用它时,计算根本没有完成。
我目前正在 Jupyter 笔记本中工作,如果这有任何意义的话。
这是函数定义,我已经尝试在这里使用 numba 但收效甚微。
from scipy.special import j0, j1
from scipy.integrate import quad_vec
import numpy as np
import numba as nb
h = 0.00005
G = 1000
E = 3*G
R= 0.0002
upsilon = 0.06
gamma = 0.072
@nb.jit(nb.float64(nb.float64, nb.float64, nb.float64, nb.float64, nb.float64),cache=True)
def QSzz_1(s,h,z, upsilon, E):
# exponential form of the QSzz^-1 function to prevent float overflow,
# also assumes nu==0.5
numerator = (1+2*s**2*h**2)*np.exp(-2*s*z) + 0.5*(1+np.exp(-4*s*z))
denominator = 0.5*(1-np.exp(-4*s*z)) - 2*s*h*np.exp(-2*s*z)
return (3/(2*s*E)) / (numerator/denominator + (3*upsilon/(2*E))*s)
def integrand(s, r, R, upsilon, E, h):
return s*(R*j0(s*R) - 2*j1(s*R)/s) * QSzz_1(s,h,h, upsilon, E) * j0(s*r)
def style_exact(r, gamma, R, upsilon, E, h):
int_out = quad_vec(integrand, 0, np.inf, args=(r,R,upsilon,E,h))
return gamma * int_out[0]
# calculate fixed ~10s
x_ax = np.linspace(0, 0.0004,101, endpoint=True, dtype=np.float64)
zeta = style_exact(x_ax, gamma, 0.0002, upsilon, E, h)
# fit to dataset (wetting ridge) ~7 min
popt_hemi, pcov_hemi = curve_fit(lambda x, upsilon, E, h: style_exact(x, gamma, R, upsilon, E, h) ,points_x, points_y, p0=(upsilon, E, h), bounds=([0,0,0],[np.inf,np.inf,np.inf]))
注释中肯定有一些代码优化可以加快速度,所以我将通过关注集成来补充这一点。如果您可以使用 GPU,您可以获得两到三个数量级的加速。
在不牺牲太多精度的情况下提高速度可以做的最简单的事情之一是提供积分的限制来捕获积分的重要部分。我绘制了被积函数来选择
b = 1e-7
,但您可能可以开发启发式方法并编写一些内容来自动选择合理的值。
args= (x_ax, R, upsilon, E, h)
# infinite upper limit of integration
%timeit -r 1 -n 1 quad_vec(integrand, 0, np.inf, args=args)
# 9.41 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
res = quad_vec(integrand, 0, np.inf, args=args)
# large, finite upper limit of integration
%timeit quad_vec(integrand, 0, 1e7, args=args)
# 595 ms ± 169 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
res2 = quad_vec(integrand, 0, 1e7 args=args)
# Check error between the two
# (note that `quad_vec` may not be super accurate, anyway;
# consider checking against `mpmath.quad`)
from scipy import stats
res = quad_vec(integrand, 0, 1e7, args=args)
res0 = quad_vec(integrand, 0, np.inf, args=args)
stats.gmean(abs((res[0] - res0[0])/res0[0])) # gmean of relative error
# 3.643331971383965e-05
通过在 CuPy 中推出自己的集成方案(或者找到一个使用 GPU 的集成库),您可以毫不费力地获得更大的收益。
例如,SciPy 中有一个tanh-sinh求积规则的实现,它相对简单地适应使用 GPU 的东西,特别是如果您愿意使用固定步长的话。 tanh-sinh 可能不是这个振荡被积函数的最佳规则;我只是因为熟悉才使用它。
import cupy as np # still np to avoid changing the existing code
from cupyx.scipy.special import j0, j1
# copy-pasted from
# https://github.com/scipy/scipy/blob/v1.12.0/scipy/integrate/_tanhsinh.py
# then removed comments to reduce length; other optimizations possible
# There are plans to make this Array-API compatible and provide a public
# interface
_N_BASE_STEPS = 8
def _get_base_step(dtype=np.float64):
fmin = 4*np.finfo(dtype).tiny
tmax = np.arcsinh(np.log(2/fmin - 1) / np.pi)
h0 = tmax / _N_BASE_STEPS
return h0.astype(dtype)
def _compute_pair(k, h0):
h = h0 / 2**k
max = _N_BASE_STEPS * 2**k
j = np.arange(max+1) if k == 0 else np.arange(1, max+1)
jh = j * h
pi_2 = np.pi / 2
u1 = pi_2*np.cosh(jh)
u2 = pi_2*np.sinh(jh)
wj = u1 / np.cosh(u2)**2
xjc = 1 / (np.exp(u2) * np.cosh(u2))
wj[0] = wj[0] / 2 if k == 0 else wj[0]
return xjc, wj
def _transform_to_limits(xjc, wj, a, b):
alpha = (b - a) / 2
xj = np.concatenate((-alpha * xjc + b, alpha * xjc + a), axis=-1)
wj = wj*alpha
wj = np.concatenate((wj, wj), axis=-1)
invalid = (xj <= a) | (xj >= b)
wj[invalid] = 0
return xj, wj
# simple fixed-step integration function
def integrate(func, a, b, args):
k = 10 # increase this to improve accuracy
step0 = _get_base_step()
step = step0 / 2**k
xjc, wj = _compute_pair(k, step0)
xj, wj = _transform_to_limits(xjc, wj, a, b)
fj = integrand(xj, *args)
return fj @ wj * step
# your code
def QSzz_1(s,h,z, upsilon, E):
# exponential form of the QSzz^-1 function to prevent float overflow,
# also assumes nu==0.5
numerator = (1+2*s**2*h**2)*np.exp(-2*s*z) + 0.5*(1+np.exp(-4*s*z))
denominator = 0.5*(1-np.exp(-4*s*z)) - 2*s*h*np.exp(-2*s*z)
return (3/(2*s*E)) / (numerator/denominator + (3*upsilon/(2*E))*s)
def integrand(s, r, R, upsilon, E, h):
return s*(R*j0(s*R) - 2*j1(s*R)/s) * QSzz_1(s, h, h, upsilon, E) * j0(s*r)
h = 0.00005
G = 1000
E = 3*G
R= 0.0002
upsilon = 0.06
gamma = 0.072
x_ax = np.linspace(0, 0.0004, 101, endpoint=True, dtype=np.float64)
r, gamma, R, upsilon, E, h = x_ax, gamma, 0.0002, upsilon, E, h
a = 0
b = 1e7
args = (r[:, np.newaxis], R, upsilon, E, h)
%timeit integrate(integrand, a, b, args)
# 2.14 ms ± 82.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
res = integrate(integrand, a, b, args)
stats.gmean(abs((res - res0[0])/res0[0]))
# 0.00011028421945971745
您可以通过增加
k
来交换时间与准确性。如果您不想使积分上限有限,您还可以通过变量替换将不合适的积分转换为积分有限的积分(但您需要增加 k
来补偿)。