Python Numpy矢量化组合嵌套for循环

问题描述 投票:6回答:4

给定nxn数组A的实数正数,我试图找到2-d数组的三行的所有组合的元素最小值的最小值。使用for循环,结果如下:

import numpy as np

n = 100
np.random.seed(2)
A = np.random.rand(n,n)
global_best = np.inf

for i in range(n-2):
    for j in range(i+1, n-1):
        for k in range(j+1, n):
            # find the maximum of the element-wise minimum of the three vectors
            local_best = np.amax(np.array([A[i,:], A[j,:], A[k,:]]).min(0))
            # if local_best is lower than global_best, update global_best
            if (local_best < global_best):
                global_best = local_best
                save_rows = [i, j, k]

print global_best, save_rows

n = 100的情况下,输出应该是这样的:

Out[]: 0.492652949593 [6, 41, 58]

我有一种感觉,虽然我可以使用Numpy矢量化更快地做到这一点,并且肯定会感谢任何帮助。谢谢。

python numpy matrix vectorization combinatorics
4个回答
2
投票

不要试图矢量化不易于矢量化的循环。而是使用像Numba这样的jit编译器或使用Cython。如果生成的代码更具可读性,则矢量化解决方案很好,但就性能而言,编译解决方案通常更快或在最坏情况下与矢量化解决方案一样快(BLAS例程除外)。

单线程示例

import numba as nb
import numpy as np

#Min and max library calls may be costly for only 3 values
@nb.njit()
def max_min_3(A,B,C):
  max_of_min=-np.inf
  for i in range(A.shape[0]):
    loc_min=A[i]
    if (B[i]<loc_min):
      loc_min=B[i]
    if (C[i]<loc_min):
      loc_min=C[i]

    if (max_of_min<loc_min):
      max_of_min=loc_min

  return max_of_min

@nb.njit()
def your_func(A):
  n=A.shape[0]
  save_rows=np.zeros(3,dtype=np.uint64)
  global_best=np.inf
  for i in range(n):
      for j in range(i+1, n):
          for k in range(j+1, n):
              # find the maximum of the element-wise minimum of the three vectors
              local_best = max_min_3(A[i,:], A[j,:], A[k,:])
              # if local_best is lower than global_best, update global_best
              if (local_best < global_best):
                  global_best = local_best
                  save_rows[0] = i
                  save_rows[1] = j
                  save_rows[2] = k

  return global_best, save_rows

单线程版本的性能

n=100
your_version: 1.56s
compiled_version: 0.0168s (92x speedup)

n=150
your_version: 5.41s
compiled_version: 0.08122s (66x speedup)

n=500
your_version: 283s
compiled_version: 8.86s (31x speedup)

第一次调用具有大约0.3-1s的恒定开销。对于计算时间本身的性能测量,请调用一次然后测量性能。

通过一些代码更改,此任务也可以并行化。

多线程示例

@nb.njit(parallel=True)
def your_func(A):
  n=A.shape[0]
  all_global_best=np.inf
  rows=np.empty((3),dtype=np.uint64)

  save_rows=np.empty((n,3),dtype=np.uint64)
  global_best_Temp=np.empty((n),dtype=A.dtype)
  global_best_Temp[:]=np.inf

  for i in range(n):
      for j in nb.prange(i+1, n):
          row_1=0
          row_2=0
          row_3=0
          global_best=np.inf
          for k in range(j+1, n):
              # find the maximum of the element-wise minimum of the three vectors

              local_best = max_min_3(A[i,:], A[j,:], A[k,:])
              # if local_best is lower than global_best, update global_best
              if (local_best < global_best):
                  global_best = local_best
                  row_1 = i
                  row_2 = j
                  row_3 = k

          save_rows[j,0]=row_1
          save_rows[j,1]=row_2
          save_rows[j,2]=row_3
          global_best_Temp[j]=global_best

      ind=np.argmin(global_best_Temp)
      if (global_best_Temp[ind]<all_global_best):
          rows[0] = save_rows[ind,0]
          rows[1] = save_rows[ind,1]
          rows[2] = save_rows[ind,2]
          all_global_best=global_best_Temp[ind]

  return all_global_best, rows

多线程版本的性能

n=100
your_version: 1.56s
compiled_version: 0.0078s (200x speedup)

n=150
your_version: 5.41s
compiled_version: 0.0282s (191x speedup)

n=500
your_version: 283s
compiled_version: 2.95s (96x speedup)

编辑

在较新的Numba版本(通过Anaconda Python Distribution安装)中,我必须手动安装tbb才能实现并行化。


4
投票

对于n=100,此解决方案快5倍:

coms = np.fromiter(itertools.combinations(np.arange(n), 3), 'i,i,i').view(('i', 3))
best = A[coms].min(1).max(1)
at = best.argmin()
global_best = best[at]
save_rows = coms[at]

第一行有点卷曲,但将itertools.combinations的结果转换为NumPy数组,其中包含所有可能的[i,j,k]索引组合。

从那里,使用所有可能的索引组合索引到A,然后沿适当的轴减少是一个简单的问题。

这个解决方案消耗了更多的内存,因为它构建了所有可能组合A[coms]的具体数组。它为小型n节省了时间,比如250以下,但对于大型n,内存流量将非常高,并且可能比原始代码慢。


3
投票

通过块的工作允许结合矢量化微积分的速度,同时避免遇到存储器错误。下面是一个通过块将嵌套循环转换为矢量化的示例。

从与问题相同的变量开始,定义块长度,以便对块内的计算进行向量化,并仅在块上而不是在组合上进行循环。

chunk = 2000 # define chunk length, if to small, the code won't take advantage 
             # of vectorization, if it is too large, excessive memory usage will 
             # slow down execution, or Memory Error will be risen 
combinations = itertools.combinations(range(n),3) # generate iterator containing 
                                        # all possible combinations of 3 columns
N = n*(n-1)*(n-2)//6 # number of combinations (length of combinations cannot be 
                     # retrieved because it is an iterator)
# generate a list containing how many elements of combinations will be retrieved 
# per iteration
n_chunks, remainder = divmod(N,chunk)
counts_list = [chunk for _ in range(n_chunks)]
if remainder:
    counts_list.append(remainder)

# Iterate one chunk at a time, using vectorized code to treat the chunk
for counts in counts_list:
    # retrieve combinations in current chunk
    current_comb = np.fromiter(combinations,dtype='i,i,i',count=counts)\
                     .view(('i',3)) 
    # maximum of element-wise minimum in current chunk
    chunk_best = np.minimum(np.minimum(A[current_comb[:,0],:],A[current_comb[:,1],:]),
                            A[current_comb[:,2],:]).max(axis=1) 
    ravel_save_row = chunk_best.argmin() # minimum of maximums in current chunk
    # check if current chunk contains global minimum
    if chunk_best[ravel_save_row] < global_best: 
        global_best = chunk_best[ravel_save_row]
        save_rows = current_comb[ravel_save_row]
print(global_best,save_rows)

我对嵌套循环进行了一些性能比较,得到了以下结果(chunk_length = 1000):

  • N = 100 嵌套循环:1.13 s±16.6 ms 以块为单位工作:108 ms±565μs
  • N = 150 嵌套循环:4.16 s±39.3 ms 按块工作:523 ms±4.75 ms
  • N = 500 嵌套循环:3分18秒±3.21秒 大块工作:1分12秒±1.6秒

注意

在对代码进行分析之后,我发现np.mincalling np.maximum.reduce花费最长的时间。我将它直接转换为np.maximum,这有点改善了性能。


2
投票

您可以使用combinations中的itertools,它是一个python标准库,它将帮助您删除所有这些嵌套循环。

from itertools import combinations
import numpy as np

n = 100
np.random.seed(2)
A = np.random.rand(n,n)
global_best = 1000000000000000.0

for i, j, k in combinations(range(n), 3):
    local_best = np.amax(np.array([A[i,:], A[j,:], A[k,:]]).min(0))
    if local_best < global_best:
        global_best = local_best
        save_rows = [i, j, k]

print global_best, save_rows
© www.soinside.com 2019 - 2024. All rights reserved.