我有一个 Python 脚本,它向 API 发送请求以根据输入提示生成响应。每个请求通常需要大约 10 秒来处理。我有一个包含 6000 行的 CSV 文件,我想将“文本”列中的值作为输入提示发送到 API。此外,我的目标是将生成的响应附加到同一 CSV 文件中的新列。
考虑到大量请求以及完成每个请求所需的时间,我担心整体处理时间。有没有办法优化这个过程以避免连续花费 6000*10 秒?
这是基本代码片段:
import pandas as pd
from requests_futures.sessions import FuturesSession
import time
# Define your existing function for sending a request
def send_request(session, input_prompt):
url = 'https://ai-chat-test.***.workers.dev/'
consolidated_request = f"""From the input "{input_prompt}", extract key descriptions, deduplicate them, and use these to create a compelling, concise description of the fashion item."""
return session.post(url, json={"messages": [{"role": "user", "content": consolidated_request}]}, headers={"Content-Type": "application/json"})
# Function to apply send_request concurrently and save intermittently
def apply_concurrent_requests(df, function):
session = FuturesSession(max_workers=10) # Adjust max_workers as needed
futures = []
results = [None] * len(df)
batch_size = 50
for i, text in enumerate(df['text']):
futures.append((i, function(session, text)))
if (i + 1) % batch_size == 0 or i + 1 == len(df):
start_time = time.time()
# Process the batch
for index, future in futures:
response = future.result()
if response.status_code == 200:
data = response.json()
for message in data:
if message['role'] == 'assistant':
results[index] = message['content']
break
else:
results[index] = "No assistant message found."
else:
results[index] = f"Failed to get a response, status code: {response.status_code}"
# Save the responses so far to the DataFrame
for j, res in enumerate(results[:i+1]):
df.at[j, 'response'] = res
# Save to CSV
df.to_csv('intermediate_test.csv', index=False)
# Print the status
print(f"Processed {i+1} requests in {time.time() - start_time:.2f} seconds and saved to CSV.")
# Reset the futures for the next batch
futures = []
# Load the CSV file
df = pd.read_csv('test.csv')
# Apply the function concurrently and update DataFrame with the responses
apply_concurrent_requests(df, send_request)
# Optionally, save the final DataFrame to a new CSV file after all processing
df.to_csv('new_updated_test.csv', index=False)
如何优化此流程以一次发送多个请求或利用并行处理来减少总体处理时间?
您的代码似乎过于复杂。如果您希望始终有 10 个并发请求发生,那么您并没有真正以最有效的方式实现这一目标。您一次向大小为 10 的池提交 50 个(您的
batch_size
)任务,但是一旦完成这 50 个任务,所有池线程都会空闲,直到您提交下一个 50 个任务。确实,线程只会保留空闲时间很短,不会显着增加代码的运行时间。但是,如果您控制线程,您可以通过大大简化代码来稍微提高效率。
在下面的代码中,我使用
multiprocessing.pool.ThreadPool.imap
方法,该方法接受两个参数,一个辅助函数和一个 iterable,迭代时会生成要提交到池中的所有值。但这个迭代是“懒惰”完成的。如果我们使用生成器函数作为迭代,那么任务将在生成时提交。我还重构了代码,并将请求响应的处理放在发出请求的函数中:
import pandas as pd
import requests
from multiprocessing.pool import ThreadPool
import time
from functools import partial
def make_request(session, consolidated_request: str) -> str:
"""This function generates a post requests using the passed session
and consolidated_request areguments and returns the result of the post."""
url = 'https://ai-chat-test.***.workers.dev/'
response = session.post(url, json={"messages": [{"role": "user", "content": consolidated_request}]}, headers={"Content-Type": "application/json"})
if response.status_code == 200:
data = response.json()
for message in data:
if message['role'] == 'assistant':
return message['content']
return "No assistant message found."
else:
return f"Failed to get a response, status code: {response.status_code}"
def generate_requests(df: pd.DataFrame) -> str:
"""This generator function generates a "consolidated request" for
each row in df."""
for text in df["text"]:
yield f"""From the input "{text}", extract key descriptions, deduplicate them, and use these to create a compelling, concise description of the fashion item."""
def apply_concurrent_requests(df: pd.DataFrame) -> None:
"""Adds a new response column to the passed dataframe."""
with requests.Session() as session, ThreadPool(10) as pool:
worker = partial(make_request, session)
start_time = time.time()
for j, result in enumerate(pool.imap(worker, generate_requests(df))):
df.at[j, 'response'] = result
# Print the status
print(f"Processed {j+1} requests in {time.time() - start_time:.2f} seconds and saved to CSV.")
# Load the CSV file
df = pd.read_csv('test.csv')
# Apply the function concurrently and update DataFrame with the responses
apply_concurrent_requests(df)
# Optionally, save the final DataFrame to a new CSV file after all processing
df.to_csv('new_updated_test.csv', index=False)
您仍然需要选择最佳池大小(10 个线程是否太多?太少了?)