FastAPI StreamingResponse 不使用生成器函数进行流传输

问题描述 投票:0回答:4

我有一个相对简单的 FastAPI 应用程序,它接受查询并流回来自 ChatGPT API 的响应。 ChatGPT 正在流回结果,我可以看到它在进入时被打印到控制台。

不工作的是

StreamingResponse
通过 FastAPI 返回。相反,响应会一起发送。我真的不知道为什么这不起作用。

这是 FastAPI 应用程序代码:

import os
import time

import openai

import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse

auth_scheme = HTTPBearer()
app = fastapi.FastAPI()

openai.api_key = os.environ["OPENAI_API_KEY"]


def ask_statesman(query: str):
    #prompt = router(query)
    
    completion_reason = None
    response = ""
    while not completion_reason or completion_reason == "length":
        openai_stream = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": query}],
            temperature=0.0,
            stream=True,
        )
        for line in openai_stream:
            completion_reason = line["choices"][0]["finish_reason"]
            if "content" in line["choices"][0].delta:
                current_response = line["choices"][0].delta.content
                print(current_response)
                yield current_response
                time.sleep(0.25)


@app.post("/")
async def request_handler(auth_key: str, query: str):
    if auth_key != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": auth_scheme.scheme_name},
        )
    else:
        stream_response = ask_statesman(query)
        return StreamingResponse(stream_response, media_type="text/plain")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")

这里是非常简单的

test.py
文件来测试这个:

import requests

query = "How tall is the Eiffel tower?"
url = "http://localhost:8000"
params = {"auth_key": "123", "query": query}

response = requests.post(url, params=params, stream=True)

for chunk in response.iter_lines():
    if chunk:
        print(chunk.decode("utf-8"))
python python-requests streaming fastapi openai-api
4个回答
1
投票

首先,使用

POST
请求从服务器请求数据不是好的做法。使用
GET
请求会更适合您的情况。除此之外,您不应该发送凭据,例如
auth_key
作为 URL 的一部分(即,使用 query string),而是您应该使用
Headers
和/或
Cookies 
(使用
HTTPS
)。请查看 this answer 以获取有关标头和 cookie 概念的更多详细信息和示例,以及使用查询参数时所涉及的风险。也可以在herehere以及hereherehere中找到有关此主题的有用帖子。

其次,如果您在

StreamingResponse
的生成器函数中执行阻塞操作(即 I/O 绑定或 CPU 绑定任务),您应该使用
def
定义该函数(就像您目前所做的那样) ) 而不是
async def
,否则,阻塞操作以及生成器内部使用的
time.sleep()
函数将阻塞事件循环。正如 here 所解释的,如果用于流式传输响应主体的函数是普通生成器/迭代器(即
def
)而不是
async def
一个,FastAPI 将使用
iterate_in_threadpool()
运行迭代器/生成器一个单独的线程,然后
await
ed。看看这个详细的答案以及更多细节。

第三,您正在使用

requests
'
iter_lines()
函数,它迭代响应数据,一次一行。但是,如果响应数据不包含任何换行符(即
\n
),则在客户端接收到整个响应并将其打印为所有的。在这种情况下,您应该改为使用
iter_content()
并根据需要指定
chunk_size
(两种情况都在下面的示例中演示)。

最后,如果您希望

StreamingResponse
在所有浏览器(也包括 Chrome)中工作——就能够在数据流入时看到数据的意义而言——您应该将
media_type
指定为与
text/plain
(例如,
text/event-stream
),或禁用MIME Sniffing。正如 here 所解释的,浏览器将开始缓冲
text/plain
一定量的响应(大约 1445 字节,如 here 所记录),以检查接收到的内容是否实际上是纯文本。为避免这种情况,您可以将
media_type
设置为
text/event-stream
(用于 服务器发送的事件),或者继续使用
text/plain
,但将
X-Content-Type-Options
响应标头设置为
nosniff
,这将禁用 MIME 嗅探(这两个选项都在下面的示例中演示)。

工作示例

app.py

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time


app = FastAPI()


def fake_data_streamer():
    for i in range(10):
        yield b'some fake data\n\n'
        time.sleep(0.5)


@app.get('/')
async def main():
    return StreamingResponse(fake_data_streamer(), media_type='text/event-stream')
    # or, use:
    #headers = {'X-Content-Type-Options': 'nosniff'}
    #return StreamingResponse(fake_data_streamer(), headers=headers, media_type='text/plain')

test.py(使用 Python

requests

import requests

url = "http://localhost:8000/"

with requests.get(url, stream=True) as r:
    for chunk in r.iter_content(1024):  # or, for line in r.iter_lines():
        if chunk:
            print(chunk)

test.py(使用

httpx
——参见this,以及thisthis使用
httpx
优于
requests
的好处)

import httpx

url = 'http://127.0.0.1:8000/'
with httpx.stream('GET', url) as r:
    for chunk in r.iter_raw():  # or, for line in r.iter_lines():
        print(chunk)

0
投票
  • ask_statesman
    函数中,改变
    yield current_response
    声明产生
    {"data": current_response}
    。这将包装每个 带有
    "data"
    键的字典中的响应行。
  • request_handler
    函数中,不是直接返回
    stream_response
    ,而是返回一个生成器表达式,该表达式从
    ask_statesman
    中生成每个响应行,包裹在字典中,如上所示。 这是修改后的代码:
import os
import time

import openai

import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse

auth_scheme = HTTPBearer()
app = fastapi.FastAPI()

openai.api_key = os.environ["OPENAI_API_KEY"]


def ask_statesman(query: str):
    #prompt = router(query)
    
    completion_reason = None
    response = ""
    while not completion_reason or completion_reason == "length":
        openai_stream = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": query}],
            temperature=0.0,
            stream=True,
        )
        for line in openai_stream:
            completion_reason = line["choices"][0]["finish_reason"]
            if "content" in line["choices"][0].delta:
                current_response = line["choices"][0].delta.content
                print(current_response)
                yield {"data": current_response}
                time.sleep(0.25)


@app.post("/")
async def request_handler(auth_key: str, query: str):
    if auth_key != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": auth_scheme.scheme_name},
        )
    else:
        return StreamingResponse((line for line in ask_statesman(query)), media_type="text/plain")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")

0
投票

如果你选择使用 Langchain 与 OpenAI 交互(我强烈推荐),它提供了流方法,它有效地返回一个生成器。

对上面的Chris'代码稍作修改,

api.py

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.llms import OpenAI


llm = OpenAI(
    streaming=True,
    verbose=True,
    temperature=0,
)

app = FastAPI()


def chat_gpt_streamer(query: str):
    for resp in llm.stream(query):
        yield resp["choices"][0]["text"]


@app.get('/streaming/ask')
async def main(query: str):
    return StreamingResponse(chat_gpt_streamer(query), media_type='text/event-stream')

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")

同样,您可以使用 httpx 或请求进行测试(再次从 Chris 的代码中复制粘贴):

test.py

import httpx

url = 'http://127.0.0.1:8000/streaming/ask?query=How are you, write in 10 sentences'
with httpx.stream('GET', url) as r:
    for chunk in r.iter_raw():  # or, for line in r.iter_lines():
        print(chunk)

0
投票

可能会考虑查看服务器发送事件:https://github.com/sysid/sse-starlette

首先安装库:

pip install sse-starlette

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import time


app = FastAPI()


def data_streamer():
    for i in range(10):
        yield f"_{i}_".encode("utf-8")
        time.sleep(1)


@app.get('/')
async def main():
    return EventSourceResponse(data_streamer(), media_type='text/event-stream')

© www.soinside.com 2019 - 2024. All rights reserved.