使用 langchain 和 websockets 完成流式聊天

问题描述 投票:0回答:2

我不确定我做错了什么,我正在使用长链完成并想将它们发布到我的 WebSocket 房间。使用 BaseCallbackHandler,我可以将令牌打印到控制台,但是,使用 AsyncCallbackHandler 是一个挑战,基本上,似乎什么也没有发生,我尝试打印东西,但是在 init 上打印消息之后,似乎什么也没有发生。

async def send_message_to_room(room_group_name, message):
    print("sending message to room", room_group_name, message)
    channel_layer = get_channel_layer()
    await channel_layer.group_send(
        room_group_name,
        {
            "type": "chat_message",
            "message": message,
        }
    )

class MyCustomHandler(AsyncCallbackHandler):

    def __init__(self, room_group_name):
        self.channel_layer = get_channel_layer()
        self.room_group_name = room_group_name

        print("MyCustomHandler init")

    async def on_llm_new_token(self, token: str, **kwargs):
        print(token)
        await send_message_to_room(self.room_group_name, token)

def generate_cited_answer_stream(roomname, question=question, texts=texts, responsetype="Simple and Pedagogical"
                                       , system_message_with_response_type=system_message_with_response_type
                                       , human_message_with_response_type=human_message_with_response_type):


    handler = MyCustomHandler(room_group_name=roomname)

    chat = ChatOpenAI(temperature=0, streaming=True, callbacks=[handler])

    system_message_with_response_type = SystemMessagePromptTemplate.from_template(system_message_with_response_type)
    human_message_prompt = HumanMessagePromptTemplate.from_template(human_message_with_response_type)

    chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

    prompt_value = chat_prompt.format_prompt(question=question, texts=texts, responsetype=responsetype)

    chat(prompt_value.to_messages())
python django websocket django-channels langchain
2个回答
2
投票

截取一段有效的代码

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler

class TestLangchainAsync(unittest.IsolatedAsyncioTestCase):
    async def test_aiter(self):
        handler = AsyncIteratorCallbackHandler()
        llm = OpenAI(
            temperature=0,
            streaming=True,
            callbacks=[handler],
            openai_api_key="sk-xxxxx",
            openai_proxy="http://127.0.0.1:7890",
        )
        prompt = PromptTemplate(
            input_variables=["product"],
            template="What is a good name for a company that makes {product}?",
        )
        prompt = prompt.format(product="colorful socks")
        asyncio.create_task(llm.agenerate([prompt]))
        async for i in handler.aiter():
            print(i)

参考

https://github.com/hwchase17/langchain/issues/2428#:~:text=AsyncIteratorCallbackHandler%20works.%20这里%20is%20the%20example%20code%3A


0
投票

我也在使用langchain和websockets,这是我的代码,但它似乎不起作用。

class CustomSyncHandler(StreamingStdOutCallbackHandler):

def __init__(self, room) -> None:
    self.room_name = room
    self.response = ""

def on_llm_new_token(self, token, **kwargs) -> None:
    self.response += token
    asyncio.sleep(0.01)
    # async_to_sync(channel_layer.group_send)(self.room_name, {"type": "chat_message", "message": self.response, "stream": True})
    # await channel_layer.group_send(self.room_name, {"type": "chat_message", "message": self.response, "stream": True})
    print("response after sync token: ", self.response)



 class OpenAIDocConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        QA_CHAIN_PROMPT = PromptTemplate.from_template(template)
        handler = CustomSyncHandler(self.room_group_name)
        # handler = AsyncIteratorCallbackHandler()
        llm = OpenAI(streaming=True, callbacks=[handler], temperature=0)
        qa_chain = VectorDBQA.from_chain_type(
            llm=llm,
            return_source_documents=True,
            chain_type_kwargs={"prompt": QA_CHAIN_PROMPT},
            vectorstore=vectordb
        )
        result = qa_chain({"query": question})
        # async for i in handler.aiter():
        #     print(i)
        print("results ------------>", result["result"])

我尝试将 on_llm_new_token 方法更改为异步,但它停止响应,并且在简单的同步功能中,我无法在 websocket 中发送消息,这是我尝试在 websocket 中同步发布数据时遇到的错误具有 async_to_sync 功能。

CustomSyncHandler.on_llm_new_token回调中的错误:您不能在与异步事件循环相同的线程中使用AsyncToSync - 只需直接等待异步函数。

© www.soinside.com 2019 - 2024. All rights reserved.