有没有办法在中间件中获取响应内容? 以下代码是从here复制的。
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
response
主体是一个迭代器,一旦迭代完成,就无法再次迭代。因此,您要么必须将所有迭代数据保存到 list
(或 bytes
变量)并使用它返回自定义 Response
,要么再次启动迭代器。下面的选项演示了这两种方法。如果您也想将 request
主体放入 middleware
内,请查看 这个答案。
将数据保存到
list
并使用 iterate_in_threadpool
再次启动迭代器,如此处所述 - 这就是 StreamingResponse
使用的内容,如此处所示。
from starlette.concurrency import iterate_in_threadpool
@app.middleware("http")
async def some_middleware(request: Request, call_next):
response = await call_next(request)
response_body = [chunk async for chunk in response.body_iterator]
response.body_iterator = iterate_in_threadpool(iter(response_body))
print(f"response_body={response_body[0].decode()}")
return response
注 1: 如果您的代码使用
StreamingResponse
,则 response_body[0]
将仅返回 chunk
中的第一个 response
。要获取整个 response
主体,您应该加入该字节(块)列表,如下所示(.decode()
返回 bytes
对象的字符串表示形式):
print(f"response_body={(b''.join(response_body)).decode()}")
注 2: 如果您的
StreamingResponse
流式传输的正文不适合服务器 RAM(例如,30GB 的响应),则在迭代 response.body_iterator
时可能会遇到内存错误(这适用于此答案中列出的两个选项),除非循环遍历response.body_iterator
(如选项2所示),但不是将块存储在内存变量中,而是将其存储在磁盘上的某个位置。然而,您随后需要从该磁盘位置检索整个响应数据并将其加载到 RAM 中,以便将其发送回客户端(这可能会进一步延长响应客户端的延迟),在这种情况下,您可以将内容分块加载到 RAM 中并使用 StreamingResponse
,类似于 here、here,以及 here、here 和 here(在选项 1 中,您可以只需将迭代器/生成器函数传递给 iterate_in_threadpool
)。但是,我不建议遵循这种方法,而是让此类端点返回从中间件中排除的大型流响应,如此答案中所述。
下面演示了另一种方法,其中响应正文存储在
bytes
对象中(而不是如上所示的列表),并用于直接返回自定义 Response
(以及 status_code
,原始响应的 headers
和 media_type
)。
@app.middleware("http")
async def some_middleware(request: Request, call_next):
response = await call_next(request)
response_body = b""
async for chunk in response.body_iterator:
response_body += chunk
print(f"response_body={response_body.decode()}")
return Response(content=response_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type)