如何加快嵌套循环的速度?

问题描述 投票:10回答:3

我在python中执行一个嵌套循环,包含在下面。 这是在现有金融时间序列中搜索的基本方法,并在时间序列中寻找符合特定特征的时期。

在这种情况下,有两个独立的、大小相等的数组,分别代表 "收盘价"(即资产的价格)和 "成交量"(即资产在该期间的交易量)。对于每一个时间段,我想展望未来所有长度在1到1之间的区间。INTERVAL_LENGTH 并查看这些区间中是否有符合我搜索的特征(在这种情况下,接近值的比率大于1.0001且小于1.5,且总和体积大于100)。

我的理解是,使用NumPy加速的主要原因之一是,只要你是在数组上进行整体操作,解释器就不需要在每次评估某些东西时对操作数进行类型检查(例如 numpy_array * 2),但显然下面的代码没有利用这一点。

有没有办法用某种窗口函数来代替内部循环,这样可以加快速度,或者用其他方法使用 numpyscipy 来大大加快本地python的速度?

另外,一般情况下,有没有更好的方法(比如,用C++写这个循环,用weave会不会快很多)?

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.array( xrange(ARRAY_LENGTH) )
volume = np.array( xrange(ARRAY_LENGTH) )
close, volume = close.astype('float64'), volume.astype('float64')

results = []
for i in xrange(len(close) - INTERVAL_LENGTH):
    for j in xrange(i+1, i+INTERVAL_LENGTH):
        ret = close[j] / close[i]
        vol = sum( volume[i+1:j+1] )
        if ret > 1.0001 and ret < 1.5 and vol > 100:
            results.append( [i, j, ret, vol] )
print results
python numpy scipy finance
3个回答
7
投票

更新:(几乎)完全矢量化的版本在下面的 "new_function2 "中...。

我将添加注释来解释一下。

它的速度提高了约50倍,如果你不介意输出的是numpy数组而不是列表,那么更大的速度也是可能的。 就像现在这样。

In [86]: %timeit new_function2(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 1.15 s per loop

你可以用调用np.cumsum()来替换你的内部循环... ... 参见下面我的 "new_function "函数。这将会带来相当大的速度提升...

In [61]: %timeit new_function(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 15.7 s per loop

vs

In [62]: %timeit old_function(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 53.1 s per loop

应该可以将整个过程矢量化,完全避免for循环,不过... ... 给我一分钟,我看看我能做些什么... ...

import numpy as np

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.arange(ARRAY_LENGTH, dtype=np.float)
volume = np.arange(ARRAY_LENGTH, dtype=np.float)

def old_function(close, volume, INTERVAL_LENGTH):
    results = []
    for i in xrange(len(close) - INTERVAL_LENGTH):
        for j in xrange(i+1, i+INTERVAL_LENGTH):
            ret = close[j] / close[i]
            vol = sum( volume[i+1:j+1] )
            if (ret > 1.0001) and (ret < 1.5) and (vol > 100):
                results.append( (i, j, ret, vol) )
    return results


def new_function(close, volume, INTERVAL_LENGTH):
    results = []
    for i in xrange(close.size - INTERVAL_LENGTH):
        vol = volume[i+1:i+INTERVAL_LENGTH].cumsum()
        ret = close[i+1:i+INTERVAL_LENGTH] / close[i]

        filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100)
        j = np.arange(i+1, i+INTERVAL_LENGTH)[filter]

        tmp_results = zip(j.size * [i], j, ret[filter], vol[filter])
        results.extend(tmp_results)
    return results

def new_function2(close, volume, INTERVAL_LENGTH):
    vol, ret = [], []
    I, J = [], []
    for k in xrange(1, INTERVAL_LENGTH):
        start = k
        end = volume.size - INTERVAL_LENGTH + k
        vol.append(volume[start:end])
        ret.append(close[start:end])
        J.append(np.arange(start, end))
        I.append(np.arange(volume.size - INTERVAL_LENGTH))

    vol = np.vstack(vol)
    ret = np.vstack(ret)
    J = np.vstack(J)
    I = np.vstack(I)

    vol = vol.cumsum(axis=0)
    ret = ret / close[:-INTERVAL_LENGTH]

    filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100)

    vol = vol[filter]
    ret = ret[filter]
    I = I[filter]
    J = J[filter]

    output = zip(I.flat,J.flat,ret.flat,vol.flat)
    return output

results = old_function(close, volume, INTERVAL_LENGTH)
results2 = new_function(close, volume, INTERVAL_LENGTH)
results3 = new_function(close, volume, INTERVAL_LENGTH)

# Using sets to compare, as the output 
# is in a different order than the original function
print set(results) == set(results2)
print set(results) == set(results3)

3
投票

一个加速的方法是去掉 sum 部分,因为在本实施例中,它将长度为2的列表通过 INTERVAL_LENGTH. 相反,只需添加 volume[j+1] 到循环最后一次迭代的vol的前一个结果。因此,你只是每次添加两个整数,而不是每次对整个列表进行求和和切片。另外,我们不需要从做 sum(volume[i+1:j+1]),只是做 vol = volume[i+1] + volume[j+1],因为你知道这里的初始情况将永远只有两个指数。

另一个加速方法是使用 .extend 而不是 .append,因为python的实现有 extend 运行速度明显加快。

你也可以打破最后的 if 语句,以便只在需要时才进行某些计算。例如,你知道 if vol <= 100,你不需要计算 ret.

这并不能完全回答你的问题,但我认为特别是在和的问题上,你应该看到这些变化的显著加速。

编辑 - 你也不需要 len因为你已经知道了列表的具体长度(除非这只是为了例子)。将它定义为一个数字而不是 len(something) 总是更快。

编辑--实现(这个还没有测试)。

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.array( xrange(ARRAY_LENGTH) )
volume = np.array( xrange(ARRAY_LENGTH) )
close, volume = close.astype('float64'), volume.astype('float64')

results = []
ex = results.extend
for i in xrange(ARRAY_LENGTH - INTERVAL_LENGTH):
    vol = volume[i+1]
    for j in xrange(i+1, i+INTERVAL_LENGTH):
        vol += volume[j+1]
        if vol > 100:
            ret = close[j] / close[i]
            if 1.0001 < ret < 1.5:
                ex( [i, j, ret, vol] )
print results

1
投票

你为什么不试着把结果生成一个单一的列表(比追加或扩展快得多),比如:

results = [ t for t in ( (i, j, close[j]/close[i], sum(volume[i+1:j+1]))
                         for i in xrange(len(close)-INT_LEN)
                             for j in xrange(i+1, i+INT_LEN)
                       )
            if t[3] > 100 and 1.0001 < t[2] < 1.5
          ]
© www.soinside.com 2019 - 2024. All rights reserved.