对于 LLM(大语言模型),流式传输已成为一种越来越受欢迎的功能。其理念是在 LLM 生成令牌时就快速返回,而不是等待创建完整响应后再返回全部内容。

对于简单的场景,流式传输实际上非常容易实现,但当涉及到智能体之类的应用时,它会变得复杂,这些智能体有自己的运行逻辑,可能会阻止我们尝试进行流式传输。

本文将探讨 langchain 的流式输出,后面将实现智能体最终结果的流式输出

本次演练将全部使用 qwen2.5:7b 开源大模型。

准备开发环境

  1. 计算机
    本文涉及的所有代码可以在没有显存的环境中执行。 我使用的机器配置为:

    • CPU: Intel i5-8400 2.80GHz
    • 内存: 16GB
  2. Visual Studio Code 和 venv 这是很受欢迎的开发工具,相关文章的代码可以在 Visual Studio Code 中开发和调试。 我们用 pythonvenv 创建虚拟环境, 详见:
    在Visual Studio Code中配置venv

  3. Ollama 在 Ollama 平台上部署本地大模型非常方便,基于此平台,我们可以让 langchain 使用 llama3.1qwen2.5deepseek 等各种本地大模型。详见:
    在langchian中使用本地部署的llama3.1大模型

大语言模型(LLM) 的流式输出

当我们用提示词调用 LLM(大语言模型) 时,LLM 通常原本就不是一次性返回结果的。只是有时候为了方便使用和其它原因,通常 langchain 给出了 invoke 这样特别好用的API。

一次返回全部内容

我们先用 invoke 实现与 LLM 聊天,为后面的内容做一点铺垫:

def chat(llm_model_name,question):
    """与大模型聊天,一次性输出"""
    model = ChatOllama(model=llm_model_name,temperature=0.3,verbose=True)
    response = model.invoke([HumanMessage(content=question)])
    print(f'AI:\n{response.content}')

现在我们可以问一个问题:

chat("qwen2.5","中国有多少个地级市?")

结果会一次性返回:

截至2021年底,中国的地级行政区划共有337个地级市。这个数字包括了直辖市、副省级市、计划单列市以及一般地级市。需要注意的是,随着城市的发展和行政区划的调整,具体数量可能会有所变化。最新的准确数据建议查询官方发布的最新统计资料以获得最准确的信息。

stream 方法实现流式输出

stream 方法是 langchain 提供的最简单的实现流式输出的方式。我们来试一下,先定义一个测试方法:

def chat_stream(llm_model_name,question):
    """与大模型聊天,流式输出"""
    model = ChatOllama(model=llm_model_name,temperature=0.3,verbose=True)
    for chunk in model.stream([HumanMessage(content=question)]):
        if isinstance(chunk, AIMessage) and chunk.content !='':
            print(chunk.content,end="^")

在上面的代码中,我们用 ^ 做分隔符分割每次 LLM 返回的内容。用同样的问题调用此函数后,返回的内容如下:

截至^2^0^2^1^年底^^中国的^^^^数量^^3^3^7^^^这个^数字^包括^^3^3^5^^^^^^其中^包含^2^7^6^^^^^^4^9^^地区^^8^^自治^^^以及^2^^^省级^^^深圳^^大连^)。^需要注意^的是^^这一^数据^可能会^随着^行政区^^^调整^^有所^变化^^

用回调函数实现流式输出

除了上述方式外,在 langchain 中还可以使用回调函数实现流式输出。

要想让智能体可以流式输出最终结果,驱动它的 LLM 首先得通过回调函数支持流式输出。

我们可以直接用 StreamingStdOutCallbackHandler 作为回调函数,实现流式传输,参见下面的代码:

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

def chat_stream_2(llm_model_name,question):
    """与大模型聊天,流式输出"""
    model = ChatOllama(model=llm_model_name,temperature=0.3,verbose=True,callbacks=[StreamingStdOutCallbackHandler()])
    model.invoke([HumanMessage(content=question)])

我们再用同样的问题试一下,可以看到输出结果:

截至2023年,中国的地级市数量大约为337个。这个数字会随着行政区划的调整而有所变化。请注意,这里包括了自治州、地区行署等在内的地级行政区划单位。如果您需要最新的准确数据,建议查询官方发布的最新统计信息。

上面的内容看起来和一次性返回的结果一样,但是您要是自己实验会发现:这些内容是一批一批显示出来的,与我们通过网页使用那些大模型的效果类似。

现在出现了一个问题:是否可以像调用 stream 方法一样,给流式输出的内容增加一个分隔符

当然是可以的,我们可以通过继承 StreamingStdOutCallbackHandler 类,把它定制一下即可。

class CustomStreamingHandler(StreamingStdOutCallbackHandler):
    """自定义流式回调处理器,在流式输出时使用 ^ 作为分隔符"""

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """重写方法,修改输出格式"""
        print(token, end="^", flush=True)  # 使用 `^` 作为分隔符

def chat_stream_3(llm_model_name,question):
    """与大模型聊天,流式输出"""
    model = ChatOllama(model=llm_model_name,temperature=0.3,verbose=True,callbacks=[CustomStreamingHandler()])
    model([HumanMessage(content=question)])

可以实际测试一下上述函数,会发现它会以流的方式输出了 ^ 分割的内容:

截至^2^0^2^1^年底^,^中国的^地^级^市^数量^为^3^3^7^个^。^这个^数字^包括^了^2^7^6^个^地^级^市^、^4^5^个^地区^、^3^8^个^自治^州^和^8^个^盟^。^需要注意^的是^,^这一^数据^可能会^随着^行政区^划^的^调整^而^有所^变化^。^^

智能体(Agent) 的流式输出

智能体(Agent) 的流式输出就比较复杂些了。

创建智能体

我们先创建一个智能体,这个智能体可以调度 llm-math 工具做数学计算,也可以进行其它对话。

llm_model_name = "qwen2.5"
model = ChatOllama(model=llm_model_name,temperature=0.3,verbose=True,callbacks=[CustomStreamingHandler()])

from langchain_community.agent_toolkits.load_tools import load_tools

# 创建一个工具来观察它如何影响流的输出
tools = load_tools(["llm-math"], llm=model)

from langchain.agents import AgentType, initialize_agent

# 创建智能体
agent = initialize_agent(
    agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
    tools=tools,
    llm=model,
    memory=memory,
    verbose=True,
    max_iterations=3,
    early_stopping_method="generate",
    return_intermediate_steps=False
)

利用 LLM 的回调函数

在创建这个智能体的时候,我们给 LLM 设置了回调函数 CustomStreamingHandler() ,因为它以 ^ 做分隔符输出内容,所以我们可以很直观的看到智能体是如何处理输出流的。

下面定义测试方法:

def chat_agent(quesion):
    """与智能体聊天,它会把所有内容都流式输出"""
    agent.invoke(quesion)

我们用同样的问题测试,会发现它会输出以下内容:

```^json^
{
   ^ "^action^":^ "^Final^ Answer^",
   ^ "^action^_input^":^ "^根据^最新的^统计数据^,^中国的^地^级^市^数量^大约^为^3^0^0^个^左右^。^但^请注意^,^这个^数字^可能会^随着^行政区^划^的^调整^而^有所^变化^。^"
}
```^^

显然,这个智能体的输出是一个 json ,但是整个输出都被流式输出了:) 如果我们问一个数学问题,例如:“9的平方是多少?",智能体会把整个思考的过程都以流的方式输出:

```^json^
{
   ^ "^action^":^ "^Calculator^",
   ^ "^action^_input^":^ "^9^**^2^"
}
```^^
```text^
^9^**^2^
^```
^...^num^expr^.evaluate^("^9^**^2^")^...
^^
Observation: Answer: 81
Thought:```^json^
{
   ^ "^action^":^ "^Final^ Answer^",
   ^ "^action^_input^":^ "^9^的^平方^是^8^1^。^"
}
```^^

使用 FinalStreamingStdOutCallbackHandler

现在看只使用 LLM 的回调函数无法流式输出智能体最终结果,那我们就用 AgentExecutorFinalStreamingStdOutCallbackHandler 做回调函数试试,看字面意思是“把智能体结果流式输出的意思”。

def chat_agent_2(quesion):
    """与智能体聊天,它会把所有内容都流式输出"""
    from langchain.callbacks.streaming_stdout_final_only import (
        FinalStreamingStdOutCallbackHandler,
    )

    agent.agent.llm_chain.llm.callbacks = [
        FinalStreamingStdOutCallbackHandler(
            answer_prefix_tokens=["Final", "Answer"]   # 流式输出 Final 和 Answer 后面的内容 
        )
    ]

    agent.invoke(quesion)

这次我们依然使用问题:“9的平方是多少?“来测试:

Observation: Answer: 81
Thought:",
    "action_input": "9的平方是81。"
}

这次输出的内容少了一些,而且 "action_input": "9的平方是81。" 是以流的方式输出的。

流式输出智能体的最终结果

我们的最终结果是 只把最终目标:“9的平方是81。“以流的方式返回。现在看 langchain 提供的 FinalStreamingStdOutCallbackHandler 不完美,那我们只能自定义智能体的回调来解决了。

下面继承 StreamingStdOutCallbackHandler 定义回调类:

class CallbackHandler(StreamingStdOutCallbackHandler):
    """自定义输出"""
    def __init__(self):
        self.content: str = ""
        self.final_answer: bool = False

    def on_llm_new_token(self, token: str, **kwargs: any) -> None:
        """智能体会逐渐返回json格式的结果,这里只输出 action_input 的内容"""
        self.content += token
        if "Final Answer" in self.content:
            # 现在我们到了 Final Answer 部分,但不要打印。
            self.final_answer = True
            self.content = ""
        if self.final_answer:
            if '"action_input": "' in self.content:

                # 当字符串中包含 '}' 时,移除 '}' 和后面的字符。
                index = token.find('}')  # 找到 '}' 的索引
                if index != -1:
                    self.final_answer = False
                    token = token[:index]

                sys.stdout.write(token) 
                if index == -1:
                    sys.stdout.write('^')
                sys.stdout.flush()

在这个类中,流式输出的内容会用 ^ 做间隔,测试起来比较直观。

定义测试方法:

def chat_agent_3(quesion):
    """与智能体聊天,它会把所有内容都流式输出"""
    agent.agent.llm_chain.llm.callbacks =[CallbackHandler()]
    agent(quesion)

依然用同样的问题做测试,结果为:

Observation: Answer: 81
Thought: "^9^的^平方^是^8^1^。^"

显然,这个结果最理想,只把 “9的平方是81。“做流式输出了,nice:)

总结

通过本文的演练,我们发现在 langchain 中,让 LLM(大语言模型) 做流式输出是比较容易的,定制流式输出的细节也不复杂。
而让智能体(Agent)做流式输出比较复杂:需要驱动智能体的大模型首先支持流式输出,如果让智能体只流式输出最终的结果,则需要继承 StreamingStdOutCallbackHandler 自定义回调类来实现。
如果要控制智能体的细节,可能用 langgraph 实现更好。


代码

本文涉及的所有代码以及相关资源都已经共享,参见:

为便于找到代码,程序文件名称最前面的编号与本系列文章的文档编号相同。

参考

🪐感谢您观看,祝好运🪐