I created a simple FastAPI application that updates/uploads documents to a ChromaDB vector store on macOS so that I can run simple query searches. Here is the code:
import asyncio

from fastapi import BackgroundTasks, FastAPI
from langchain.document_loaders import DirectoryLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.chroma import Chroma
from langchain_core.documents.base import Document  # needed by load_docs

app = FastAPI()
directory = "pets"
# The embedding model is created at import time, i.e. when the module is loaded.
embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")


def load_docs(directory):
    # Returns a single hard-coded document to keep the example minimal.
    return [Document(page_content="Hi, My name is Tom. My job is to collect tickets.", metadata={"source": "tom"})]


def split_docs(documents, chunk_size=1000, chunk_overlap=20):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    docs = text_splitter.split_documents(documents)
    return docs


@app.post("/update")
def update():
    print("loading docs")
    documents = load_docs(directory)
    print("splitting docs")
    docs = split_docs(documents)
    print("Index updating..")
    db = Chroma.from_documents(docs, embedding, persist_directory="chromadb")
    db.persist()
    print('Done.')
    return {"status": "done"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8001)
If I run the script above directly, the index update works perfectly when the /update endpoint is called.
(venv) $ python test.py
INFO: Started server process [32951]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8001 (Press CTRL+C to quit)
loading docs
splitting docs
Index updating..
Done.
INFO: 127.0.0.1:53373 - "POST /update HTTP/1.1" 200 OK
But if I run the same code with gunicorn and multiple UvicornWorker workers, the worker crashes with the following error:
The process has forked and you cannot use this CoreFoundation functionality safely. You MUST exec().
(venv) $ gunicorn test:app -w 4 -k uvicorn.workers.UvicornWorker --preload
[2023-12-27 11:44:06 +0530] [33014] [INFO] Starting gunicorn 21.2.0
[2023-12-27 11:44:06 +0530] [33014] [INFO] Listening at: http://127.0.0.1:8000 (33014)
[2023-12-27 11:44:06 +0530] [33014] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2023-12-27 11:44:06 +0530] [33022] [INFO] Booting worker with pid: 33022
[2023-12-27 11:44:06 +0530] [33023] [INFO] Booting worker with pid: 33023
[2023-12-27 11:44:06 +0530] [33022] [INFO] Started server process [33022]
[2023-12-27 11:44:06 +0530] [33022] [INFO] Waiting for application startup.
[2023-12-27 11:44:06 +0530] [33022] [INFO] Application startup complete.
[2023-12-27 11:44:06 +0530] [33023] [INFO] Started server process [33023]
[2023-12-27 11:44:06 +0530] [33023] [INFO] Waiting for application startup.
[2023-12-27 11:44:06 +0530] [33023] [INFO] Application startup complete.
[2023-12-27 11:44:06 +0530] [33024] [INFO] Booting worker with pid: 33024
[2023-12-27 11:44:06 +0530] [33024] [INFO] Started server process [33024]
[2023-12-27 11:44:06 +0530] [33024] [INFO] Waiting for application startup.
[2023-12-27 11:44:06 +0530] [33024] [INFO] Application startup complete.
[2023-12-27 11:44:06 +0530] [33025] [INFO] Booting worker with pid: 33025
[2023-12-27 11:44:06 +0530] [33025] [INFO] Started server process [33025]
[2023-12-27 11:44:06 +0530] [33025] [INFO] Waiting for application startup.
[2023-12-27 11:44:06 +0530] [33025] [INFO] Application startup complete.
loading docs
splitting docs
Index updating..
The process has forked and you cannot use this CoreFoundation functionality safely. You MUST exec().
Break on __THE_PROCESS_HAS_FORKED_AND_YOU_CANNOT_USE_THIS_COREFOUNDATION_FUNCTIONALITY___YOU_MUST_EXEC__() to debug.
[2023-12-27 11:44:32 +0530] [33014] [ERROR] Worker (pid:33025) was sent SIGSEGV!
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
[2023-12-27 11:44:32 +0530] [33047] [INFO] Booting worker with pid: 33047
Specs:
OS: macOS Ventura
Python Version: 3.10.1
gunicorn: 21.2.0
PS: I want to keep the --preload option.
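For reference, the command line shown earlier can also be written as a gunicorn configuration file. This is only a sketch: the file name gunicorn.conf.py is an assumption, and the bind address is copied from the "Listening at" line in the log above.

```python
# gunicorn.conf.py (assumed file name), equivalent to:
#   gunicorn test:app -w 4 -k uvicorn.workers.UvicornWorker --preload

bind = "127.0.0.1:8000"                         # address seen in the log output
workers = 4                                     # -w 4
worker_class = "uvicorn.workers.UvicornWorker"  # -k uvicorn.workers.UvicornWorker
preload_app = True                              # --preload: load the app before forking workers
```

It would be started with `gunicorn -c gunicorn.conf.py test:app`.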
Any help would be appreciated. I will also offer a bounty for a working solution!
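It looks like the langchain package causes problems when combined with fork-based multiprocessing. With --preload, gunicorn imports the application in the master process before forking the workers, so the module-level langchain imports and the HuggingFaceEmbeddings object are created before the fork. I moved that part into a separate module that is imported only inside the request handler, so it is loaded in the worker after the fork.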
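The difference between import-time work and handler-time work can be illustrated without langchain or gunicorn. The following is a minimal sketch for POSIX systems only; IMPORT_PID, handler_pid and run_in_forked_child are hypothetical names, and os.fork stands in for what gunicorn does when it starts a worker after preloading the app.

```python
import os
from typing import Tuple

# Stands in for module-level work (imports, model loading) done before the fork.
IMPORT_PID = os.getpid()


def handler_pid() -> int:
    """Stands in for work done inside a request handler, i.e. after the fork."""
    return os.getpid()


def run_in_forked_child() -> Tuple[int, int]:
    """Fork once and return (IMPORT_PID as seen by the child, pid inside the child)."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: report both values through the pipe, then exit immediately.
        os.close(read_fd)
        try:
            os.write(write_fd, f"{IMPORT_PID},{handler_pid()}".encode())
        finally:
            os.close(write_fd)
            os._exit(0)
    # Parent process: read the child's report and reap the child.
    os.close(write_fd)
    with os.fdopen(read_fd) as reader:
        data = reader.read()
    os.waitpid(pid, 0)
    seen_import_pid, seen_handler_pid = (int(part) for part in data.split(","))
    return seen_import_pid, seen_handler_pid
```

The child still sees the parent's pid in IMPORT_PID, because that value was computed before the fork, while handler_pid() reports the child's own pid. Anything initialized at import time under --preload is inherited from the master in the same way.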
test.py
from fastapi import FastAPI

app = FastAPI()


@app.post("/update")
def update():
    # Imported here rather than at module level, so langchain is first
    # loaded inside the worker process when the endpoint is called.
    from update_index import update_index

    return update_index()


# gunicorn test:app -w 4 -k uvicorn.workers.UvicornWorker --preload
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8001)
update_index.py
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.chroma import Chroma
from langchain_core.documents.base import Document

# Created when this module is first imported, which now happens in the worker.
embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
directory = "pets"


def load_docs(directory):
    return [Document(page_content="Hi, My name is Tom. My job is to collect tickets.", metadata={"source": "tom"})]


def split_docs(documents, chunk_size=1000, chunk_overlap=20):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    docs = text_splitter.split_documents(documents)
    return docs


def update_index():
    documents = load_docs(directory)
    print("splitting docs")
    docs = split_docs(documents)
    print("Index updating..")
    db = Chroma.from_documents(docs, embedding, persist_directory="chromadb")
    db.persist()
    print('Done.')
    return {"status": "done"}
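In the code above, the embedding is loaded once per worker only as a side effect of the first import of update_index. A possible variation makes that explicit with a cached factory function. This is a sketch, not something I have tested with langchain: _load_embedding is a hypothetical stand-in that returns a plain dict so the example runs without langchain installed. In the real module its body would import and call HuggingFaceEmbeddings.

```python
import os
from functools import lru_cache


def _load_embedding():
    """Hypothetical stand-in for the expensive HuggingFaceEmbeddings(...) call."""
    return {
        "model_name": "sentence-transformers/all-mpnet-base-v2",
        "loaded_in_pid": os.getpid(),  # records which process did the loading
    }


@lru_cache(maxsize=1)
def get_embedding():
    """Build the embedding object on first use and reuse it on later calls."""
    return _load_embedding()
```

update_index() would then call get_embedding() instead of reading a module-level variable, so nothing heavy is created until the first request reaches a worker.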