ThreadPoolExecutor().map 与 ThreadPoolExecutor().submit 有何不同?

问题描述 投票:0回答:4

我只是被我写的一些代码弄糊涂了。我惊讶地发现:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))

产生不同的结果。第一个生成

f
返回的任何类型的列表,第二个生成一个
concurrent.futures.Future
对象的列表,然后需要使用其
result()
方法对其进行评估以获得
f
返回的值。

我主要担心的是,这意味着

executor.map
不能利用
concurrent.futures.as_completed
,这似乎是一种非常方便的方法来评估我正在对数据库进行的一些长时间运行的调用的结果,因为它们变得可用的。

我一点都不清楚

concurrent.futures.ThreadPoolExecutor
对象是如何工作的——天真地,我更喜欢(有点冗长):

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [f.result() for f in futures.as_completed(result_futures)]

更简洁

executor.map
以利用可能的性能提升。我这样做错了吗?

python multithreading python-3.x python-multithreading concurrent.futures
4个回答
58
投票

问题是您将

ThreadPoolExecutor.map
的结果转换为列表。如果您不这样做而是直接迭代生成的生成器,则结果仍会按原始顺序生成,但循环会在所有结果准备好之前继续。你可以用这个例子来测试:

import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    print(i)

保留顺序的原因可能是因为有时重要的是您获得结果的顺序与您给它们映射的顺序相同。结果可能不会包含在未来的对象中,因为在某些情况下,如果您需要它们,可能需要很长时间才能在列表上进行另一次映射以获取所有结果。毕竟在大多数情况下,下一个值很可能在循环处理第一个值之前就已经准备好了。这在这个例子中得到了证明:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []

for value in results:
    finals.append(do_some_stuff(value))

在这个例子中,

do_some_stuff
可能比
crunch_number
花费更长的时间,如果确实如此,那么在您仍然保持 map 的简单使用的同时,性能确实不会有太大损失。

此外,由于工作线程(/进程)在列表的开头开始处理并一直工作到您提交的列表的结尾,因此结果应该按照迭代器已经产生的顺序完成。这意味着在大多数情况下

executor.map
就很好,但在某些情况下,例如,如果处理值的顺序无关紧要,并且传递给
map
的函数运行时间非常不同,则
 future.as_completed
可能会更快。


28
投票

如果你使用

concurrent.futures.as_completed
,你可以处理每个函数的异常。

import concurrent.futures
iterable = [1,2,3,4,6,7,8,9,10]

def f(x):
    if x == 2:
        raise Exception('x')
    return x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    # -> using `executor.submit()` **requires** calling
    #      `concurrent.futures.as_completed()` <-
    #
    for future in concurrent.futures.as_completed(result_futures):
        try:
            print('resutl is', future.result())
        except Exception as e:
            print('e is', e, type(e))
# resutl is 3
# resutl is 1
# resutl is 4
# e is x <class 'Exception'>
# resutl is 6
# resutl is 7
# resutl is 8
# resutl is 9
# resutl is 10

executor.map
中,如果有异常,整个executor会停止。您需要在工作函数中处理异常。

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # -> Do not call `concurrent.futures.as_completed()`
    #    when using `executor.map()` <-
    #
    for each in executor.map(f, iterable):
        print(each)
# if there is any exception, executor.map would stop

25
投票

下面是

.submit()
vs
.map()
的例子。他们都立即接受了工作(提交|映射 - 开始)。他们花费相同的时间完成,11 秒(最后结果时间 - 开始)。但是,
.submit()
会在
ThreadPoolExecutor
maxThreads=2
中的任何线程完成后立即给出结果(无序!)。虽然
.map()
按照提交的顺序给出结果。

import time
import concurrent.futures

def worker(i):
    time.sleep(i)
    return i,time.time()

e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = range(1,7)[::-1]
print arrIn

f = []
print 'start submit',time.time()
for i in arrIn:
    f.append(e.submit(worker,i))
print 'submitted',time.time()
for r in concurrent.futures.as_completed(f):
    print r.result(),time.time()
print

f = []
print 'start map',time.time()
f = e.map(worker,arrIn)
print 'mapped',time.time()
for r in f:
    print r,time.time()    

输出:

[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48

start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49

19
投票

除了此处答案中的解释外,直接找到来源可能会有所帮助。它重申了此处另一个答案的声明:


.map()
定义在基类中,
concurrent.futures._base.Executor

class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

正如您所提到的,还有

.submit()
,它留给子类定义,即
ProcessPoolExecutor
ThreadPoolExecutor
,并返回一个
_base.Future
实例,您需要调用
.result()
到实际上做任何事情。

.map()
的重要台词归结为:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()

.reverse()
.pop()
是一种让第一次提交的结果(从
iterables
)首先产生,第二次提交的结果第二次产生,依此类推。结果迭代器的元素不是
Future
;它们本身就是实际结果。

© www.soinside.com 2019 - 2024. All rights reserved.