I have a FastAPI application that receives around 30-100 requests/second, with payloads of 1 KB-1 MB, and I run a regex match on the content. The regex is whitespace-tolerant, meaning I can split the content into chunks and run the regex match on each chunk in a separate process. But since the code runs asynchronously, I need a way to keep the API responsive while the processes execute.

So I tried using ProcessPoolExecutor with asyncio's run_in_executor:
import asyncio
import concurrent.futures
import functools
import re

async def executor_task(fn, executor=None):
    event_loop = asyncio.get_event_loop()
    return await event_loop.run_in_executor(executor, fn)

def split_on_whitespace(content: str, count):
    if not content:
        return ['' for i in range(count)]
    length = len(content)
    part = int(length / count)
    last_beg = 0
    last_end = part
    splitted = []
    for beg, end in zip(range(part, length, part), range(part * 2, length, part)):
        new_content = content[beg:end]
        last_end = re.search(r"[\"'\s]", new_content)
        if not last_end:
            splitted = splitted + ['' for i in range(count - len(splitted))]
            break
        last_end = last_end.end() + beg
        content = content[last_beg:last_end]
        splitted.append(content)
        last_beg = last_end
        last_end = end
    return splitted
def run_regex_on_content_chunk(content):
    domains = []
    domain_patt = re.compile(r'([a-zA-Z0-9\-_]+\.){1,}[a-zA-Z0-9\-_]+')  # extract domain names
    for df in domain_patt.finditer(content):  # finditer yields match objects with .start()/.end()
        domains.append(content[df.start():df.end()])
    return domains
@app.post("/addContent")
async def add_content(content: dict):
    all_content = content['data']
    nworkers = 6
    content_chunks = split_on_whitespace(all_content, nworkers)  # split content
    async_tasks = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for chunk in content_chunks:
            regex_fn = functools.partial(run_regex_on_content_chunk, chunk)  # bind the args
            async_tasks.append(executor_task(regex_fn, executor))  # to gather later
        await asyncio.gather(*async_tasks)  # gather
Running this code creates the processes, but the API hangs completely. When I inspect the processes they appear to be idle, yet the API stays unresponsive and unusable, and execution never seems to leave the `with` statement.

PS: the split_on_whitespace and run_regex_on_content_chunk functions contain no loops or blocking code of any kind.
The problem is the confusing lifecycle/cost of the resources you are creating.

An external process is a "huge" thing compared to a function call - by invoking ProcessPoolExecutor inside the view code, you create several processes on each view call. That pushes the execution time of your view from a few milliseconds to possibly several seconds. The whole idea of a "process pool" is that workers in several external processes are spawned ahead of time and are "ready" to process your data. Your code turns this upside down, adding heavy boilerplate work to the handling of every view without adding anything to the processing of the data itself.

What you should do is keep a single ProcessPoolExecutor instance alive for the lifetime of the server application. That can be a bit tricky to set up correctly on its own - you may be better off with something like Celery, which handles failed workers and can even run workers on different machines, giving you horizontal scaling essentially for free. But that is a change on another level.

For now, focusing on your minimal example, using the lifespan parameter when starting your FastAPI instance should work. Below, I simply changed the code in your view to reuse the same process pool, and added the code that creates a single executor pool per server process.
Note that there is one more thing that can make your code fail outright (rather than just run 10000x slower): the call that creates the FastAPI app itself must happen only in the main process - if it is not protected, even indirectly, by an `if __name__ == "__main__":` clause, then when the multiprocessing pool is created your code may simply spin up multiple FastAPI servers in a runaway chain.
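For reference, the guard can be sketched like this (both the standalone-file layout and the uvicorn launch call are my assumptions here, since the line where you instantiate FastAPI is not in your example code):

```python
import uvicorn
from fastapi import FastAPI

app = FastAPI()

if __name__ == "__main__":
    # Only the process started directly (e.g. "python thisfile.py")
    # reaches this block. Worker processes that re-import the module
    # (as the "spawn" start method does) skip it, so they never start
    # servers of their own.
    uvicorn.run(app, host="127.0.0.1", port=8000)
```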
...

@app.post("/addContent")
async def add_content(content: dict):
    all_content = content['data']
    nworkers = 6
    content_chunks = split_on_whitespace(all_content, nworkers)  # split content
    async_tasks = []
    for chunk in content_chunks:
        regex_fn = functools.partial(run_regex_on_content_chunk, chunk)  # bind the args
        async_tasks.append(executor_task(regex_fn, process_pool))  # reuse the shared pool
    await asyncio.gather(*async_tasks)

...
from contextlib import asynccontextmanager

process_pool = None

@asynccontextmanager
async def executor_pool(app):
    global process_pool
    # These workers are shared across all views. In theory, with the
    # CPU at 100% occupation you gain nothing above the number of
    # hardware threads you have - but once you factor in idle time for
    # networking and the like, I think about 3x that number will serve
    # you well. Of course, CPU usage should be monitored in production -
    # as long as you don't get close to 100% under maximum load, you
    # can further increase the number of workers.
    nworkers = 18
    process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=nworkers)
    try:
        yield  # at this point, FastAPI sets up the server and runs your application
    finally:
        process_pool.shutdown()  # and this runs when your server is stopping!

if __name__ == "__main__":
    app = FastAPI(lifespan=executor_pool)  # this is where "FastAPI" should be called - the line where you do this is not present in your example code.
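Outside FastAPI, the same one-pool-many-awaits pattern can be sketched as a small stand-alone script. Here find_domains is a simplified stand-in for run_regex_on_content_chunk, and the sample batches are made up for illustration:

```python
import asyncio
import concurrent.futures
import re

# Stand-in worker: extract domain-like tokens from one chunk of text.
def find_domains(chunk):
    return re.findall(r'[a-zA-Z0-9_-]+(?:\.[a-zA-Z0-9_-]+)+', chunk)

async def process_batches(pool, batches):
    loop = asyncio.get_running_loop()
    results = []
    for chunks in batches:
        # Each "request" fans its chunks out to the SAME pool and awaits
        # them concurrently, without blocking the event loop.
        tasks = [loop.run_in_executor(pool, find_domains, c) for c in chunks]
        gathered = await asyncio.gather(*tasks)
        results.append([d for part in gathered for d in part])
    return results

def main():
    # The pool is created once and reused for every batch - the same
    # lifecycle the lifespan handler gives the FastAPI app above.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        batches = [["visit example.com now", "then docs.python.org"],
                   ["no domains here"]]
        return asyncio.run(process_batches(pool, batches))

if __name__ == "__main__":
    print(main())
```

The pool is entered once in main() rather than per batch; in the server version, "once" is the whole lifespan of the application process.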