Streaming OpenAI responses from LangChain through a Python Flask API

Question — votes: 0, answers: 2

I am using a Python Flask application to chat with my data. In the console I can already get a streamed response directly from OpenAI, because I can enable streaming with the streaming=True flag.

The problem is that I cannot "forward" or "display" that stream in the API call.

The code that handles OpenAI and the chain is:

def askQuestion(self, collection_id, question):
        collection_name = "collection-" + str(collection_id)
        self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,  output_key='answer')
        
        chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)


        self.chain = ConversationalRetrievalChain.from_llm(self.llm, chroma_Vectorstore.as_retriever(similarity_search_with_score=True),
                                                            return_source_documents=True,verbose=VERBOSE, 
                                                            memory=self.memory)
        

        result = self.chain({"question": question})
        
        res_dict = {
            "answer": result["answer"],
        }

        res_dict["source_documents"] = []

        for source in result["source_documents"]:
            res_dict["source_documents"].append({
                "page_content": source.page_content,
                "metadata":  source.metadata
            })

        return res_dict

And the API route code:

@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def ask_question(collection_id):
    question = request.form["question"]
    # response_generator = document_thread.askQuestion(collection_id, question)
    # return jsonify(response_generator)

    def stream(question):
        completion = document_thread.askQuestion(collection_id, question)
        for line in completion['answer']:
            yield line

    return app.response_class(stream_with_context(stream(question)))

I am testing my endpoint with curl, passing the -N flag to curl, so if streaming works I should see the response arrive incrementally.
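
(For completeness, the same client-side check can also be done from Python with the requests library instead of curl; this is only a sketch, and the URL, port and question text are assumptions based on a local setup.)

import requests

resp = requests.post(
    "http://localhost:5000/collection/1/ask_question",
    data={"question": "What is in my documents?"},
    stream=True,
)
for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
    # if the endpoint really streams, chunks arrive incrementally
    print(chunk, end="", flush=True)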

When I make the API call, the endpoint first waits while the data is processed (I can see the streamed answer in the terminal in VS Code), and only once it has finished do I get everything displayed at once.

Thanks

python flask openai-api langchain
2 Answers
3 votes

By using threading and a callback, we can get a streaming response from the Flask API.

In the Flask API you can create a queue and register the tokens into it through a LangChain callback.

from queue import Queue
from langchain.callbacks.base import BaseCallbackHandler

class StreamingHandler(BaseCallbackHandler):
    def __init__(self, queue: Queue):
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put(token)

You can then get the tokens from that same queue inside the Flask route.

from queue import Queue
from flask import Response, request, stream_with_context
import threading

@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def stream_output(collection_id):
    question = request.form["question"]
    q = Queue()

    def generate(rq: Queue):
        ...
        # add your own logic here to prevent the while loop
        # from running indefinitely
        while (...):
            yield rq.get()

    callback_fn = StreamingHandler(q)

    # run the blocking chain call in a background thread so the route
    # can start streaming tokens from the queue right away
    threading.Thread(target=document_thread.askQuestion,
                     args=(collection_id, question, callback_fn)).start()
    return Response(stream_with_context(generate(q)))

Add the custom StreamingHandler callback defined above to your LangChain ChatOpenAI:

self.llm = ChatOpenAI(
  model_name=self.model_name,
  temperature=self.temperature,
  openai_api_key=os.environ.get('OPENAI_API_KEY'),
  streaming=True,
  callbacks=[callback_fn]
)
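
Note that askQuestion then has to accept the handler and pass it into ChatOpenAI; the answer does not show this part, so the following is only a minimal sketch of how the method from the question could be adapted (the extra callback_fn parameter is an assumption).

def askQuestion(self, collection_id, question, callback_fn):
    # pass the queue-backed handler so tokens are pushed into the
    # queue as soon as the model emits them
    self.llm = ChatOpenAI(
        model_name=self.model_name,
        temperature=self.temperature,
        openai_api_key=os.environ.get('OPENAI_API_KEY'),
        streaming=True,
        callbacks=[callback_fn],
    )
    # ... build the ConversationalRetrievalChain exactly as before and
    # run self.chain({"question": question}); the streaming route reads
    # the tokens from the queue, so the return value is only needed if
    # you still want the source documents afterwards.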

0 votes

Not sure whether my answer is clearer, but it is similar to @varunsinghal's answer; hope this helps :)

import os
import threading
from queue import Queue

from langchain.callbacks.base import BaseCallbackHandler
from langchain.callbacks.manager import CallbackManager
from langchain.chat_models import ChatOpenAI
from langchain.schema import LLMResult

token_queue = Queue()

class LLMTokenQueueHandler(BaseCallbackHandler):
    """
    Callback handler that stores every token the LLM outputs
    into a queue, plus an 'end' marker when generation finishes
    """
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        token_queue.put({"type": "token", "value": token})

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        token_queue.put({"type": "end"})

# adding the LLMTokenQueueHandler to the callback manager
# so the tokens are automatically stored into token_queue
gptchat = ChatOpenAI(
    model_name='model_name',
    temperature=0.25,
    openai_api_key=os.environ.get('OPENAI_API_KEY'),
    streaming=True,
    callback_manager=CallbackManager([LLMTokenQueueHandler()])
)

def stream_tokens():  
    """Generator function to stream tokens."""  
    while True:  
        # Wait for a token to be available in the queue and retrieve it  
        token_dict = token_queue.get()  
        print("token_dict: ", token_dict)

        if token_dict["type"] == "token":
            # yield the raw token bytes back to the client
            yield token_dict['value'].encode('utf-8')

        # when streaming ends we receive the 'end' sentinel
        # and break out of the loop
        elif token_dict["type"] == "end":
            break
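
If you want real Server-Sent Events rather than a raw byte stream, a small variation of stream_tokens (not part of the original answer) would wrap each token in SSE framing and serve the response with the text/event-stream mimetype:

def stream_tokens_sse():
    """Variant of stream_tokens that emits SSE-framed messages."""
    while True:
        token_dict = token_queue.get()
        if token_dict["type"] == "token":
            # SSE messages have the form "data: <payload>\n\n"
            yield f"data: {token_dict['value']}\n\n".encode("utf-8")
        elif token_dict["type"] == "end":
            break

# in the route:
# return Response(stream_with_context(stream_tokens_sse()),
#                 mimetype="text/event-stream")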


@app.route('/stream', methods=['POST'])  
def stream_text_response():  
    """
    Stream text response with memory
    """
    input_json = request.get_json()  
    input_query = input_json.get('stream', '')  
  
    # Start generate_text_response in a separate thread to avoid blocking  
    threading.Thread(
        target=generate_text_response, 
        args=(input_query,)
    ).start()  
  
    # Stream tokens back to the client as they are produced
    # not streaming generate_text_response as it doesn't produce
    # the streamed tokens directly  
    return Response(
        stream_with_context(stream_tokens())
    )  
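
generate_text_response itself is not shown in the answer; it is simply whatever function actually invokes gptchat (or a chain built on top of it) with the user input. A hypothetical minimal version, just to make the flow concrete, could be:

def generate_text_response(input_query: str) -> None:
    # calling the streaming model is enough: the callback handler
    # pushes every generated token into token_queue
    gptchat.predict(input_query)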