使用ThreadPoolExecutor时避免竞争状态

问题描述 投票:1回答:2

我具有以下采用以下参数的并发API_call_and_processing()方法:

  • api_call:是对外部网站的HTTP请求,该网站检索并XLM文件
  • lst:是api_call所需的整数(id)的列表
  • callback_processing:是仅解析每个XLM的本地方法要求

我使用api_call()进行了大约500个HTTP请求,每个请求中的每个ID一个然后,每个响应(如果使用解析XLM并返回元组的本地方法callback_processing()处理)

def concurrent_api_call_and_processing(api_call=None, callback_processing=None, lst=None, workers=5):
    """
    :param api_call: Function that will be called concurrently. An API call to API_Provider for each entry.
    : param lst: List of finding's ids needed by the API function to call API_Provider endpoint.
    :param callback_processing: Function that will be called after we get the response from the above  API call.
    : param workers: Number of concurrent threads that will be used.
    :return: array of tuples containing the details of each particular finding.
    """

    output = Queue()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
        for future in as_completed(future_to_f_detail):
            try:
                find_details = future.result()
            except Exception as exc:
                print(f"Finding {id} generated and exception: {exc}")
            else:
                f_det = callback_processing(find_details)
                output.put(f_det)
    return output

使用此方法时,我开始注意到一些随机问题(不是正常终止)。>>

[由于我使用数组而不是队列(output=[]),但是不确定我是否可以具有竞争条件,所以我决定重构代码并开始使用Queueoutput=Queue

我的问题是:

  • 我的代码现在是否没有竞争条件?

    [注意:我想指出,在Keynote on Concurrency, PyBay 2017的Raymond Hettinger之后,我添加了fuzz()的睡眠方法进行测试,但无法确定我是否确实患有比赛条件。

我有以下采用以下参数的并发_api_call_and_processing()方法:api_call:是对检索和XLM文档lst的外部WebSite的HTTP请求:...的列表]] >>

我认为没有足够的信息可以确定这一点。

请考虑如果传入api_call函数以增加全局变量会发生什么:

count = 0
def api_call_fn():
  global count 
  count += 1

同时执行时,其竞争条件可以递增count变量。

callback_processing功能也一样。


为了审核此代码是否符合竞争条件,我们必须查看这两个函数的定义:)

在上述条件下,该代码将没有竞争条件。根据concurrent.futures docs here,会发生什么:

  1. executor.submit():返回表示可调用对象执行的Future对象。
  2. as_completed(future_to_f_detail):返回由future_to_f_detail给出的Future实例的迭代器,该实例在完成时产生期货(完成或取消的期货)。

因此,确实for循环正在消耗迭代器并一一返回as_completed()产生的每个未来

因此,除非call_back()或我们调用的函数引入了某种异步功能(如上面@ dm03514所描述的示例,否则我们只是在for循环之后同步工作

   counter = 0
   with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
        for future in as_completed(future_to_f_detail):
            print(f"Entering the for loop for {counter+1} time") 
            counter +=1
            try:
                find_details = future.result()
            except Exception as exc:
                print(f"Finding {id} generated and exception: {exc}")
            else:
                f_det = callback_processing(find_details)
                output.append(f_det)
    return output

[如果我们有一个包含500个ID的数组,并且我们进行了500次通话,并且所有通话都产生了未来,我们将在每次进入try循环之前,以500次打印该消息。

在这种情况下,我们没有被迫使用队列来避免竞争情况。

python python-3.x multithreading python-multithreading
2个回答
1
投票

我认为没有足够的信息可以确定这一点。

请考虑如果传入api_call函数以增加全局变量会发生什么:


0
投票

在上述条件下,该代码将没有竞争条件。根据concurrent.futures docs here,会发生什么:

  1. executor.submit():返回表示可调用对象执行的Future对象。
  2. as_completed(future_to_f_detail):返回由future_to_f_detail给出的Future实例的迭代器,该实例在完成时产生期货(完成或取消的期货)。
© www.soinside.com 2019 - 2024. All rights reserved.