我有一些代码使用MCMC对后验分布进行采样,特别是Metropolis Hastings。我使用scipy生成随机样本:
import numpy as np
from scipy import stats
def get_samples(n):
"""
Generate and return a randomly sampled posterior.
For simplicity, Prior is fixed as Beta(a=2,b=5), Likelihood is fixed as Normal(0,2)
:type n: int
:param n: number of iterations
:rtype: numpy.ndarray
"""
x_t = stats.uniform(0,1).rvs() # initial value
posterior = np.zeros((n,))
for t in range(n):
x_prime = stats.norm(loc=x_t).rvs() # candidate
p1 = stats.beta(a=2,b=5).pdf(x_prime)*stats.norm(loc=0,scale=2).pdf(x_prime) # prior * likelihood
p2 = stats.beta(a=2,b=5).pdf(x_t)*stats.norm(loc=0,scale=2).pdf(x_t) # prior * likelihood
alpha = p1/p2 # ratio
u = stats.uniform(0,1).rvs() # random uniform
if u <= alpha:
x_t = x_prime # accept
posterior[t] = x_t
elif u > alpha:
x_t = x_t # reject
posterior = posterior[np.where(posterior > 0)] # get rid of initial zeros that don't contribute to distribution
return posterior
一般来说,我尽量避免在python中使用显式的for循环 - 我会尝试使用纯numpy生成所有内容。然而,对于该算法的情况,具有if语句的for循环是不可避免的。因此,代码很慢。当我描述我的代码时,它花费大部分时间在for循环中(显然),更具体地说,最慢的部分是生成随机数; stats.beta().pdf()
和stats.norm().pdf()
。
有时我使用numba来加速我的矩阵运算代码。虽然numba与一些numpy操作兼容,但生成随机数不是其中之一。 Numba有一个cuda rng,但这仅限于正常和均匀的分布。
我的问题是,有没有办法使用与numba兼容的各种分布的某种随机抽样来显着加快上面的代码?
我们不必将自己局限于numba,但它是我所知道的唯一易于使用的优化器。更一般地说,我正在寻找方法来加速python中for循环中各种分布(beta,gamma,poisson)的随机抽样。
在开始考虑numba等之前,您可以对此代码进行大量优化。人。 (我只是通过智能算法的实现,设法在这段代码上加速了25倍)
首先,您实施Metropolis - Hastings算法时出错。无论您的链是否移动,您都需要保留方案的每次迭代。也就是说,您需要从代码中删除posterior = posterior[np.where(posterior > 0)]
,并在每个循环结束时使用posterior[t] = x_t
。
其次,这个例子看起来很奇怪。通常,对于这些类型的推理问题,我们希望在给出一些观察结果的情况下推断出分布的参数。但是,在这里,分布的参数是已知的,而你是在采样观察?无论如何,无论如何,无论如何,我很乐意与你的例子一起滚动并向你展示如何加快速度。
首先,从主要的t
循环中删除任何不依赖于for
值的内容。首先从for循环中删除随机游走创新的生成:
x_t = stats.uniform(0,1).rvs()
innov = stats.norm(loc=0).rvs(size=n)
for t in range(n):
x_prime = x_t + innov[t]
当然也可以从for循环中移动u
的随机生成:
x_t = stats.uniform(0,1).rvs()
innov = stats.norm(loc=0).rvs(size=n)
u = np.random.uniform(size=n)
for t in range(n):
x_prime = x_t + innov[t]
...
if u[t] <= alpha:
另一个问题是你在每个循环中计算当前的后p2
,这是不必要的。在每个循环中,您需要计算建议的后验p1
,并且当提议被接受时,您可以将p2
更新为等于p1
:
x_t = stats.uniform(0,1).rvs()
innov = stats.norm(loc=0).rvs(size=n)
u = np.random.uniform(size=n)
p2 = stats.beta(a=2,b=5).pdf(x_t)*stats.norm(loc=0,scale=2).pdf(x_t)
for t in range(n):
x_prime = x_t + innov[t]
p1 = stats.beta(a=2,b=5).pdf(x_prime)*stats.norm(loc=0,scale=2).pdf(x_prime)
...
if u[t] <= alpha:
x_t = x_prime # accept
p2 = p1
posterior[t] = x_t
一个非常小的改进可能是将scipy stats函数直接导入名称空间:
from scipy.stats import norm, beta
另一个非常小的改进是注意到代码中的elif
语句没有做任何事情,因此可以删除。
完全放下这个并使用更明智的变量名称我想出了:
from scipy.stats import norm, beta
import numpy as np
def my_get_samples(n, sigma=1):
x_cur = np.random.uniform()
innov = norm.rvs(size=n, scale=sigma)
u = np.random.uniform(size=n)
post_cur = beta.pdf(x_cur, a=2, b=5) * norm.pdf(x_cur, loc=0, scale=2)
posterior = np.zeros(n)
for t in range(n):
x_prop = x_cur + innov[t]
post_prop = beta.pdf(x_prop, a=2, b=5) * norm.pdf(x_prop, loc=0, scale=2)
alpha = post_prop / post_cur
if u[t] <= alpha:
x_cur = x_prop
post_cur = post_prop
posterior[t] = x_cur
return posterior
现在,为了速度比较:
%timeit get_samples(1000)
3.19 s ± 5.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit my_get_samples(1000)
127 ms ± 484 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
这是一个25倍的加速
值得注意的是,当谈到MCMC算法时,粗暴的速度并不是一切。真的,你感兴趣的是你可以从后面每秒进行的独立(ish)绘制的数量。通常,这是用ESS (effective sample size)评估的。通过调整随机游走,您可以提高算法的效率(从而增加每秒绘制的有效样本)。
为此,通常进行初始试运行,即samples = my_get_samples(1000)
。从这个输出计算sigma = 2.38**2 * np.var(samples)
。然后,此值应用于将方案中的随机游走调整为innov = norm.rvs(size=n, scale=sigma)
。似乎任意出现的2.38 ^ 2有它的起源于:
随机游走Metropolis算法的弱收敛和最优缩放(1997)。 A. Gelman,W。R. Gilks和G. O. Roberts。
为了说明改进,调整可以让我们运行两次算法,一次调整,另一次调整,都使用10000次迭代:
x = my_get_samples(10000)
y = my_get_samples(10000, sigma=0.12)
fig, ax = plt.subplots(1, 2)
ax[0].hist(x, density=True, bins=25, label='Untuned algorithm', color='C0')
ax[1].hist(y, density=True, bins=25, label='Tuned algorithm', color='C1')
ax[0].set_ylabel('density')
ax[0].set_xlabel('x'), ax[1].set_xlabel('x')
fig.legend()
您可以立即看到调整对我们的算法效率的改进。请记住,两次运行都是针对相同的迭代次数进行的。
如果你的算法需要很长时间才能收敛,或者你的样本有大量的自相关,我会考虑调查Cython
以进一步缩短速度优化。
我还建议查看PyStan
项目。它需要一些习惯,但它的NUTS HMC算法可能会胜过你可以手工编写的任何Metropolis - Hastings算法。
不幸的是,除了在numba兼容的python代码中重写它们之外,我真的没有看到任何加速随机发行版的可能性。
但是加速代码瓶颈的一个简单方法是用两个调用替换对stats函数的两个调用:
p1, p2 = (
stats.beta(a=2, b=5).pdf([x_prime, x_t])
* stats.norm(loc=0, scale=2).pdf([x_prime, x_t]))
另一个微小的调整可能是在for循环之外将u
的生成外包:
x_t = stats.uniform(0, 1).rvs() # initial value
posterior = np.zeros((n,))
u = stats.uniform(0, 1).rvs(size=n) # random uniform
for t in range(n): # and so on
然后在循环中索引u
(当然必须删除循环中的行u = stats.uniform(0,1).rvs() # random uniform
):
if u[t] <= alpha:
x_t = x_prime # accept
posterior[t] = x_t
elif u[t] > alpha:
x_t = x_t # reject
微小的变化也可能是通过省略elif
语句简化if条件,或者如果需要将其替换为else
则用于其他目的。但这实际上只是一个微小的改进:
if u[t] <= alpha:
x_t = x_prime # accept
posterior[t] = x_t
基于jwalton答案的另一项改进:
def new_get_samples(n):
"""
Generate and return a randomly sampled posterior.
For simplicity, Prior is fixed as Beta(a=2,b=5), Likelihood is fixed as Normal(0,2)
:type n: int
:param n: number of iterations
:rtype: numpy.ndarray
"""
x_cur = np.random.uniform()
innov = norm.rvs(size=n)
x_prop = x_cur + innov
u = np.random.uniform(size=n)
post_cur = beta.pdf(x_cur, a=2,b=5) * norm.pdf(x_cur, loc=0,scale=2)
post_prop = beta.pdf(x_prop, a=2,b=5) * norm.pdf(x_prop, loc=0,scale=2)
posterior = np.zeros((n,))
for t in range(n):
alpha = post_prop[t] / post_cur
if u[t] <= alpha:
x_cur = x_prop[t]
post_cur = post_prop[t]
posterior[t] = x_cur
return posterior
随着时间的改善:
%timeit my_get_samples(1000)
# 187 ms ± 13 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit my_get_samples2(1000)
# 1.55 ms ± 57.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
这比jwalton的回答加速了121倍。它是通过外包post_prop
计算完成的。
检查直方图,这似乎没问题。但最好问问jwalton,如果确实没问题,他似乎对这个话题有了更多的了解。 :)