How to asynchronously call 5 random APIs in one iteration, then repeat for n iterations

Question · 0 votes · 2 answers

Basically, I have a problem with my code: I suspect it is only nominally concurrent, not actually running in parallel. The issue with my results is that one iteration (calling 5 APIs) takes about 1 second on average, which makes no sense when a single API call takes only about 0.2 seconds. If the calls were parallel, one iteration should also take roughly 0.2 seconds. In short, it looks like the calls are running sequentially.

import concurrent.futures 
import requests 
import random
import pandas as pd
import time

#Function to determine validity of the API response 
def call_api(url):
    try:
        response = requests.get(url)
        return response.status_code == 200
    except requests.exceptions.RequestException:
        return False

# Load the DataFrame from your dataset
df=pd.read_csv(r"C:\Users\Jose.Moquiambo\Bulk Calling 
 APIs\confidential-sales.csv")  

# Number of iterations
num_iterations = 100

# Create an empty DataFrame to store the results
results_df = pd.DataFrame(columns=['Iteration', 'Pass', 'Time Taken'])

# Variables for tracking min, max, and total time
min_time = float('inf')
max_time = float('-inf')
total_time = 0

def process_iteration(iteration):
    # Get a random sample of URLs from the DataFrame
    random_urls = df['url_column'].sample(n=5).tolist() 

    # Start timer
    start_time = time.time()

    # Execute API calls concurrently using ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = executor.map(call_api, random_urls)

    # Stop timer
    end_time = time.time()

    # Calculate the time taken for this iteration
    iteration_time = end_time - start_time

    # Update min, max, and total time
    global min_time, max_time, total_time
    min_time = min(min_time, iteration_time)
    max_time = max(max_time, iteration_time)
    total_time += iteration_time

    # Check if any API call was not successful in this iteration
    passed = 'Y' if all(results) else 'N'

    # Add the iteration results to the DataFrame
    results_df.loc[iteration] = [iteration, passed, iteration_time]

# Run the iterations
for i in range(1, num_iterations + 1):
    process_iteration(i)

# Calculate average time per iteration
avg_time = total_time / num_iterations

# Display the results DataFrame
print(results_df)

# Summary statistics
print("Minimum time taken:", min_time)
print("Maximum time taken:", max_time)
print("Average time per iteration:", avg_time)
print("Y stands for error-free response and N for invalid response")

Output

       Iteration Pass  Time Taken
 1            1    Y    1.027692
 2            2    Y    1.105409
 3            3    Y    0.998195
 4            4    Y    1.046251
 5            5    Y    1.083588
 ..         ...  ...         ...
 96          96    Y    1.119467
 97          97    Y    1.109750
 98          98    Y    1.025725
 99          99    Y    1.115403
 100        100    Y    1.114420

 [100 rows x 3 columns]
  Minimum time taken: 0.9663488864898682
  Maximum time taken: 1.529128074645996
  Average time per iteration: 1.083704767227173
  Y stands for error-free response and N for errors

The output is a DataFrame, and you can clearly see that each iteration takes about 1 second on average. That is not right, because from my checks a single call should take around 0.2 seconds, so it seems the calls are running sequentially rather than in parallel. Here is an example of how I tested the elapsed time of a single call.

response = requests.get("https://example.api.com")
print(response.elapsed.total_seconds())

Output:

0.274109
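One way to check whether the five calls actually overlap is to log each call's start and finish time relative to the start of the batch. A rough sketch along those lines is below; the URL is just the placeholder from the snippet above, and the timestamps printed are only for diagnosis:

import time
import concurrent.futures
import requests

def timed_call(url: str) -> bool:
    # Record when this request starts and ends relative to the batch start
    start = time.perf_counter()
    try:
        ok = requests.get(url).status_code == 200
    except requests.exceptions.RequestException:
        ok = False
    print(f'{url}: started at {start - t0:.3f}s, finished at {time.perf_counter() - t0:.3f}s')
    return ok

urls = ['https://example.api.com'] * 5  # placeholder; substitute 5 sampled URLs
t0 = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(timed_call, urls))

If the start times are spread roughly 0.2 s apart instead of all near zero, the calls are effectively running one after another.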
Tags: python, asynchronous, parallel-processing
2 Answers
2 votes

Using ThreadPool and its map_async feature, you can get good performance for your scenario.

Without access to the OP's data this answer is obviously contrived, but it can serve as a useful pattern.

This complete program uses a flat file in which each line contains a URL. The test file used has 108 distinct URLs, derived from http://www.testingmcafeesites.com/index.html.

import requests
from multiprocessing.pool import ThreadPool
from sys import stderr
from time import perf_counter

SAMPLES = '/Volumes/G-Drive/samples.txt'

def process(url: str) -> None:
    try:
        with requests.get(url) as response:
            response.raise_for_status()
            print(url.split('/')[-1], response.elapsed.total_seconds())
    except Exception as e:
        print(e, file=stderr)

def get_urls() -> list[str]:
    try:
        with open(SAMPLES) as samples:
            return list(map(str.rstrip, samples))
    except Exception as e:
        print(e, file=stderr)
    return []

def main(urls: list[str]) -> None:
    start = perf_counter()
    with ThreadPool() as pool:
        pool.map_async(process, urls).wait()
    print(f'Duration={perf_counter()-start:.2f}s')

if __name__ == '__main__':
    main(get_urls())

Output:

testcat_hm.html 0.300456
testcat_io.html 0.300651
testcat_cm.html 0.308631
...
testcat_we.html 0.335705
testcat_wp.html 0.335095
testrep_red.html 0.349888
Duration=2.36s

Conclusion:

The degree of parallelism is evident when you consider that there are 108 GET requests, each taking roughly 0.3 s, yet the total duration is under 2.5 s.
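The same pattern could also be wrapped around the question's per-iteration sampling, roughly as sketched below. The df, 'url_column' and call_api names are taken from the question, and the timings will of course depend on the real endpoints:

import time
from multiprocessing.pool import ThreadPool

import pandas as pd
import requests

def call_api(url: str) -> bool:
    # Same check as in the question: True only for an HTTP 200 response
    try:
        return requests.get(url).status_code == 200
    except requests.exceptions.RequestException:
        return False

def run_iterations(df: pd.DataFrame, num_iterations: int = 100) -> None:
    # Reuse one pool for all iterations instead of creating a new
    # executor each time; 5 workers matches the 5 sampled URLs.
    with ThreadPool(processes=5) as pool:
        for iteration in range(1, num_iterations + 1):
            urls = df['url_column'].sample(n=5).tolist()
            start = time.perf_counter()
            results = pool.map_async(call_api, urls).get()
            elapsed = time.perf_counter() - start
            print(iteration, 'Y' if all(results) else 'N', f'{elapsed:.3f}s')

# e.g. run_iterations(pd.read_csv('confidential-sales.csv'))

Keeping one pool alive across all iterations avoids paying the thread start-up cost 100 times; reusing connections (for example via a shared requests.Session) may also account for part of the per-iteration overhead observed in the question.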

0 votes

The requests library itself does not provide non-blocking I/O, according to its documentation. I would suggest installing and using requests-futures instead; the syntax is straightforward:

from concurrent.futures import as_completed
from pprint import pprint
from requests_futures.sessions import FuturesSession

session = FuturesSession()

futures=[session.get(f'http://httpbin.org/get?{i}') for i in range(3)]

for future in as_completed(futures):
    resp = future.result()
    pprint({
        'url': resp.request.url,
        'content': resp.json(),
    })

This lets you gain speed on the I/O-bound (blocking) operations.

If you still find execution too slow, you may need to split the work across multiple processes with the multiprocessing package, as sketched below. It comes with more constraints on how it can be used, but it does provide parallelism beyond the GIL (Global Interpreter Lock).
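A minimal sketch of that idea, reusing the httpbin placeholder URLs from the snippet above and a call_api-style helper like the one in the question; note that worker functions must be defined at module level so they can be pickled:

import multiprocessing
import requests

def call_api(url: str) -> bool:
    # Must live at module level so child processes can import it
    try:
        return requests.get(url).status_code == 200
    except requests.exceptions.RequestException:
        return False

if __name__ == '__main__':
    urls = [f'http://httpbin.org/get?{i}' for i in range(20)]  # placeholder URLs
    # One worker process per CPU core by default
    with multiprocessing.Pool() as pool:
        results = pool.map(call_api, urls)
    print(sum(results), 'of', len(urls), 'calls returned HTTP 200')

For purely I/O-bound checks like these, threads or requests-futures are usually the lighter-weight choice; a process pool pays off mainly when each response also needs CPU-heavy post-processing.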
