I have a relatively simple FastAPI app that accepts a query and streams back the response from the ChatGPT API. ChatGPT is streaming back the result, and I can see it being printed to the console as it comes in.
What is not working is the StreamingResponse returned via FastAPI. Instead, the response gets sent all at once. I am really at a loss as to why this isn't working.
Here is the FastAPI app code:
import os
import time
import openai
import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse

auth_scheme = HTTPBearer()
app = fastapi.FastAPI()

openai.api_key = os.environ["OPENAI_API_KEY"]


def ask_statesman(query: str):
    #prompt = router(query)
    completion_reason = None
    response = ""
    while not completion_reason or completion_reason == "length":
        openai_stream = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": query}],
            temperature=0.0,
            stream=True,
        )
        for line in openai_stream:
            completion_reason = line["choices"][0]["finish_reason"]
            if "content" in line["choices"][0].delta:
                current_response = line["choices"][0].delta.content
                print(current_response)
                yield current_response
                time.sleep(0.25)


@app.post("/")
async def request_handler(auth_key: str, query: str):
    if auth_key != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": auth_scheme.scheme_name},
        )
    else:
        stream_response = ask_statesman(query)
        return StreamingResponse(stream_response, media_type="text/plain")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")
And here is the very simple test.py file to test it:
import requests

query = "How tall is the Eiffel tower?"
url = "http://localhost:8000"
params = {"auth_key": "123", "query": query}

response = requests.post(url, params=params, stream=True)
for chunk in response.iter_lines():
    if chunk:
        print(chunk.decode("utf-8"))
First, it is not good practice to use a POST request for requesting data from the server; a GET request would be more appropriate in your case. On top of that, you should never send credentials such as the auth_key as part of the URL (i.e., in the query string); instead, you should use Headers and/or Cookies (over HTTPS). Please have a look at this answer for more details and examples on the concepts of headers and cookies, as well as the risks involved when using query parameters. Helpful posts on this topic can also be found here and here, as well as here, here and here.
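To illustrate, here is a minimal sketch of header-based authentication (my own example, reusing FastAPI's built-in HTTPBearer scheme and the hard-coded "123" token from the question; a real app would verify an actual secret):

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

auth_scheme = HTTPBearer()
app = FastAPI()


def verify_token(credentials: HTTPAuthorizationCredentials = Depends(auth_scheme)):
    # Hard-coded token for demonstration only, matching the question's auth_key
    if credentials.credentials != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
        )


@app.get("/", dependencies=[Depends(verify_token)])
async def main(query: str):
    return {"query": query}

On the client side, the token would then travel in the Authorization header, e.g. requests.get(url, headers={"Authorization": "Bearer 123"}, params={"query": query}).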
Second, if you perform blocking operations (i.e., I/O-bound or CPU-bound tasks) inside the generator function of the StreamingResponse, you should define that function with def (as you currently do) rather than async def; otherwise, the blocking operations, as well as the time.sleep() function used inside your generator, would block the event loop. As explained here, if the function for streaming the response body is a normal generator/iterator (i.e., def) and not an async def one, FastAPI will use iterate_in_threadpool() to run the iterator/generator in a separate thread that is then awaited. Have a look at this detailed answer for more details.
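For completeness, here is a minimal sketch of the async def variant (my own addition, not part of the answer above): if the generator performs no blocking work, it can be declared async def and use await asyncio.sleep() instead of time.sleep(), so the event loop is never blocked:

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def async_data_streamer():
    # async generator: yields chunks without blocking the event loop
    for i in range(10):
        yield b"some fake data\n\n"
        await asyncio.sleep(0.5)  # non-blocking, unlike time.sleep()


@app.get("/")
async def main():
    return StreamingResponse(async_data_streamer(), media_type="text/event-stream")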
Third, you are using requests' iter_lines() function, which iterates over the response data one line at a time. If, however, the response data do not include any line-break characters (i.e., \n), the content won't be printed on the client side until the entire response has been received. In that case, you should instead use iter_content() and specify a chunk_size as desired (both cases are demonstrated in the examples below).
Finally, if you would like the StreamingResponse to work in every browser (Chrome included), in the sense of being able to see the data as they stream in, you should specify a media_type different from text/plain (e.g., text/event-stream), or disable MIME Sniffing. As explained here, browsers will buffer a certain amount of a text/plain response (around 1445 bytes, as documented here) in order to check whether the received content is actually plain text. To avoid this, you can either set the media_type to text/event-stream (used for server-sent events), or keep using text/plain but set the X-Content-Type-Options response header to nosniff, which disables MIME sniffing (both options are demonstrated in the examples below).
app.py

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time

app = FastAPI()


def fake_data_streamer():
    for i in range(10):
        yield b'some fake data\n\n'
        time.sleep(0.5)


@app.get('/')
async def main():
    return StreamingResponse(fake_data_streamer(), media_type='text/event-stream')
    # or, use:
    # headers = {'X-Content-Type-Options': 'nosniff'}
    # return StreamingResponse(fake_data_streamer(), headers=headers, media_type='text/plain')
test.py (using Python requests)
import requests

url = "http://localhost:8000/"

with requests.get(url, stream=True) as r:
    for chunk in r.iter_content(1024):  # or, for line in r.iter_lines():
        if chunk:
            print(chunk)
test.py (using httpx; see this, as well as this and this, for the benefits of using httpx over requests)
import httpx

url = 'http://127.0.0.1:8000/'

with httpx.stream('GET', url) as r:
    for chunk in r.iter_raw():  # or, for line in r.iter_lines():
        print(chunk)
In the ask_statesman function, change the yield current_response statement so that each response line is wrapped in a dictionary with a "data" key and serialized to a string (StreamingResponse expects str or bytes chunks, so a plain dict cannot be yielded as-is). In the request_handler function, instead of returning stream_response directly, return a generator expression that yields each response line from ask_statesman, wrapped as described above.
Here is the modified code:

import os
import time
import json
import openai
import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse

auth_scheme = HTTPBearer()
app = fastapi.FastAPI()

openai.api_key = os.environ["OPENAI_API_KEY"]


def ask_statesman(query: str):
    #prompt = router(query)
    completion_reason = None
    response = ""
    while not completion_reason or completion_reason == "length":
        openai_stream = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": query}],
            temperature=0.0,
            stream=True,
        )
        for line in openai_stream:
            completion_reason = line["choices"][0]["finish_reason"]
            if "content" in line["choices"][0].delta:
                current_response = line["choices"][0].delta.content
                print(current_response)
                # serialize the wrapped line, since StreamingResponse expects str/bytes chunks
                yield json.dumps({"data": current_response}) + "\n"
                time.sleep(0.25)


@app.post("/")
async def request_handler(auth_key: str, query: str):
    if auth_key != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": auth_scheme.scheme_name},
        )
    else:
        return StreamingResponse((line for line in ask_statesman(query)), media_type="text/plain")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")
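A quick client-side sketch to go with this (my own addition; it assumes the modified app above is running on localhost:8000): each streamed line can be decoded back into a dictionary with json.loads():

import json
import requests

url = "http://localhost:8000"
params = {"auth_key": "123", "query": "How tall is the Eiffel tower?"}

with requests.post(url, params=params, stream=True) as r:
    for line in r.iter_lines():  # each chunk is one JSON-encoded line
        if line:
            print(json.loads(line)["data"])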
If you choose to use Langchain to interact with OpenAI (which I highly recommend), it provides a stream method that effectively returns a generator.
With a slight modification of Chris' code above:
api.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.llms import OpenAI

llm = OpenAI(
    streaming=True,
    verbose=True,
    temperature=0,
)

app = FastAPI()


def chat_gpt_streamer(query: str):
    for resp in llm.stream(query):
        yield resp["choices"][0]["text"]


@app.get('/streaming/ask')
async def main(query: str):
    return StreamingResponse(chat_gpt_streamer(query), media_type='text/event-stream')


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
Again, you can test it with httpx or requests (copy-pasted from Chris' code once more):
test.py
import httpx

url = 'http://127.0.0.1:8000/streaming/ask?query=How are you, write in 10 sentences'

with httpx.stream('GET', url) as r:
    for chunk in r.iter_raw():  # or, for line in r.iter_lines():
        print(chunk)
You might also consider having a look at Server-Sent Events: https://github.com/sysid/sse-starlette
First, install the library:
pip install sse-starlette
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import time

app = FastAPI()


def data_streamer():
    for i in range(10):
        yield f"_{i}_".encode("utf-8")
        time.sleep(1)


@app.get('/')
async def main():
    return EventSourceResponse(data_streamer(), media_type='text/event-stream')
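To try this out, a minimal client sketch (my own addition, assuming the server above runs on localhost:8000): sse-starlette formats each yielded chunk as a data: line of the text/event-stream protocol, so the events can be read line by line:

import httpx

url = 'http://localhost:8000/'

with httpx.stream('GET', url) as r:
    for line in r.iter_lines():  # events arrive as lines such as "data: _0_"
        if line:
            print(line)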