我写了一个 random_walk
模拟使用 numpy
来分配数据和生成器来执行模拟步骤。这个 random_walk
只是原代码中的一个MWE(与随机漫步完全没有关系,而是一个随机数学模型,太大,太复杂,不能作为例子。然而 random_walk
MWE模拟的是核心部件。
我使用生成器的原因与模拟有关。我将无限期地运行模拟,并且我只在一些角落的情况下转储数据,我可以测量角落情况发生的概率,因此我可以事先高度地分配numpy数组。我可以测量角案例发生的概率,因此我可以事先高精度地分配numpy数组(从不修改),但这是一个上限,因此我需要计算角案例发生的次数,然后对数据集进行分割(在模拟中是 "模拟")。
为了便于比较,我也写了一个类似的天真的方法,使用普通的 append
到列表中来存储模拟数据,我只在角案例发生时追加。
重要的是要知道,角案例会发生十亿次(会占用大量的内存),但最终的模拟将运行 "无限的时间",这是一个非常大的步骤数。拐角情况发生的概率是1e-10。
最后的代码有一个停止的条件,我在这里模拟了使用 distance
和经典的 simulation time
.
令我惊讶的是,我注意到 append
办法的性能优于 numpy+generators
. 正如我们在下面的输出中可以看到
对于小数据集。
%timeit random_walk_naive(max_distance=1e5, simul_time=1e4)
5.35 ms ± 190 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit random_walk_simul(max_distance=1e5, simul_time=1e4)
16.3 ms ± 567 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
对于大数据集:
%timeit random_walk_naive(max_distance=1e12, simul_time=1e7)
12.2 s ± 760 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit random_walk_simul(max_distance=1e12, simul_time=1e7)
36 s ± 102 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
跑步 cProfile
在调用上,我注意到生成器调用的执行时间与naive方法相似,所有额外的时间都花在了 random_walk_simul
本身。评价 np.empty
和分片操作我注意到,创建空数据集和分片返回的时间是最少的。完全没有贡献与操作的时间。除此之外,代码几乎是一样的,只是在 generator
的方法,我首先将数据分配给本地变量,然后 "刷新 "到 numpy.array
,这比直接刷新快,因为我将使用在 while
循环来评估停止条件。
我需要了解为什么会出现这种行为,如果是预期的,如果不是如何解决?
完整的源码粘贴在下面
import numpy as np
from random import random
def clip(value, lower, upper):
return lower if value < lower else upper if value > upper else value
def random_walk(s_0, a_0, pa, pb):
"""Initial position (often 0), acceleration, 0 < pa < pb < 1"""
# Time, x-position, Velocity, Acceleration
t, x, v, a = 0, s_0, 0, a_0
yield (t, x, v, a)
while True:
# Roll the dices
god_wishes = random()
if god_wishes <= pa:
# Increase acceleration
a += .005
elif god_wishes <= pb:
# Reduce acceleration
a -= .005
# Lets avoid too much acceleration
a = clip(a, -.2, .2)
# How much time has passed, since last update?
dt = random()
v += dt*a
x += dt*v
t += dt
yield (t, x, v, a)
def random_walk_simul(initial_position = 0, acceleration = 0,
prob_increase=0.005, prob_decrease=0.005,
max_distance=10000, simul_time=1000):
"""Runs a random walk simulation given parameters
Particle initial state (initial position and acceleration)
State change probability (prob_increase, prob_decrease)
Stop criteria (max_distance, simul_time)
Returns a random_walk particle data
"""
assert (0 < prob_increase+prob_decrease < 1), "Total probability should be in range [0, 1]"
rw = random_walk(initial_position, acceleration, prob_increase, prob_decrease+prob_increase)
# Over estimated given by law of large numbers expected value of a
# uniform distribution
estimated_N = int(simul_time * 2.2)
data = np.empty((estimated_N, 4))
# Runs the first iteraction
n = 0
(t, x, v, a) = rw.__next__()
data[n] = (t, x, v, a)
# While there is simulation time or not too far away
while (t < simul_time) and (np.abs(x) < max_distance):
n += 1
(t, x, v, a) = rw.__next__()
data[n] = (t, x, v, a)
return data[:n]
def random_walk_naive(initial_position = 0, acceleration = 0,
prob_increase=0.005, prob_decrease=0.005,
max_distance=10000, simul_time=1000):
"""Emulates same behavior as random_walk_simul, but use append instead numpy and generators"""
T = []
X = []
V = []
A = []
T.append(0)
X.append(initial_position)
V.append(0)
A.append(acceleration)
a = A[-1]
t = T[-1]
v = V[-1]
x = X[-1]
while (T[-1] < simul_time) and (abs(X[-1]) < max_distance):
god_wishes = random()
if god_wishes <= prob_increase:
# Increase acceleration
a += .005
elif god_wishes <= prob_increase+prob_decrease:
# Reduce acceleration
a -= .005
# Lets avoid too much acceleration
a = clip(a, -.2, .2)
dt = random()
t += dt
v += dt*a
x += dt*v
# Storing next simulation step
T.append(t)
X.append(x)
V.append(v)
A.append(a)
return np.array((T, X, V, A)).transpose()
def main():
random_walk_naive(max_distance=1e9, simul_time=1e6)
random_walk_simul(max_distance=1e9, simul_time=1e6)
if __name__ == '__main__':
main()
这可能是一个很好的情况下,使用 哑巴:
import numpy as np
from random import random
from numba import njit
# Baseline
%timeit random_walk_naive(max_distance=1e9, simul_time=1e6)
1.28 s ± 277 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# Few adjustments for numba
@njit
def random_walk_numba(initial_position = 0, acceleration = 0,
prob_increase=0.005, prob_decrease=0.005,
max_distance=10000, simul_time=1000):
T, X, V, A = [0], [initial_position], [0], [acceleration]
t, x, v, a = T[-1], X[-1], V[-1], A[-1]
while (T[-1] < simul_time) and (abs(X[-1]) < max_distance):
god_wishes = random()
if god_wishes <= prob_increase:
# Increase acceleration
a += .005
elif god_wishes <= prob_increase+prob_decrease:
# Reduce acceleration
a -= .005
# Lets avoid too much acceleration
lower, upper = -0.2, 0.2
a = lower if a < lower else upper if a > upper else a
dt = random()
t += dt
v += dt*a
x += dt*v
# Storing next simulation step
T.append(t)
X.append(x)
V.append(v)
A.append(a)
return np.array((T, X, V, A)).transpose()
%timeit random_walk_numba(max_distance=1e9, simul_time=1e6)
172 ms ± 32.2 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
请注意,你不能调用 clip
但幸运的是,这很容易在里面重新实现。