Basically, something is wrong with my code: I suspect the calls are running concurrently in name only, not in a genuinely parallel way. The problem with my results is that one iteration (calling 5 APIs) takes about 1 second on average, which makes no sense when a single API call takes only about 0.2 seconds. If the calls ran in parallel, one iteration should also take roughly 0.2 seconds. In short, it looks like I am running them sequentially.
import concurrent.futures
import requests
import random
import pandas as pd
import time

# Function to determine validity of the API response
def call_api(url):
    try:
        response = requests.get(url)
        return response.status_code == 200
    except requests.exceptions.RequestException:
        return False

# Load the DataFrame from your dataset
df = pd.read_csv(r"C:\Users\Jose.Moquiambo\Bulk Calling APIs\confidential-sales.csv")

# Number of iterations
num_iterations = 100

# Create an empty DataFrame to store the results
results_df = pd.DataFrame(columns=['Iteration', 'Pass', 'Time Taken'])

# Variables for tracking min, max, and total time
min_time = float('inf')
max_time = float('-inf')
total_time = 0

def process_iteration(iteration):
    # Get a random sample of URLs from the DataFrame
    random_urls = df['url_column'].sample(n=5).tolist()

    # Start timer
    start_time = time.time()

    # Execute API calls concurrently using ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = executor.map(call_api, random_urls)

    # Stop timer
    end_time = time.time()

    # Calculate the time taken for this iteration
    iteration_time = end_time - start_time

    # Update min, max, and total time
    global min_time, max_time, total_time
    min_time = min(min_time, iteration_time)
    max_time = max(max_time, iteration_time)
    total_time += iteration_time

    # Check if any API call was not successful in this iteration
    passed = 'Y' if all(results) else 'N'

    # Add the iteration results to the DataFrame
    results_df.loc[iteration] = [iteration, passed, iteration_time]

# Run the iterations
for i in range(1, num_iterations + 1):
    process_iteration(i)

# Calculate average time per iteration
avg_time = total_time / num_iterations

# Display the results DataFrame
print(results_df)

# Summary statistics
print("Minimum time taken:", min_time)
print("Maximum time taken:", max_time)
print("Average time per iteration:", avg_time)
print("Y stands for error-free response and N for invalid response")
Iteration Pass Time Taken
1 1 Y 1.027692
2 2 Y 1.105409
3 3 Y 0.998195
4 4 Y 1.046251
5 5 Y 1.083588
.. ... ... ...
96 96 Y 1.119467
97 97 Y 1.109750
98 98 Y 1.025725
99 99 Y 1.115403
100 100 Y 1.114420
[100 rows x 3 columns]
Minimum time taken: 0.9663488864898682
Maximum time taken: 1.529128074645996
Average time per iteration: 1.083704767227173
Y stands for error-free response and N for invalid response
The output is in the form of a DataFrame, and you can clearly see that each iteration takes about 1 second on average. That cannot be right, because by my check a single call should take about 0.2 seconds. So the calls are running sequentially rather than in parallel. Here is how I tested the elapsed time of a single request:
response = requests.get("https://example.api.com")
print(response.elapsed.total_seconds())
Output:
0.274109
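One quick way to check whether the five calls actually overlap is to print each call's start and end offsets against a shared clock. Below is a minimal diagnostic sketch with placeholder httpbin URLs (not the original dataset); if the workers truly run in parallel, all five start offsets should be close to zero.

import concurrent.futures
import requests
from time import perf_counter

urls = [f'http://httpbin.org/get?{i}' for i in range(5)]  # placeholder URLs
t0 = perf_counter()  # shared zero point for all workers

def timed_call(url):
    # Record start/end offsets relative to the shared zero so
    # overlapping windows are visible in the output.
    start = perf_counter() - t0
    ok = False
    try:
        ok = requests.get(url).status_code == 200
    except requests.exceptions.RequestException:
        pass
    print(f'{url}: start={start:.3f}s end={perf_counter() - t0:.3f}s ok={ok}')
    return ok

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(timed_call, urls))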
Using ThreadPool and its map_async feature, you can get good performance in your scenario.
Without access to the OP's data this answer is obviously contrived, but it can serve as a useful pattern.
This complete program works from a flat file where each line contains one URL. The test file used here has 108 distinct URLs, derived from http://www.testingmcafeesites.com/index.html
import requests
from multiprocessing.pool import ThreadPool
from sys import stderr
from time import perf_counter

SAMPLES = '/Volumes/G-Drive/samples.txt'

def process(url: str) -> None:
    try:
        with requests.get(url) as response:
            response.raise_for_status()
            print(url.split('/')[-1], response.elapsed.total_seconds())
    except Exception as e:
        print(e, file=stderr)

def get_urls() -> list[str]:
    try:
        with open(SAMPLES) as samples:
            return list(map(str.rstrip, samples))
    except Exception as e:
        print(e, file=stderr)
        return []

def main(urls: list[str]) -> None:
    start = perf_counter()
    with ThreadPool() as pool:
        pool.map_async(process, urls).wait()
    print(f'Duration={perf_counter()-start:.2f}s')

if __name__ == '__main__':
    main(get_urls())
Output:
testcat_hm.html 0.300456
testcat_io.html 0.300651
testcat_cm.html 0.308631
...
testcat_we.html 0.335705
testcat_wp.html 0.335095
testrep_red.html 0.349888
Duration=2.36s
Conclusion:
The parallelism is evident when you consider that there were 108 GET requests, each taking roughly 0.3 seconds, yet the total duration was under 2.5 seconds.
The requests library itself does not provide non-blocking IO, according to its documentation. With requests-futures, the syntax is simple:
from concurrent.futures import as_completed
from pprint import pprint
from requests_futures.sessions import FuturesSession

session = FuturesSession()

futures = [session.get(f'http://httpbin.org/get?{i}') for i in range(3)]

for future in as_completed(futures):
    resp = future.result()
    pprint({
        'url': resp.request.url,
        'content': resp.json(),
    })
You will get the speedup from the I/O-bound (blocking) operations.
The multiprocessing package splits the work across multiple processes. It comes with more restrictions on how it can be used, but it does provide parallelism outside the GIL (Global Interpreter Lock).
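For completeness, here is a minimal sketch of the same fan-out pattern with multiprocessing.Pool; the worker function, pool size, and placeholder httpbin URLs are assumptions, not taken from the answers above. Because the workers are separate processes, this sidesteps the GIL entirely, at the cost of process startup and serialization overhead.

import requests
from multiprocessing import Pool
from time import perf_counter

def fetch(url: str) -> float:
    # Return the server-reported elapsed time for one GET, or -1.0 on error.
    try:
        with requests.get(url) as response:
            response.raise_for_status()
            return response.elapsed.total_seconds()
    except requests.exceptions.RequestException:
        return -1.0

if __name__ == '__main__':  # guard required where child processes are spawned
    urls = [f'http://httpbin.org/get?{i}' for i in range(8)]  # placeholder URLs
    start = perf_counter()
    with Pool(processes=4) as pool:  # 4 worker processes, an arbitrary choice
        timings = pool.map(fetch, urls)
    print(timings)
    print(f'Duration={perf_counter() - start:.2f}s')

For I/O-bound work like these GET requests, threads are usually the cheaper choice; a process pool pays off mainly when the per-item work is CPU-bound.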