如何设置阻塞函数以在执行程序中运行,结果无关紧要,因此主线程不应等待或减慢它。
老实说,我不确定这对于它是否是正确的解决方案,我想要的是将某些类型的处理队列与主进程分开,以便它不会阻止服务器应用程序返回请求,因为这Web服务器的类型为许多请求运行一个worker。
我希望远离像Celery这样的解决方案,但如果这是最优化的,我愿意学习它。
这里的上下文是一个异步Web服务器,它生成包含大图像的pdf文件。
app = Sanic()
#App "global" worker
executor = ProcessPoolExecutor(max_workers=5)
app.route('/')
async def getPdf(request):
asyncio.create_task(renderPdfsInExecutor(request.json))
#This should be returned "instantly" regardless of pdf generation time
return response.text('Pdf being generated, it will be sent to your email when done')
async def renderPdfsInExecutor(json):
asyncio.get_running_loop.run_in_executor(executor, syncRenderPdfs, json)
def syncRenderPdfs(json)
#Some PDF Library that downloads images synchronously
pdfs = somePdfLibrary.generatePdfsFromJson(json)
sendToDefaultMail(pdfs)
上面的代码给出了错误(是的,它以管理员身份运行):
PermissionError [WinError 5] Access denied
Future exception was never retrieved
奖金问题:我是否通过在执行程序中运行asyncio循环获得任何收益?因此,如果它一次处理多个PDF请求,它将在它们之间分配处理。如果是,我该怎么办?
好的,首先是误会。这个
async def getPdf(request):
asyncio.create_task(renderPdfsInExecutor(request.json))
...
async def renderPdfsInExecutor(json):
asyncio.get_running_loop.run_in_executor(executor, syncRenderPdfs, json)
是多余的。这足够了
async def getPdf(request):
asyncio.get_running_loop.run_in_executor(executor, syncRenderPdfs, request.json)
...
或者(因为你不想等待)甚至更好
async def getPdf(request):
executor.submit(syncRenderPdfs, request.json)
...
现在你遇到的问题是因为syncRenderPdfs
抛出PermissionError
。它没有被处理,因此Python警告你“嘿,一些后台代码引发了错误。但是代码并不是由任何人拥有的,所以到底是什么?”。这就是为什么你得到Future exception was never retrieved
。你有一个pdf库本身的问题,而不是asyncio。一旦你解决了这个内心问题,保证安全也是个好主意:
def syncRenderPdfs(json)
try:
#Some PDF Library that downloads images synchronously
pdfs = somePdfLibrary.generatePdfsFromJson(json)
sendToDefaultMail(pdfs)
except Exception:
logger.exception('Something went wrong') # or whatever
您的“权限被拒绝”问题是完全不同的事情,您应该调试它和/或为此发布单独的问题。
至于最后一个问题:是的,执行者将在工人之间排队并均匀分配任务。
编辑:正如我们在评论中所说的那样,实际问题可能出在您所使用的Windows环境中。或者更准确地说,使用ProcessPoolExecutor,即产生进程可能会更改权限。我建议使用ThreadPoolExecutor,假设它在平台上工作正常。
您可以查看asyncio.gather(* tasks)以并行运行多个。