我有一个
falcon
我写的 API (WSGI) 需要尽可能快。
在代码的一部分,我有需要发送给客户端的响应,但我有一个数据湖,我在其中发送所有响应+使用 kafka 的更多计算结果。 我想将额外的计算 + 发送到 kafka 分开,因为客户端不需要等待它,而且它需要比我想要的更多的时间。
有没有办法在 Falcon 中做到这一点而无需像这样自己处理线程:
class Compute(Thread):
def __init__(self, request):
Thread.__init__(self)
self.request = request
def run(self):
print("start")
time.sleep(5)
print(self.request)
print("done")
class FalconApi
def on_post(self, request: falcon.Request, response: falcon.Response):
thread_a = Compute(request.__copy__())
thread_a.start()
我使用
gunicorn
运行 API,所以我想也许我可以使用挂钩 post_request
但我似乎无法在挂钩中获取响应数据。我也尝试过使用 asyncio
但它似乎不喜欢当我使用它时因为我的应用程序是 WSGI.
我的异步代码:
class FalconApi
@staticmethod
def http_post(response: falcon.Response) -> None:
requests.post(consts.ENDPOINT, headers=response.headers, data=json.dumps(response.data))
async def http_post_async(self, response: falcon.Response) -> None:
await asyncio.to_thread(self.http_post, response)
def on_post(self, request: falcon.Request, response: falcon.Response):
self.http_post_async(response)
和我得到的错误:
RuntimeWarning: coroutine 'http_post_async' was never awaited
当我把它改成:
async def on_post(self, request: falcon.Request, response: falcon.Response):
self.http_post_async(response)
我得到:
TypeError: The <bound methodon_post of object at 0x7f7cf0c4ffd0>> responder must be a regular synchronous method to be used with a WSGI app.
这就是我最终做的对我有用的事情。
我添加了一个
gunicorn
挂钩post_request
我添加了我想要发布过程的响应到falcon.request.env
属性然后我在挂钩中提取它。
我的代码看起来像:
api.py:
class FalconApi
def on_post(self, request: falcon.Request, response: falcon.Response):
response = <something>
request.env['response'] = response
hooks.py:
def post_request(worker: SyncWorker, request: gunicorn.http.Request, environ: dict,
response: gunicorn.http.wsgi.Response) -> None:
response = environ.get('response')
if not response:
logger.error(f'could not post process')
PostProcessingRoute().post_process_response(response_body)
然后我使用
gunicorn -c hooks.py
运行它