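I have a Python script that sends requests to an API to generate responses from input prompts. Each request usually takes about 10 seconds to process. I have a CSV file with 6000 rows, and I want to send the values in the "text" column to the API as input prompts. I also want to append the generated responses to a new column in the same CSV file.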

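Given the large number of requests and the time each one takes, I'm worried about the overall processing time. Is there a way to optimize this so I don't spend 6000*10 seconds processing the requests one after another?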
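For scale: run sequentially, 6000 requests at about 10 seconds each take 60,000 seconds, roughly 16.7 hours. If the API can serve N requests in parallel at the same per-request latency (an optimistic assumption; the server may throttle or slow down under load), the ideal wall-clock time drops to about 60,000 / N seconds. The helper below is only a back-of-the-envelope sketch of that arithmetic:

```python
import math


def estimated_wall_time(n_requests: int, seconds_per_request: float, concurrency: int) -> float:
    """Ideal wall-clock time in seconds, assuming every request takes the
    same time and the server handles `concurrency` requests in parallel
    without slowing down (an optimistic assumption)."""
    # Requests are processed in ceil(n / concurrency) "waves" of parallel calls.
    waves = math.ceil(n_requests / concurrency)
    return waves * seconds_per_request


for workers in (1, 10, 50):
    t = estimated_wall_time(6000, 10.0, workers)
    print(f"{workers:>3} workers: {t:>8.0f} s (~{t / 3600:.1f} h)")
```

With 10 workers this gives 6000 s, about 1.7 hours, which is the best case the concurrent code below can hope for.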


import pandas as pd
from requests_futures.sessions import FuturesSession
import time

# Define your existing function for sending a request
def send_request(session, input_prompt):
    url = 'https://ai-chat-test.***.workers.dev/'
    consolidated_request = f"""From the input "{input_prompt}", extract key descriptions, deduplicate them, and use these to create a compelling, concise description of the fashion item."""
    return session.post(url, json={"messages": [{"role": "user", "content": consolidated_request}]}, headers={"Content-Type": "application/json"})

# Function to apply send_request concurrently and save intermittently
def apply_concurrent_requests(df, function):
    session = FuturesSession(max_workers=10)  # Adjust max_workers as needed
    futures = []
    results = [None] * len(df)
    batch_size = 50

    for i, text in enumerate(df['text']):
        futures.append((i, function(session, text)))
        if (i + 1) % batch_size == 0 or i + 1 == len(df):
            start_time = time.time()
            # Process the batch
            for index, future in futures:
                response = future.result()
                if response.status_code == 200:
                    data = response.json()
                    for message in data:
                        if message['role'] == 'assistant':
                            results[index] = message['content']
                            break
                    else:
                        # Loop finished without a break: no assistant message
                        results[index] = "No assistant message found."
                else:
                    results[index] = f"Failed to get a response, status code: {response.status_code}"
            # Save the responses so far to the DataFrame
            for j, res in enumerate(results[:i+1]):
                df.at[j, 'response'] = res

            # Save to CSV
            df.to_csv('intermediate_test.csv', index=False)
            # Print the status
            print(f"Processed {i+1} requests in {time.time() - start_time:.2f} seconds and saved to CSV.")
            # Reset the futures for the next batch
            futures = []

# Load the CSV file
df = pd.read_csv('test.csv')

# Apply the function concurrently and update DataFrame with the responses
apply_concurrent_requests(df, send_request)

# Optionally, save the final DataFrame to a new CSV file after all processing
df.to_csv('new_updated_test.csv', index=False)


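Your code seems overly complicated. If you want 10 concurrent requests in flight at all times, you are not really achieving that in the most efficient way. You submit 50 tasks at a time (your `batch_size`) to a pool of size 10, but the next 50 are not submitted until every future in the current batch has returned and the CSV has been written. So as each batch winds down, the threads that have already finished sit idle. Admittedly, the threads stay idle only briefly, so this will not add significantly to your code's running time. But if you manage the threads yourself, you can gain a little efficiency while simplifying the code considerably.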
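The cost of the batch barrier can be estimated without touching the API by simulating how a pool hands tasks to whichever thread frees up first. This is a sketch with made-up latencies (uniformly 8 to 12 s, an assumption) and it ignores the time spent writing the CSV, so it only models the idle gap at the end of each batch:

```python
import heapq
import random


def makespan(durations, workers):
    """Finish time when tasks are handed out, in order, to whichever of
    `workers` threads becomes free first (how a thread pool drains its queue)."""
    free_at = [0.0] * workers  # time at which each worker becomes free
    heapq.heapify(free_at)
    finish = 0.0
    for d in durations:
        start = heapq.heappop(free_at)  # earliest available worker
        end = start + d
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


def batched_makespan(durations, workers, batch_size):
    """Each batch must finish completely before the next one is submitted,
    like waiting on every future in a batch before submitting more."""
    return sum(makespan(durations[i:i + batch_size], workers)
               for i in range(0, len(durations), batch_size))


random.seed(0)
# Hypothetical latencies: roughly 10 s per request with some jitter.
durations = [random.uniform(8, 12) for _ in range(6000)]
print(f"batched (50 per batch): {batched_makespan(durations, 10, 50):.0f} s")
print(f"continuous pool:        {makespan(durations, 10):.0f} s")
```

The continuous schedule is never slower than the batched one; how much it saves depends on how uneven the real latencies are.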


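I would use a `multiprocessing.pool.ThreadPool` and its `imap` method, which takes two arguments: a worker function and an iterable that, when iterated, yields all the values to be submitted to the pool. Unlike `map`, `imap` does not first convert the iterable into a list, so if we pass a generator as the iterable, tasks are submitted as the values are generated, and the results come back in submission order. I have also refactored the code and moved the handling of the request's response into the function that makes the request: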

import pandas as pd
import requests
from multiprocessing.pool import ThreadPool
import time
from functools import partial
from typing import Iterator

def make_request(session, consolidated_request: str) -> str:
    """Send a POST request using the passed session and consolidated_request
    arguments and return the assistant's reply (or an error message)."""

    url = 'https://ai-chat-test.***.workers.dev/'
    response = session.post(url, json={"messages": [{"role": "user", "content": consolidated_request}]}, headers={"Content-Type": "application/json"})
    if response.status_code == 200:
        data = response.json()
        for message in data:
            if message['role'] == 'assistant':
                return message['content']
        return "No assistant message found."
    # Non-200 status: report the failure instead of a result
    return f"Failed to get a response, status code: {response.status_code}"

def generate_requests(df: pd.DataFrame) -> Iterator[str]:
    """This generator function yields a "consolidated request" for
    each row in df."""

    for text in df["text"]:
        yield f"""From the input "{text}", extract key descriptions, deduplicate them, and use these to create a compelling, concise description of the fashion item."""

def apply_concurrent_requests(df: pd.DataFrame) -> None:
    """Adds a new response column to the passed dataframe."""

    with requests.Session() as session, ThreadPool(10) as pool:
        worker = partial(make_request, session)
        start_time = time.time()
        for j, result in enumerate(pool.imap(worker, generate_requests(df))):
            df.at[j, 'response'] = result
        # Print the status (the CSV is written after this function returns)
        print(f"Processed {j+1} requests in {time.time() - start_time:.2f} seconds.")

# Load the CSV file
df = pd.read_csv('test.csv')

# Apply the function concurrently and update DataFrame with the responses
apply_concurrent_requests(df)

# Optionally, save the final DataFrame to a new CSV file after all processing
df.to_csv('new_updated_test.csv', index=False)
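The refactored version no longer writes intermediate results the way the original batch loop did. If you want to keep that safety net with `imap`, one option is to checkpoint every N results inside the loop; because `imap` yields results in submission order, the j-th result always belongs to the j-th row. Below is a sketch with a generic `on_checkpoint` callback (the helper and callback names are hypothetical) and a fake worker standing in for `make_request`, so it runs without the API:

```python
import time
from multiprocessing.pool import ThreadPool
from typing import Callable, Iterable, List


def run_with_checkpoints(worker: Callable[[str], str],
                         items: Iterable[str],
                         pool_size: int,
                         checkpoint_every: int,
                         on_checkpoint: Callable[[List[str]], None]) -> List[str]:
    """Run `worker` over `items` in a thread pool, calling `on_checkpoint` with the
    results collected so far every `checkpoint_every` results and once at the end."""
    results: List[str] = []
    with ThreadPool(pool_size) as pool:
        # imap returns results in the same order the items were submitted.
        for result in pool.imap(worker, items):
            results.append(result)
            if len(results) % checkpoint_every == 0:
                on_checkpoint(results)
    # Final checkpoint for the remainder (skipped if it was just saved).
    if results and len(results) % checkpoint_every != 0:
        on_checkpoint(results)
    return results


def fake_worker(prompt: str) -> str:
    # Stand-in for make_request: a short sleep instead of a ~10 s API call.
    time.sleep(0.01)
    return prompt.upper()


saved_sizes = []
out = run_with_checkpoints(fake_worker, (f"item {i}" for i in range(23)),
                           pool_size=5, checkpoint_every=10,
                           on_checkpoint=lambda r: saved_sizes.append(len(r)))
print(saved_sizes)  # → [10, 20, 23]
```

For the real job, the worker would be `partial(make_request, session)`, the items `generate_requests(df)`, and the callback could write `df.iloc[:len(r)].assign(response=r)` to `intermediate_test.csv`. This wiring is an untested suggestion.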

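You still need to choose the best pool size (are 10 threads too many? Too few?). The answer depends mainly on how many concurrent requests the API server will accept before it slows down or starts rejecting requests.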
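One practical way to choose is to time a small sample of rows at a few candidate pool sizes and look for the point where throughput stops improving, or where the API starts returning errors such as HTTP 429. A sketch, with a simulated worker in place of the real request; the helper name, sample size, and candidate sizes are illustrative only:

```python
import time
from multiprocessing.pool import ThreadPool
from typing import Callable, Dict, Iterable, List


def time_pool_sizes(worker: Callable[[str], str],
                    sample: List[str],
                    sizes: Iterable[int]) -> Dict[int, float]:
    """Return the wall-clock seconds taken to process `sample` at each pool size."""
    timings: Dict[int, float] = {}
    for size in sizes:
        start = time.perf_counter()
        with ThreadPool(size) as pool:
            # Consume every result so the timing covers all requests.
            list(pool.imap(worker, sample))
        timings[size] = time.perf_counter() - start
    return timings


def simulated_request(prompt: str) -> str:
    # Hypothetical stand-in for make_request: 20 ms of "latency" instead of ~10 s.
    time.sleep(0.02)
    return prompt


sample = [f"row {i}" for i in range(20)]
timings = time_pool_sizes(simulated_request, sample, [1, 5, 10, 20])
for size, secs in timings.items():
    print(f"{size:>2} threads: {secs:.2f} s, {len(sample) / secs:.1f} requests/s")
```

With a simulated sleep, more threads always help because nothing contends; against the real API you would pass `partial(make_request, session)` and a short list built from `generate_requests(df.head(...))`, and expect diminishing returns past whatever concurrency the server supports.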

