我正在尝试编写下面的 python 脚本,其中我有 5 个服务器,每个服务器最多可以同时处理 2 个请求。我有 10 个请求需要处理。每个服务器各选取 2 个请求,对其进行处理,并在有能力时立即从池中选取另一个请求。
我编写的下面的代码等待服务器的所有 2 个请求,然后服务器才会选择另外 2 个请求。我希望它在处理完请求后立即选择请求。
async def process_request(server_id, request_id):
processing_time = random.randint(10, 30)
print(f"Server {server_id} is processing request {request_id} for {processing_time} seconds")
await asyncio.sleep(processing_time)
print(f"Server {server_id} finished processing request {request_id}")
async def server_worker(server_id, queue, num_concurrent_requests_per_server):
while True:
request_ids = []
for _ in range(num_concurrent_requests_per_server):
try:
request_id = await queue.get()
request_ids.append(request_id)
except asyncio.QueueEmpty:
break
tasks = []
for request_id in request_ids:
task = asyncio.create_task(process_request(server_id, request_id))
tasks.append(task)
await asyncio.gather(*tasks)
for _ in range(len(request_ids)):
await queue.put(random.randint(1, 100)) # Add one more request to the queue
async def main():
num_servers = 5
num_concurrent_requests_per_server = 2
total_requests = 100
servers = [asyncio.Queue() for _ in range(num_servers)]
# Start server workers
server_tasks = []
for i in range(num_servers):
task = asyncio.create_task(server_worker(i, servers[i], num_concurrent_requests_per_server))
server_tasks.append(task)
# Generate and enqueue initial requests
for _ in range(num_servers * num_concurrent_requests_per_server):
server_id = _ % num_servers
await servers[server_id].put(random.randint(1, 100))
# Wait for all requests to be processed
await asyncio.gather(*[servers[i].join() for i in range(num_servers)])
# Cancel server workers
for task in server_tasks:
task.cancel()
if __name__ == "__main__":
asyncio.run(main())
您可以尝试将
server_worker
函数修改为单个共享队列,所有服务器都可以从中获取请求(而不是为每个服务器创建单独的队列):这将简化处理请求的逻辑并确保任何服务器都可以获取请求一旦有能力就提出新请求。
import asyncio
import random
async def process_request(server_id, request_id):
processing_time = random.randint(10, 30)
print(f"Server {server_id} is processing request {request_id} for {processing_time} seconds")
await asyncio.sleep(processing_time)
print(f"Server {server_id} finished processing request {request_id}")
async def server_worker(server_id, queue, num_concurrent_requests_per_server):
ongoing_requests = set()
while True:
if len(ongoing_requests) < num_concurrent_requests_per_server:
try:
request_id = await queue.get()
task = asyncio.create_task(process_request(server_id, request_id))
ongoing_requests.add(task)
task.add_done_callback(lambda _ : ongoing_requests.remove(task))
except asyncio.QueueEmpty:
break
else:
# Wait for any one of the tasks to complete
done, _ = await asyncio.wait(ongoing_requests, return_when=asyncio.FIRST_COMPLETED)
for task in done:
ongoing_requests.remove(task)
async def main():
num_servers = 5
num_concurrent_requests_per_server = 2
total_requests = 10 # Adjusted to match your setup description
queue = asyncio.Queue()
# Generate and enqueue initial requests
for _ in range(total_requests):
await queue.put(random.randint(1, 100))
server_tasks = [
asyncio.create_task(server_worker(i, queue, num_concurrent_requests_per_server))
for i in range(num_servers)
]
# Wait for all requests to be processed
await asyncio.gather(*server_tasks)
if __name__ == "__main__":
asyncio.run(main())
每个
server_worker
现在维护一组正在进行的请求任务 (ongoing_requests
)。每当它开始处理请求时,它都会向该集合添加一个新任务,并在请求完成时将其删除。这样,它就可以随时检查是否有能力启动新请求。
不是收集任务并等待所有任务完成,而是立即处理任务。如果服务器已满,它会等待至少一项任务完成,然后再继续轮询队列。