使用了Concurrent.futures,但只有一个核心处于活动状态

问题描述 投票:0回答:2

我想了解并发。所以我想创建一个小代码示例。 从 1 到 $10^7$ 的数字应相加。我想用4核。所以我创建了4个列表,即 [1,5,9,13...]、[2,6,10,14...]、[3,7,11,...]、[4,8,12,...]。

但是当我运行脚本并检查系统监视器(Linux Mint 21)时,我发现只使用了一个核心。不是应该是四个吗?

这是我的代码:

#!/usr/bin/python3

from multiprocessing import Pool
import time
import concurrent.futures

def computeSum(list):
    sumValue = 0
    for i in list:
        sumValue += i
    return sumValue

def computeModuloList(completeList, modulo, moduloOffset):
    # empty list
    moduloList = []
    for i in completeList:
        if i % modulo - moduloOffset == 0:
            moduloList.append(i) # append i to modulo list
    return moduloList



listToSum = range(1, 10**7+1)

partialList1 = computeModuloList(listToSum, 4, 0)
partialList2 = computeModuloList(listToSum, 4, 1)
partialList3 = computeModuloList(listToSum, 4, 2)
partialList4 = computeModuloList(listToSum, 4, 3)

partialLists = [partialList1, partialList2, partialList3, partialList4]

resultList = []

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future = {executor.submit(computeSum, pList): pList for pList in partialLists}
    for completedFuture in concurrent.futures.as_completed(future):
        resultList.append(completedFuture.result())
            

allSum = 0
for result in resultList:
    allSum += result

print(str(allSum))

我检查了系统监视器,只使用了一个核心

python multiprocessing concurrent.futures
2个回答
0
投票

当您向 ProcessPoolExecutor 或 ThreadPoolExecutor 提交任务时,它们将同步执行。但是,如果您使用map函数,它们将并行执行。

此练习受 CPU 限制,因此不太适合多线程。

问题中的 createModuloList 函数按顺序执行 4 次,然后每个任务 (computeSum) 都通过 PoolExecutor 执行,因此只会使用一个额外的核心。

为了给你的CPU“施加压力”并达到相同的数学结果,你可以这样做:

from concurrent.futures import ProcessPoolExecutor

RANGE = 1, 10**7+1
MODULO = 4
PROCS = 4

def computeSum(moduloOffset):
    moduloList = []
    for i in range(*RANGE):
        if i % MODULO - moduloOffset == 0:
            moduloList.append(i)
    return sum(moduloList)

def main():
    with ProcessPoolExecutor() as executor:
        print(sum(executor.map(computeSum, range(PROCS))))

if __name__ == "__main__":
    main()

输出:

50000005000000

0
投票

我会沿着

multiprocessing
路线走下去,但请注意 Codist 的答案,它使用
concurrent.futures

这就是多重处理的样子:

from multiprocessing import Pool
import numpy as np


def main():
    n = 10**7
    arr = np.arange(1, n+1)

    # Split into 4 arrays 
    arr1, arr2, arr3, arr4 = np.array_split(arr, 4)

    with Pool(4) as pool:
        res1 = pool.apply_async(sum_array, [arr1])
        res2 = pool.apply_async(sum_array, [arr2])
        res3 = pool.apply_async(sum_array, [arr3])
        res4 = pool.apply_async(sum_array, [arr4])

        sums = [r.get() for r in [res1, res2, res3, res4]]
    
    total = sum(sums)
    print(total)


def sum_array(arr):
    return np.sum(arr, dtype=np.uint64)


if __name__ == '__main__':
    main()

代码返回:

50000005000000.0

所以现在你有两种方法...

© www.soinside.com 2019 - 2024. All rights reserved.