RAG 与 Langchain 和 FastAPI:流式传输生成的答案并返回源文档

问题描述 投票:0回答:1

我已经使用 Langchain 构建了一个 RAG 应用程序,现在想使用 FastAPI 来部署它。一般来说,它的工作原理是调用 FastAPI 端点,并且 LCEL 链的答案得到流式传输。不过,我希望实现我的答案被流式传输,如果流式传输完成,我想返回源文档。这是代码,在调用端点时流式传输正在工作。目前我正在生成 source_documents 但我不希望用户看到它们。我想在用户看到源文档之前对其进行预处理:

# example endpoint call: `http://127.0.0.1:8000/rag_model_response?question=Welche%203%20wesentlichen%20Merkmale%20hat%20die%20BCMS%20Leitlinie%3F`
# this example call streams the response perfectly in the browser


from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large-instruct", model_kwargs={'device': "mps"})
db = FAISS.load_local("streamlit_vectorstores/vectorstores/db_maxiw_testfreitag", embeddings, allow_dangerous_deserialization=True)
retriever = db.as_retriever(search_kwargs={'k': cfg.STREAMLIT_VECTOR_COUNT, 'score_threshold': cfg.SCORE_THRESHOLD,'sorted': True}, search_type="similarity_score_threshold")
model_path = cfg.MIXTRAL_PATH
llm = build_llm(model_path) # loads a model from Llamacpp with streaming enabled

def rag_model_response(question: str):
    start_time = time.time()
    context = retriever.get_relevant_documents(question)
    response_dict = {"question": question, "result": "", "source_documents": []}
    rag_prompt = f"""<s> [INST] Du bist RagBot, ein hilfsbereiter Assistent. Antworte nur auf Deutsch: 
{context} 


{question} 


Antwort: [/INST]
"""
    result_content = ""
    first_response = True
    for resp in llm.stream(rag_prompt):
        if resp:
            result_content += resp
            if first_response:
            # Calculate and print time after the first batch of text is streamed
                end_time = time.time()
                elapsed_time = round(end_time - start_time, 1)
                first_response = False 
                yield f"(Response Time: {elapsed_time} seconds)\n"
        yield resp 
    if context:
       # yield context # hier aufgehört
        yield "\n\nQuellen:\n"
        for i, doc in enumerate(context):
            yield doc.metadata["source"].split("/")[-1] + ", Seite: " + str(doc.metadata["page"]+1) + "\n\n"
        response_dict["source_documents"] = [{"source": doc.metadata["source"], "page": doc.metadata["page"]+1} for doc in context]
    else:
        yield "\n\nVorsicht, für die vorliegende Antwort wurden keine interne Quellen verwendet, da die Suche nach relevanten Dokumenten kein Ergebnis geliefert hat." 
    yield response_dict

app = FastAPI(    
    title="FastAPI for Database Management",
    description="An API that handles user Vectordatabase creation or deletion",
    version="1.0",)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get('/rag_model_response',response_class=JSONResponse)
async def main(question: str):
    return StreamingResponse(rag_model_response(question), media_type='text/event-stream')

所以我的第一个问题是:

  • 我需要如何更改我的更改,以便它也返回从检索器检索到的文档?
  • 如何在我的 FastAPI 端点响应中返回这些 source_documents?如果生成的答案能够被流式传输并且源文档能够被返回,那就完美了。也可以进行流式传输,但以某种方式应该可以仅向用户显示生成答案的流式传输。如果流传输完成,我想向用户展示用于生成答案的文档。

另一种解决方案,我认为不是很有效,是我只是创建一个返回源文档的新端点:

@app.get('/source_documents')
async def source_documents(question: str):
    source_docs = retriever.get_relevant_documents(question)
    return source_docs

但是这样一来,每个问题都会被搜索两次,一次是针对链,一次是针对检索器。

提前致谢!

python fastapi langchain llama-cpp-python
1个回答
0
投票

你看过这个吗:https://python.langchain.com/docs/use_cases/question_answering/streaming/

首先,构建链,以便它也返回检索到的文档 - 因此,如果您调用

invoke
,上下文也会包含在结果中。

接下来,当您进行流式传输时,如果会流式传输相同的值。您可以在输出示例中看到结果包含上下文。

在客户端,您必须知道如何正确处理收到的数据(假设您想对每个部分做不同的事情) - 在输出的正下方有一个 python 示例代码,该代码将聚合分解为您可以使用的单独部分然后使用。

我希望我理解了这个问题并且这个答案有帮助......

© www.soinside.com 2019 - 2024. All rights reserved.