我正在使用 Python 3.10 和 FastAPI
0.92.0
编写服务器发送事件 (SSE) 流 api。这就是 Python 代码的样子:
from fastapi import APIRouter, FastAPI, Header
from src.chat.completions import chat_stream
from fastapi.responses import StreamingResponse
router = APIRouter()
@router.get("/v1/completions",response_class=StreamingResponse)
def stream_chat(q: str, authorization: str = Header(None)):
auth_mode, auth_token = authorization.split(' ')
if auth_token is None:
return "Authorization token is missing"
answer = chat_stream(q, auth_token)
return StreamingResponse(answer, media_type="text/event-stream")
这是
chat_stream
函数:
import openai
def chat_stream(question: str, key: str):
openai.api_key = key
# create a completion
completion = openai.Completion.create(model="text-davinci-003",
prompt=question,
stream=True)
return completion
当我使用此命令调用 api 时:
curl -N -H "Authorization: Bearer sk-the openai key" https://chat.poemhub.top/v1/completions?q=hello
服务器端显示以下错误:
INFO: 123.146.17.54:0 - "GET /v1/completions?q=hello HTTP/1.0" 200 OK
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/uvicorn/protocols/http/h11_impl.py", line 429, in run_asgi
result = await app( # type: ignore[func-returns-value]
File "/usr/local/lib/python3.10/dist-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/fastapi/applications.py", line 276, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/applications.py", line 122, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 69, in app
await response(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 270, in __call__
async with anyio.create_task_group() as task_group:
File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 273, in wrap
await func()
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 264, in stream_response
chunk = chunk.encode(self.charset)
File "/usr/local/lib/python3.10/dist-packages/openai/openai_object.py", line 61, in __getattr__
raise AttributeError(*err.args)
AttributeError: encode
为什么会出现这个错误?我应该怎么做才能修复它?
如 FastAPI 的文档 中所述,
StreamingResponse
采用 async
生成器或普通生成器/迭代器并流式传输响应主体。正如 this answer 中所解释的,在任何一种情况下,FastAPI 仍将异步工作——如果传递给 StreamingResponse
的生成器不是异步的,FastAPI/Starlette 将在单独的线程中运行生成器(参见相关实现这里),使用iterate_in_threadpool()
,然后将被await
ed(见iterate_in_threadpool()
实现)。有关 FastAPI 中 def
与 async def
的更多详细信息,以及在 async def
端点内运行阻塞操作的解决方案,请查看此详细答案。
StreamingResponse
是 Response
的子类,在 bytes
中流式传输响应主体。因此,如果通过生成器传递的内容不是 bytes
格式,FastAPI/Starlette 将尝试将其编码/转换为 bytes
(使用 default utf-8
编码方案)。以下是相关实现的代码片段:
async for chunk in self.body_iterator:
if not isinstance(chunk, bytes):
chunk = chunk.encode(self.charset)
await send({"type": "http.response.body", "body": chunk, "more_body": True})
但是,如果迭代器/生成器中的
chunk
不是可以编码的str
格式,则会引发AttributeError
(例如,AttributeError: ... has no attribute 'encode'
),类似于this answer中描述的内容(见选项 2,注 3)。此外,如果一个块包含 utf-8
编码范围之外的字符,您还不如面对 UnicodeEncodeError: ... codec can't encode character
。因此,鉴于您提供的错误回溯中的AttributeError: encode
,您很可能传递的对象不是str
.
下面的示例演示了一个普通的生成器(即
def gen()
),流式传输 JSON 数据,在这种情况下,需要先将 dict
对象转换为 str
(使用 json.dumps()
或任何其他 JSON 编码器,例如orjson
,请参阅here)然后到相应的字节值(使用.encode('utf-8')
,如前所述,如果省略,FastAPI/Starlette 将处理)。此外,下面的示例使用了text/event-stream
媒体类型(也称为MIME 类型),通常用于从服务器发送事件(另请参见event stream format)。如果您确信从服务器发送的数据始终为 JSON 格式,您也可以使用 application/json
媒体类型。请注意,如 this answer 中所述,使用 text/plain
媒体类型可能不会立即在浏览器中显示流数据,因为浏览器用于缓冲 text/plain
对所谓的“MIME 嗅探”的响应(请参阅上面关于如何禁用它的链接答案,如果你想使用text/plain
)。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
import time
app = FastAPI()
@app.get('/')
def main():
def gen():
while True:
yield (json.dumps({'msg': 'Hello World!'}) + '\n\n').encode('utf-8')
time.sleep(0.5)
return StreamingResponse(gen(), media_type='text/event-stream')