How to use run_in_executor to manage a ProcessPoolExecutor


I have a FastAPI app that receives around 30-100 requests/second, with content sizes of 1Kb-1Mb, and I run a regex match over the content. The regex is whitespace-tolerant, meaning I can split the content into several chunks and run the regex match on each chunk in a different process, but since the code runs asynchronously I need a way to keep the API responsive while the processes execute. So I tried using a ProcessPoolExecutor together with asyncio's run_in_executor. Here is the MVP code I wrote:

import asyncio
import concurrent.futures
import functools
import re


async def executor_task(fn, executor=None):
  # run the blocking callable in the given executor without blocking the event loop
  event_loop = asyncio.get_event_loop()
  return await event_loop.run_in_executor(executor, fn)


def split_on_white_space(content:str, count):
  # split `content` into roughly `count` chunks, cutting only at whitespace/quote characters
  if not content: return ['' for i in range(count)]

  length = len(content)
  part = length // count
  last_beg = 0
  splitted = []
  for beg, end in zip(range(part, length, part), range(part*2, length, part)):
    window = content[beg:end]
    match = re.search(r"[\"'\s]", window)

    if not match:
      splitted = splitted + ['' for i in range(count - len(splitted))]
      break

    cut = match.end() + beg                 # absolute position of the split point
    splitted.append(content[last_beg:cut])  # emit the chunk ending at the split point
    last_beg = cut

  if len(splitted) < count:
    splitted.append(content[last_beg:])     # remaining tail of the content

  return splitted

def run_regex_on_content_chunk(content):
    domains = []
    domain_patt = re.compile(r'([a-zA-Z0-9\-_]+\.){1,}[a-zA-Z0-9\-_]+') # extract domain names
    for df in domain_patt.finditer(content):  # finditer yields match objects with positions
      domains.append(content[df.start(): df.end()])

    return domains


@app.post("/addContent")
async def add_content(content:dict):

  all_content = content['data']
  nworkers = 6
  content_chunks = split_on_white_space(all_content, nworkers)       # split content into nworkers chunks
  async_tasks = []
  with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
    for chunk in content_chunks:
      regex_fn = functools.partial(run_regex_on_content_chunk, chunk) # make the function with args
      async_tasks.append(executor_task(regex_fn, executor))           # add to gather later 

    await asyncio.gather(*async_tasks)                                # gather 

Running this code creates the processes, but the API hangs completely. When I inspect the processes they appear to be idle, yet the API stays hung and is not usable at all, and execution never seems to leave the with statement.

PS: the split_on_white_space and run_regex_on_content_chunk functions do not contain infinite loops or any kind of blocking code.

python python-asyncio concurrent.futures process-pool
1 Answer

The problem is the confusing lifecycle/cost of the resources being created.

Compared to calling a function, an external process is a "huge" thing - by instantiating the ProcessPoolExecutor inside your view code, you create several processes on each view call.

So the execution time of your view code stretches from a few milliseconds to possibly several seconds. The whole point of having a "process pool" is that the workers in the external processes are spawned in advance and are "ready" to process your data. Your code turns that upside down, adding a lot of boilerplate-heavy work to the handling of every view without adding anything to the processing of the data itself. The thing to do is to have a single ProcessPoolExecutor instance for the lifetime of the server app. That can itself be a bit tricky to set up correctly - you might be better off with something like "Celery", which handles failed workers and even lets workers run on different machines, allowing free horizontal scaling. But changing the code to that is another level entirely. For now, sticking to your minimal example, using the lifespan parameter when starting the FastAPI instance should work. Below, I just changed the code in your view to reuse the same process pool, and added the code that creates a single executor pool per server process.

Note that there is one more thing that could make your code fail entirely (and not just run 10000 times slower): the call to FastAPI itself must happen only in the main process - if it is not protected, even indirectly, by the check for that (the if __name__ == "__main__": clause), then when the multiprocessing pool tries to spawn workers your code could simply be setting up multiple FastAPI servers in a runaway chain.
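
To make the mechanism behind that warning concrete, here is a small standalone sketch (my own illustration, not taken from the answer): on platforms that use the spawn start method, every worker created by a process pool re-imports the module, so any top-level code - including server startup - runs again in each worker unless it sits behind the __main__ guard.

import concurrent.futures
import os

print(f"module imported in pid {os.getpid()}")  # re-executed in every spawned worker


def work(x):
    return x * x


if __name__ == "__main__":  # without this guard, the pool creation below would also re-run on import
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(work, range(4))))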

...
@app.post("/addContent")
async def add_content(content:dict):

    all_content = content['data']
    nworkers = 6
    content_chunks = split_on_white_space(all_content, nworkers)        # split content
    async_tasks = []
    for chunk in content_chunks:
        regex_fn = functools.partial(run_regex_on_content_chunk, chunk) # make the function with args
        async_tasks.append(executor_task(regex_fn, process_pool))       # submit to the shared pool
    await asyncio.gather(*async_tasks)                                  # gather

...


from contextlib import asynccontextmanager

process_pool = None

@asynccontextmanager
async def executor_pool(app):
    global process_pool
    nworkers = 18   # These workers are shared across all views -
                    # in theory, if the CPU is at 100% occupation you get no
                    # gain above the number of hardware threads you have,
                    # but I think that once we factor in the idle time for
                    # networking and such, about 3X that number will serve
                    # you well. Of course, CPU usage should be monitored in
                    # production - as long as you don't get close to 100%
                    # under maximum load, you can further increase the
                    # number of workers.
    process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=nworkers)
    try:
        yield # at this point, fastAPI will setup the server and run your application
    finally:
        process_pool.shutdown()  # and this runs when your server is stopping!
 
if __name__ == "__main__":
    app = FastAPI(lifespan=executor_pool)   # use this line to create the FastAPI app - the line where you do this is not present in your example code.
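
As a follow-up, a hedged sketch of how a client could exercise the endpoint once the server above is running; the local URL/port and the use of httpx are my own assumptions, not something the answer specifies:

# Hypothetical client sketch - assumes the server is already running on
# localhost:8000 and that the httpx package is installed.
import asyncio
import httpx


async def main():
    payload = {"data": "visit example.com and sub.domain.org for details"}
    async with httpx.AsyncClient() as client:
        resp = await client.post("http://127.0.0.1:8000/addContent", json=payload)
        # the sample view does not return the gathered results, so the body is null
        # unless you add a return statement to add_content
        print(resp.status_code, resp.json())


if __name__ == "__main__":
    asyncio.run(main())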

