在 for 循环期间增加 Python 的 CPU 使用率

问题描述 投票:0回答:1

我有一个Python代码,其中我用大约处理一个pandas DataFrame。 500 万行。我使用 numpy.where 根据其他列中的值的条件应用具有值的新列,并且还通过特定列进行 for 循环。

带有 for 循环的函数就是问题所在。无论执行哪个代码块,总体CPU使用率都不会超过27%。我的电脑有 15 个 CPU,而且代码执行期间使用的 CPU 数量从未超过 6 个。因此,代码执行速度相当慢。

我尝试使用 ProcessPoolExecutor 进行 for 循环,但这不起作用,我没有看到 CPU 使用率或代码性能有任何增加,并且使用的 CPU 数量也没有变化。 我怎样才能使用所有 15 个 CPU,最重要的是增加每个 CPU 的使用率,从而提高整体 CPU 使用率来加快代码的性能?

python performance cpu-usage
1个回答
0
投票

有意义地加速 Python 计算的唯一真正方法是使用 Cython 生成自定义 C++ 扩展。出于性能原因,您最喜欢的所有库(Numpy、SciPy、Pandas 等)都是用 C++ 编写的,并且具有可供您使用的 C++ API。

我不会详细介绍如何设置 Cython 以及如何编译扩展,因此我将使用 Jupyter 笔记本来让我的生活更轻松。

首先你应该加载 Cython 扩展:

load_ext cython

然后导入我们将使用的所有 Python 库:

import numpy as np
from os import cpu_count
from time import perf_counter_ns as counter
from threading import Thread
import matplotlib.pyplot as plt

现在您可以定义您的 Cython 扩展:

%%cython -a

# cython: language_level=3

cimport cython

@cython.wraparound(False)
@cython.boundscheck(False)
def multiply(int[::1] arr1, int[::1] arr2, int[::1] result, int offset=0, int num_elements=-1):
    cdef unsigned int i=0
    if num_elements < 0:
        num_elements = arr1.shape[0]
    for i in range(offset,offset+num_elements):
        result[i]=arr1[i]*arr2[i]

-a 标志生成一个报告,告诉您代码的哪些部分运行缓慢(黄色或深黄色)。与此类似(单击行号旁边的小 + 号会显示 Cython 生成的确切 C++ 代码片段): 正如您所看到的,这与编写 Python 代码非常相似,您只需要为所使用的变量添加类型信息。

int [::1]
表示一维 C 连续整数数组(您也可以在同一函数中创建输出数组,但高性能代码通常定义一次缓冲区并重复使用它们)。环绕和边界检查也是针对索引数组的 Python 特定检查,但由于我们知道我们将使用
range()
并且我们永远不会有无效索引,因此我们可以禁用它们(如果您以某种方式设法传递无效索引,您的内核将崩溃,小心这一点)。

现在我们可以定义一些样板代码来测量性能、在线程之间均匀分配输入等。

def run_benchmark(threads):
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

def timeit_multi_threaded(arr1, arr2, reference_result, num_threads=1, timeout=10, repeats=1000):
    if arr1.shape != arr2.shape:
        raise Exception(f"arr1.shape(={arr1.shape}) must match arr2.shape(={arr2.shape})")
    num_elements = arr1.shape[0]
    ceil = int(np.ceil(num_elements/num_threads))
    offsets = [0]
    counts = []
    # Split the number of elements to be processed as evenly as possible between the worker threads
    for i in range(num_threads-1):
        offsets.append((i+1)*ceil)
        counts.append(offsets[-1]-offsets[-2])
    counts.append(num_elements-offsets[-1])
    # Set up time measurment variables
    duration = timeout*1e9
    times = np.zeros(1000, dtype=np.uint64)
    total_dur = 0
    num_runs = 0
    while total_dur < duration and num_runs < repeats:
        result_buff = np.empty(num_elements,dtype=int)
        # Create the threads used to run the benchmark
        threads = [Thread(target=multiply, args=(arr1,arr2,result_buff, offsets[i], counts[i])) for i in range(num_threads)]
        # Measure execution time
        start = counter()
        run_benchmark(threads)
        end = counter()
        times[num_runs]=end-start
        i+=1
        total_dur+=end-start
        
    print(f"Did it match the expected results? {(result_buff == reference_result).all()}")
    return times[:num_runs]

def s_format(number):
    prefixes = ['n', 'µ', 'm', '']
    prefix_index = 0
    while number > 1000:
        number /= 1000
        prefix_index += 1
    return "%.2f %ss"%(number, prefixes[prefix_index])

def print_perf(times):
    print(f"{s_format(times.mean())} ± {s_format(times.std())} (mean ± std. dev. of {len(times)} loops)")

最后,您可以运行基准测试并将其与 Numpy 执行相同乘法所花费的时间进行比较:

max_value = 10000
num_elements = 100000000

arr1 = np.random.randint(max_value, size=num_elements)
arr2 = np.random.randint(max_value, size=num_elements)

numpy_result = arr1*arr2 # This will be used for validation

print("Time taken for Numpy:")
%timeit arr1*arr2
print("#"*100)
# Run benchmark with various number of cores
run_times = []
for num_threads in range(1,cpu_count()+1):
    print(f"Running with {num_threads} thread(s) ...")
    run_times.append(timeit_multi_threaded(arr1,arr2,numpy_result,num_threads))
    print_perf(run_times[-1])
    print("#"*100)

# Plot the results
xs = [i+1 for i in range(len(run_times))]
means = np.array([run_times[i].mean() for i in range(len(run_times))])
stds = np.array([run_times[i].std() for i in range(len(run_times))])

plt.figure(figsize=(16,9))
plt.plot(xs, (means+stds)/1e6, 'r')
plt.plot(xs, means/1e6, 'k')
plt.plot(xs, (means-stds)/1e6, 'r')
plt.xlabel("Number of cores")
plt.ylabel("Execution time (ms)")
plt.legend(["Upper/Lower Limits","Average"])
plt.ylim(bottom=0)
plt.show()

结果是:

CPU 使用率稳定在 20%...(不过我们仍然设法击败了 Numpy)

所以...出了什么问题? Python 使用全局解释器锁来锁定 Python 对象,防止其他线程访问,以防止不需要的修改,这些修改通常会导致在多线程应用程序中产生错误的答案,但它也限制了 CPU 的使用,因为线程只会等待互相完成。这意味着默认的多线程行为几乎没有用处,除非您正在执行大量 I/O 密集型任务,例如从磁盘读取/写入磁盘、发送 HTTP 请求等。幸运的是,Cython 允许您绕过 GIL(如只要您不尝试访问

with nogil:
中的 Python 对象/类)。这真的就像添加一行代码一样简单:

@cython.wraparound(False)
@cython.boundscheck(False)
def multiply(int[::1] arr1, int[::1] arr2, int[::1] result, int offset=0, int num_elements=-1):
    cdef unsigned int i=0
    if num_elements < 0:
        num_elements = arr1.shape[0]
    with nogil:
        for i in range(offset,offset+num_elements):
            result[i]=arr1[i]*arr2[i]

结果是: 好多了!现在我们真正使 CPU 饱和并使用所有可用的处理能力。但是,您需要小心,不要意外地对内存的相同区域进行操作,这是 GIL 正在防止发生的情况。

检查实际基准测试结果我们可以看到:

这对于简单的数字运算算法来说是相当典型的。由于遇到内存带宽瓶颈(CPU 完成速度太快,必须等待更多数据可用),因此缩放会在几个线程后停止。如果您可以将更多计算塞入同一次迭代中,您很可能会看到比我这里拥有的更好的扩展。

TL;DR:您不能只使用多处理或多线程并期望在 Python 中获得性能优势;你需要剥开几层并禁用很多安全防护装置才能到达那里。我知道一开始可能会很可怕,但随着时间的推移你会习惯的。由于这并不是深入的 Cython 教程,因此我强烈建议您查看此处的官方文档以了解更多信息: https://cython.readthedocs.io/en/latest/src/userguide/numpy_tutorial.html

© www.soinside.com 2019 - 2024. All rights reserved.