设计 Python 代码,让多台服务器并发执行请求以实现最大吞吐量

问题描述 投票:0回答:1

我正在尝试编写下面的 python 脚本,其中我有 5 个服务器,每个服务器最多可以同时处理 2 个请求。我有 10 个请求需要处理。每个服务器各选取 2 个请求,对其进行处理,并在有能力时立即从池中选取另一个请求。

我编写的下面这段代码会等服务器上的 2 个请求全部处理完,才再选取另外 2 个请求。我希望服务器一处理完一个请求,就立即从池中选取下一个请求。

async def process_request(server_id, request_id):
    """Simulate servicing one request: sleep a random 10-30 s, logging start and end."""
    processing_time = random.randint(10, 30)
    start_msg = f"Server {server_id} is processing request {request_id} for {processing_time} seconds"
    print(start_msg)
    await asyncio.sleep(processing_time)
    done_msg = f"Server {server_id} finished processing request {request_id}"
    print(done_msg)

async def server_worker(server_id, queue, num_concurrent_requests_per_server):
    """Worker loop for one server: pull a batch of request ids from this
    server's private queue, process the whole batch concurrently, then refill
    the queue with the same number of new random requests.

    NOTE(review): this is the problematic version from the question. The
    `asyncio.gather` below is a batch barrier -- a freed-up slot stays idle
    until every request in the batch finishes, which is exactly the behaviour
    the asker wants to avoid.
    """
    while True:
        request_ids = []
        for _ in range(num_concurrent_requests_per_server):
            try:
                # NOTE(review): `await queue.get()` suspends until an item is
                # available; it never raises QueueEmpty (only the non-blocking
                # `get_nowait()` does), so this `except` branch is dead code
                # and the loop can never exit early on an empty queue.
                request_id = await queue.get()
                request_ids.append(request_id)
            except asyncio.QueueEmpty:
                break

        tasks = []
        for request_id in request_ids:
            task = asyncio.create_task(process_request(server_id, request_id))
            tasks.append(task)
        # Waits for the ENTIRE batch before fetching any new request.
        await asyncio.gather(*tasks)
        for _ in range(len(request_ids)):
            # NOTE(review): the queue is refilled one-for-one, so it never
            # drains; and since `queue.task_done()` is never called,
            # `queue.join()` in main() can never return.
            await queue.put(random.randint(1, 100))  # Add one more request to the queue

async def main():
    """Driver from the question: one private queue per server, seeded with
    `num_concurrent_requests_per_server` initial requests each.

    NOTE(review): `queue.join()` below only completes once `task_done()` has
    been called for every `put()`, but `server_worker` never calls
    `task_done()` (and keeps re-filling the queue), so this coroutine never
    finishes and the workers are never cancelled.
    """
    num_servers = 5
    num_concurrent_requests_per_server = 2
    total_requests = 100  # NOTE(review): never used anywhere in this version

    servers = [asyncio.Queue() for _ in range(num_servers)]

    # Start server workers
    server_tasks = []
    for i in range(num_servers):
        task = asyncio.create_task(server_worker(i, servers[i], num_concurrent_requests_per_server))
        server_tasks.append(task)

    # Generate and enqueue initial requests
    for _ in range(num_servers * num_concurrent_requests_per_server):
        server_id = _ % num_servers
        await servers[server_id].put(random.randint(1, 100))

    # Wait for all requests to be processed
    await asyncio.gather(*[servers[i].join() for i in range(num_servers)])

    # Cancel server workers
    for task in server_tasks:
        task.cancel()

if __name__ == "__main__":
    asyncio.run(main())
python python-3.x concurrency python-asyncio
1个回答
0
投票

您可以尝试将

server_worker
函数修改为从一个所有服务器共享的队列中获取请求(而不是为每个服务器创建单独的队列):这会简化处理请求的逻辑,并确保任何服务器一旦有空闲能力,就能立即从队列中取出新的请求。

import asyncio
import random

async def process_request(server_id, request_id):
    """Pretend to handle a single request on `server_id`, taking 10-30 s."""
    processing_time = random.randint(10, 30)
    print(f"Server {server_id} is processing request {request_id} for {processing_time} seconds")
    # Simulated work: just wait out the processing time.
    await asyncio.sleep(processing_time)
    print(f"Server {server_id} finished processing request {request_id}")

async def server_worker(server_id, queue, num_concurrent_requests_per_server):
    """Pull requests from the shared `queue`, keeping at most
    `num_concurrent_requests_per_server` of them in flight at once.

    Starts a new request the instant a slot frees up, and returns once the
    queue is drained and every in-flight request has completed.
    """
    ongoing_requests = set()
    while True:
        if len(ongoing_requests) < num_concurrent_requests_per_server:
            try:
                # BUG FIX: the original used `await queue.get()`, which blocks
                # forever on an empty queue and never raises QueueEmpty (only
                # the non-blocking `get_nowait()` does), so workers never
                # exited and main()'s gather hung. Use the non-blocking form.
                request_id = queue.get_nowait()
            except asyncio.QueueEmpty:
                if not ongoing_requests:
                    break  # queue drained and nothing in flight -> done
                # Queue is empty but requests are still running: wait for at
                # least one to finish, then re-check (no new work can appear).
                _, ongoing_requests = await asyncio.wait(
                    ongoing_requests, return_when=asyncio.FIRST_COMPLETED
                )
            else:
                ongoing_requests.add(
                    asyncio.create_task(process_request(server_id, request_id))
                )
                # BUG FIX: dropped the original's
                # `task.add_done_callback(lambda _: ongoing_requests.remove(task))`
                # -- `task` was late-bound (the lambda always removed the most
                # recently created task) and it double-removed entries already
                # discarded by asyncio.wait(), raising KeyError in the callback.
        else:
            # At capacity: block until at least one slot frees up.
            # asyncio.wait returns (done, pending); the pending set is exactly
            # the still-running requests, so adopt it as the new tracking set.
            _, ongoing_requests = await asyncio.wait(
                ongoing_requests, return_when=asyncio.FIRST_COMPLETED
            )

async def main():
    """Build one shared request queue and run all server workers until every
    request has been processed."""
    num_servers = 5
    num_concurrent_requests_per_server = 2
    total_requests = 10  # Adjusted to match your setup description

    queue = asyncio.Queue()
    # Seed the shared queue with every request up front.
    for _ in range(total_requests):
        await queue.put(random.randint(1, 100))

    # One worker task per server, all draining the same queue.
    server_tasks = []
    for server_id in range(num_servers):
        worker = server_worker(server_id, queue, num_concurrent_requests_per_server)
        server_tasks.append(asyncio.create_task(worker))

    # Finished when every worker has returned (queue empty, requests done).
    await asyncio.gather(*server_tasks)

if __name__ == "__main__":
    asyncio.run(main())

每个

server_worker
现在维护一组正在进行的请求任务 (
ongoing_requests
)。每当它开始处理请求时,它都会向该集合添加一个新任务,并在请求完成时将其删除。这样,它就可以随时检查是否有能力启动新请求。

它不再收集一整批任务并等待全部完成,而是逐个立即启动并处理任务。如果服务器已达到并发上限,它会先等待至少一个任务完成,然后再继续从队列中取请求。

    

© www.soinside.com 2019 - 2024. All rights reserved.