concurrent.futures.ThreadPoolExecutor -- 检查状态

问题描述 投票:0回答:1

我有一个与此几乎相同的问题:

检查`concurrent.futures.ThreadPoolExecutor`

然而,有一个转折。我没有使用 executor.submit,而是使用 map 方法(因为我的进程数量可变,并且可能有很多)。这是相关代码:

    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
      list_of_chain_responses = list(executor.map(chain, question_list))
      print('pending:', executor._work_queue.qsize(), 'jobs')
      print('threads:', len(executor._threads))
      print()

我尝试使用针对上述问题所显示的技术。但它显示作业为“0”并且只打印一次。我假设(也许是错误的?)这是因为它仅在整个 Question_list 异步输入到链函数调用后才打印作业?

基本上我只是想了解一下这在多大程度上(如果有的话)加速了这个过程。这就是目标。我尝试过使用线程和不使用线程来进行检查,但是我的网络还发生了其他事情,这使得衡量优势的方法相当不精确。我还尝试将 print("start") 和 print("end") 放在链函数本身中,看看这是否能给我任何见解,但执行器似乎只“调用”链函数一次(因为我只看到开始和结束一次),但仍然使用问题列表多次处理它。

python-3.x concurrency concurrent.futures
1个回答
0
投票

executor.map
返回一个迭代器。调用
list(executor(map(...)))
立即请求所有被调用的异步函数的结果。

相反,迭代迭代器并在每次迭代后打印状态:

 import concurrent.futures
import time

def chain(n):
    time.sleep(.1)
    return n * 2

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
      iterator = executor.map(chain, range(30))
      for result in iterator:
          print(f'{result:2d} pending:{executor._work_queue.qsize():2d} threads:{len(executor._threads)}')

输出:

 0 pending:21 threads:8
 2 pending:14 threads:8
 4 pending:14 threads:8
 6 pending:14 threads:8
 8 pending:14 threads:8
10 pending:14 threads:8
12 pending:14 threads:8
14 pending:14 threads:8
16 pending:13 threads:8
18 pending: 6 threads:8
20 pending: 6 threads:8
22 pending: 6 threads:8
24 pending: 6 threads:8
26 pending: 6 threads:8
28 pending: 6 threads:8
30 pending: 6 threads:8
32 pending: 5 threads:8
34 pending: 0 threads:8
36 pending: 0 threads:8
38 pending: 0 threads:8
40 pending: 0 threads:8
42 pending: 0 threads:8
44 pending: 0 threads:8
46 pending: 0 threads:8
48 pending: 0 threads:8
50 pending: 0 threads:8
52 pending: 0 threads:8
54 pending: 0 threads:8
56 pending: 0 threads:8
58 pending: 0 threads:8
© www.soinside.com 2019 - 2024. All rights reserved.