Falcon API - 客户端获取后处理响应

问题描述 投票:0回答:1

我有一个

falcon
我写的 API (WSGI) 需要尽可能快。

在代码的一部分,我有需要发送给客户端的响应,但我有一个数据湖,我在其中发送所有响应+使用 kafka 的更多计算结果。 我想将额外的计算 + 发送到 kafka 分开,因为客户端不需要等待它,而且它需要比我想要的更多的时间。

有没有办法在 Falcon 中做到这一点而无需像这样自己处理线程:

class Compute(Thread):
    def __init__(self, request):
        Thread.__init__(self)
        self.request = request

    def run(self):
        print("start")
        time.sleep(5)
        print(self.request)
        print("done")


class FalconApi
    def on_post(self, request: falcon.Request, response: falcon.Response):
        thread_a = Compute(request.__copy__())
        thread_a.start()

我使用

gunicorn
运行 API,所以我想也许我可以使用挂钩
post_request
但我似乎无法在挂钩中获取响应数据。我也尝试过使用
asyncio
但它似乎不喜欢当我使用它时因为我的应用程序是 WSGI.

我的异步代码:

class FalconApi
        @staticmethod
    def http_post(response: falcon.Response) -> None:
        requests.post(consts.ENDPOINT, headers=response.headers, data=json.dumps(response.data))

    async def http_post_async(self, response: falcon.Response) -> None:
        await asyncio.to_thread(self.http_post, response)

    def on_post(self, request: falcon.Request, response: falcon.Response):
        self.http_post_async(response)

和我得到的错误:

RuntimeWarning: coroutine 'http_post_async' was never awaited

当我把它改成:

    async def on_post(self, request: falcon.Request, response: falcon.Response):
        self.http_post_async(response)

我得到:

TypeError: The <bound methodon_post of object at 0x7f7cf0c4ffd0>> responder must be a regular synchronous method to be used with a WSGI app. 

python gunicorn falconframework
1个回答
0
投票

这就是我最终做的对我有用的事情。

我添加了一个

gunicorn
挂钩
post_request
我添加了我想要发布过程的响应到
falcon.request.env
属性然后我在挂钩中提取它。

我的代码看起来像:

api.py:

class FalconApi
    def on_post(self, request: falcon.Request, response: falcon.Response):
        response = <something>
        request.env['response'] = response

hooks.py:

def post_request(worker: SyncWorker, request: gunicorn.http.Request, environ: dict,
                 response: gunicorn.http.wsgi.Response) -> None:
    response = environ.get('response')
    if not response:
        logger.error(f'could not post process')
    PostProcessingRoute().post_process_response(response_body)

然后我使用

gunicorn -c hooks.py

运行它
© www.soinside.com 2019 - 2024. All rights reserved.