我已经使用 Langchain 构建了一个 RAG 应用程序,现在想使用 FastAPI 来部署它。一般来说,它的工作原理是调用 FastAPI 端点,并且 LCEL 链的答案得到流式传输。不过,我希望实现我的答案被流式传输,如果流式传输完成,我想返回源文档。这是代码,在调用端点时流式传输正在工作。目前我正在生成 source_documents 但我不希望用户看到它们。我想在用户看到源文档之前对其进行预处理:
# example endpoint call: `http://127.0.0.1:8000/rag_model_response?question=Welche%203%20wesentlichen%20Merkmale%20hat%20die%20BCMS%20Leitlinie%3F`
# this example call streams the response perfectly in the browser
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large-instruct", model_kwargs={'device': "mps"})
db = FAISS.load_local("streamlit_vectorstores/vectorstores/db_maxiw_testfreitag", embeddings, allow_dangerous_deserialization=True)
retriever = db.as_retriever(search_kwargs={'k': cfg.STREAMLIT_VECTOR_COUNT, 'score_threshold': cfg.SCORE_THRESHOLD,'sorted': True}, search_type="similarity_score_threshold")
model_path = cfg.MIXTRAL_PATH
llm = build_llm(model_path) # loads a model from Llamacpp with streaming enabled
def rag_model_response(question: str):
start_time = time.time()
context = retriever.get_relevant_documents(question)
response_dict = {"question": question, "result": "", "source_documents": []}
rag_prompt = f"""<s> [INST] Du bist RagBot, ein hilfsbereiter Assistent. Antworte nur auf Deutsch:
{context}
{question}
Antwort: [/INST]
"""
result_content = ""
first_response = True
for resp in llm.stream(rag_prompt):
if resp:
result_content += resp
if first_response:
# Calculate and print time after the first batch of text is streamed
end_time = time.time()
elapsed_time = round(end_time - start_time, 1)
first_response = False
yield f"(Response Time: {elapsed_time} seconds)\n"
yield resp
if context:
# yield context # hier aufgehört
yield "\n\nQuellen:\n"
for i, doc in enumerate(context):
yield doc.metadata["source"].split("/")[-1] + ", Seite: " + str(doc.metadata["page"]+1) + "\n\n"
response_dict["source_documents"] = [{"source": doc.metadata["source"], "page": doc.metadata["page"]+1} for doc in context]
else:
yield "\n\nVorsicht, für die vorliegende Antwort wurden keine interne Quellen verwendet, da die Suche nach relevanten Dokumenten kein Ergebnis geliefert hat."
yield response_dict
app = FastAPI(
title="FastAPI for Database Management",
description="An API that handles user Vectordatabase creation or deletion",
version="1.0",)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get('/rag_model_response',response_class=JSONResponse)
async def main(question: str):
return StreamingResponse(rag_model_response(question), media_type='text/event-stream')
所以我的第一个问题是:
另一种解决方案,我认为不是很有效,是我只是创建一个返回源文档的新端点:
@app.get('/source_documents')
async def source_documents(question: str):
source_docs = retriever.get_relevant_documents(question)
return source_docs
但是这样一来,每个问题都会被搜索两次,一次是针对链,一次是针对检索器。
提前致谢!
你看过这个吗:https://python.langchain.com/docs/use_cases/question_answering/streaming/
首先,构建链,以便它也返回检索到的文档 - 因此,如果您调用
invoke
,上下文也会包含在结果中。
接下来,当您进行流式传输时,如果会流式传输相同的值。您可以在输出示例中看到结果包含上下文。
在客户端,您必须知道如何正确处理收到的数据(假设您想对每个部分做不同的事情) - 在输出的正下方有一个 python 示例代码,该代码将聚合分解为您可以使用的单独部分然后使用。
我希望我理解了这个问题并且这个答案有帮助......