本文将基于上一篇文章完善 Agent(智能体) ,主要完善的功能有:

  • 跟踪 Agent(智能体) 的执行过程
  • 记录消息历史

此次我们使用 qwen2.5LLM(大语言模型) 驱动 Agent(智能体) ,使用 shaw/dmeta-embedding-zh 做中文嵌入和检索。

准备

在正式开始撸代码之前,需要准备一下编程环境。

  1. 计算机
    本文涉及的所有代码可以在没有显存的环境中执行。 我使用的机器配置为:

    • CPU: Intel i5-8400 2.80GHz
    • 内存: 16GB
  2. Visual Studio Code 和 venv 这是很受欢迎的开发工具,相关文章的代码可以在 Visual Studio Code 中开发和调试。 我们用 pythonvenv 创建虚拟环境, 详见:
    在Visual Studio Code中配置venv

  3. Ollama 在 Ollama 平台上部署本地大模型非常方便,基于此平台,我们可以让 langchain 使用 llama3.1qwen2.5deepseek 等各种本地大模型。详见:
    在langchian中使用本地部署的llama3.1大模型

创建智能体

本智能体根据用户的问题决策调用哪个工具回答还是不使用工具直接回答。它要调度两个工具:

  • 查询天气预报
  • 检索本地知识库,其中包含了关于 elasticsearch 的知识
"""
1. 查天气的工具
"""
from langchain_core.tools import tool

@tool(parse_docstring=True)
def get_wheather_info(
    city_name: str = ''  #不设置默认值可能导致LLM强行解析city_name出错或者强行调用这个tool
) -> str:
    """获取某个城市的天气信息。如果没有可靠的依据来确定 city_name,则不要调用 get_wheather_info!

    Args:
        city_name: 城市名称。        
    """
    print(f'Getting weather information for:{city_name}')
    if not city_name:
        return "缺少 city_name 参数,无法检索天气信息。"        
        """
        **这个返回很重要**
        返回错误后,agent会放弃这个结果,用自己的能力回答问题,这样结果更加合理;
        否则,agent会使用空的city_name调用这个tool,并且会拼凑出new york的天气或者别的天气方面的信息。
        """
    else:
        return f"{city_name}的气温是25摄氏度。"

"""
2. 创建一个检索器
"""

def create_retriever(embed_model_name):
    """创建检索器"""

    persist_directory = get_persist_directory(embed_model_name)
    db = LocalVectorDBChroma(embed_model_name,persist_directory)

    # 基于Chroma 的 vector store 生成 检索器
    vector_store = db.get_vector_store()
    retriever = vector_store.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 2},
    )
    return retriever

"""
3. 创建工具集 tools
"""

from langchain.tools.retriever import create_retriever_tool

def create_tools(embed_model_name):
    """创建工具集"""

    retriever_tool = create_retriever_tool(
        create_retriever(embed_model_name),
        "elastic_search",
        "只有当您搜索有关 elasticsearch 的知识时才能使用此工具!",
    )

    tools = [get_wheather_info, retriever_tool]
    return tools

"""
4. 创建智能体
"""

from langchain_core.prompts import ChatPromptTemplate
from langchain.agents import create_tool_calling_agent
from langchain_ollama import ChatOllama

def create_agent(llm_model_name,embed_model_name):
    """创建智能体"""

    from langchain_core.tools import render_text_description

    tools = create_tools(embed_model_name)
    #rendered_tools = render_text_description(tools)
    #print(rendered_tools)
    
    # 此prompt是基于hwchase17/openai-functions-agent修改的
    systemprompt = """\
    您是一名助理,有权使用以下工具集。
    下面是每个工具的名称和说明:

    [get_wheather_info, elastic_search]

    - **仅在需要时使用上述工具集!**
    - 如果没有可靠的依据来确定 city_name,则不要调用 get_wheather_info!
    """ 
    prompt = ChatPromptTemplate([
        ("system", systemprompt),
        ("placeholder", "{chat_history}"),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ])

    llm = ChatOllama(model=llm_model_name,temperature=0,verbose=True)
    agent = create_tool_calling_agent(llm, tools, prompt)
    return agent

def create_agent_executor(llm_model_name,embed_model_name):
    """创建agent_executor"""

    tools = create_tools(embed_model_name)
    agent = create_agent(llm_model_name,embed_model_name)    

    agent_executor = AgentExecutor(agent=agent, tools=tools)
    """实际上在create_agent中已经创建了tools,这里还要再传入tools,似乎有点多余。"""

    return agent_executor

如果您想详细了解并实践上述代码,可以参考:本地大模型编程实战(15)初探智能体Agent(2)

流式输出

LLM(大语言模型) 中可以使用 stream 方法,它返回的是一个迭代器,可以逐步获取生成的文本块,这样用户可以看到AI的文本会一批一批的吐出来,体验比较好。
Agent(智能体) 中也可以使用 stream 方法,它返回的是一个迭代器,可以把智能体执行的步骤逐渐的打印出来,非常便于观察智能体的执行情况。 我们先定义智能体的流式输出方法:

def test_agent_executor_stream(llm_model_name,embed_model_name,queries):
    """测试AgentExecutor"""

    print(f'--------{llm_model_name}----------')

    agent_executor = create_agent_executor(llm_model_name,embed_model_name)

    for query in queries:
        for chunk in agent_executor.stream({"input": query}):
            print(chunk)
            print("----")

下面我们用3个问题,看看智能体的表现:

  • 问题1:“你好,你擅长能做什么?”
{'output': '我可以帮助您查询天气信息、或者在特定情况下搜索相关信息。请告诉我您需要什么具体帮助呢?例如,您可以询问某个城市的天气情况,或者有关 Elasticsearch 的问题。', 'messages': [AIMessage(content='...', additional_kwargs={}, response_metadata={})]}
----

智能体回答它的能力的时候,把它可以调度的工具的功能重点说出来了,但是并未调用任何工具,直接回答了问题。

  • 问题2:“上海的天气怎么样?”
{'actions': [ToolAgentAction(tool='get_wheather_info', tool_input={'city_name': '上海'}, log=... 'type': 'tool_call_chunk')]}
----
Getting weather information for:上海
{'steps': [AgentStep(action=ToolAgentAction(tool='get_wheather_info', tool_input={'city_name': '上海'}, log=... observation='上海的气温是25摄氏度。')], 'messages': [FunctionMessage(content='上海的气温是25摄氏度。', ..., name='get_wheather_info')]}
----
{'output': '上海现在的温度是25摄氏度。', 'messages': [AIMessage(content='上海现在的温度是25摄氏度。', additional_kwargs={}, response_metadata={})]}
----

我们可以看到智能体首先生成了 tool_call_chunk ,然后调用了 get_wheather_info ,最后再根据 get_wheather_info 的返回信息进行了妥善的回复。

  • 问题3:“如何实现elasticsearch的深度分页?”
{'actions': [ToolAgentAction(tool='elastic_search', tool_input={'query': 'Elasticsearch Scroll API'}, log=..., tool_call_chunks=[{'name': 'elastic_search', 'args': '{"query": "Elasticsearch Scroll API"}', 'id': '92fe0dcc-d12f-490e-b47f-6e91bd75a47a', ...}])]}
----
{'steps': [AgentStep(action=ToolAgentAction(tool='elastic_search', tool_input={'query': 'Elasticsearch Scroll API'}, log=... 总结#\n经过实际测试,在对几万文档分页时,速度挺快的;对几十万量级文档进行分页时,还是比较慢;因为skip次数多也比较消耗时 间。\n如果文档量达到百万计,要从操作使用方面着手进行设计,避免深度分页时skip太多次。...)]}
----
{'output': '在 Elasticsearch 中,使用 `search_after` 实现深度分页是一种有效的方法。这种方法避免了因结果窗口过大而引发的问题,并且可以处理大量数据。\n\n### 准备工作\n\n首先需要确定一个唯一标识文档的键(类似于关系数据库中的主键),并以此进行排序。例如:...)]}
----

与上一个问题类似,智能体首先生成了 tool_call_chunk ,然后调用了 elastic_search ,最后再根据 elastic_search 的返回的文档内容进行了妥善的回复。

记录消息历史

我们知道,大模型实际上是没有真正的记忆的,在与 LLM(大语言模型) 交互的时候,通常是在提示词中将消息历史记录传递给大模型,大模型才能根据上下文推理出合理的结果。
例如:我们问大模型两个问题:

queries = ["您好,我叫刘大钧。","请问我叫什么名字?"]

在我们直接问大模型第2个问题时,大模型是答不出来的;如果我们问第2个问题时,将第1个问题的内容也告诉大模型,大模型就可以准确的作答。
langchain 提供了 ChatMessageHistory 类来处理消息历史,我们用它完善一下 agent_executor

def create_agent_executor_with_history(llm_model_name,embed_model_name):
    """创建记录聊天历史的智能体"""
    store = {}

    def get_session_history(session_id: str) -> BaseChatMessageHistory:
        if session_id not in store:
            store[session_id] = ChatMessageHistory()
        return store[session_id]
    
    agent_executor = create_agent_executor(llm_model_name,embed_model_name)
    agent_with_chat_history = RunnableWithMessageHistory(
        agent_executor,
        get_session_history,
        input_messages_key="input",
        history_messages_key="chat_history",
    )
    return agent_with_chat_history

我们再定义一个函数来测试大模型:

def test_agent_with_chat_history(llm_model_name,embed_model_name,session_id,queries):
    """测试记录会话功能"""

    print(f'--------{llm_model_name}----------')

    agent_with_chat_history = create_agent_executor_with_history(llm_model_name,embed_model_name)    

    conf = {"configurable": {"session_id": session_id}}
    for query in queries:
        r = agent_with_chat_history.invoke(
            {"input": query},
            config=conf,
        )
        print(f'agent_with_chat_history.invoked:\n{r}')
{'input': '您好,我叫刘大钧。', 'chat_history': [], 'output': '您好,刘大钧!有什么问题或帮助需要提供给您吗?'}
{'input': '请问我叫什么名字?', 'chat_history': [HumanMessage(content='您好,我叫刘大钧。', additional_kwargs={}, response_metadata={}), AIMessage(content='您好,刘大钧!有什么问题或帮助需要提供给您吗?', additional_kwargs={}, response_metadata={})], 'output': '您叫刘大钧。'}

可以看到在智能体回答第2个问题时,chat_history 中已经包含了第1个问题的消息,智能体根据这个信息顺利得的推理出提问者的名字。

控制消息历史长度

上面的做法虽然自动记录了消息历史,但是有一个问题:如果消息历史不断累积,容易不断消耗内存,也容易超出大语言模型的 token 限制。

控制消息历史长度

我们可以自定义一个类,用于控制消息历史长度,主要代码如下:

class MessageHistory(ChatMessageHistory):
    """
    扩展的聊天历史记录类。可以限制聊天记录的最大长度。

    Args:
        max_size: 设置为偶数。因为User和AI的消息会分别记录为1条,设置为偶数后,User和AI才会成对。
    """

    def __init__(self, max_size: int):        
        super().__init__()       
        self._max_size = max_size 

    def add_message(self, message: BaseMessage):
        super().add_message(message)  

        # 保持聊天记录在限制范围内
        if len(self.messages) > self._max_size:
            print('消息超限,马上压缩!')
            self.messages = self.messages[-self._max_size:]

class SessionHistory(object):
    """
    处理消息历史
    """
    def __init__(self,max_size: int):
        super().__init__()
        self._max_size = max_size
        self._store = {}

    def process(self,session_id: str) -> BaseChatMessageHistory:
        """
        处理聊天历史
        """
        if session_id not in self._store:
            self._store[session_id] = MessageHistory(max_size=self._max_size)
        return self._store[session_id]

    def print_history(self,session_id):
        """
        查看聊天历史记录
        """
        print("显示聊天历史记录...")
        for message in self._store[session_id].messages:
            if isinstance(message, AIMessage):
                prefix = "AI"
            else:
                prefix = "User"

            print(f"{prefix}: {message.content}\n")

现在我们可以使用上面的类重新定义智能体:

def create_agent_executor_with_history_2(llm_model_name,embed_model_name):
    """可以控制聊天历史长度的智能体"""
    
    # 处理聊天历史
    from common.LimitedChatMessageHistory import SessionHistory
    session_history = SessionHistory(max_size=10)

    def get_session_history(session_id: str) -> BaseChatMessageHistory:
        return session_history.process(session_id)
    
    agent_executor = create_agent_executor(llm_model_name,embed_model_name)
    agent_with_chat_history = RunnableWithMessageHistory(
        agent_executor,
        get_session_history,
        input_messages_key="input",
        history_messages_key="chat_history",
    )
    return agent_with_chat_history

智能截取消息历史

上面的办法很简单直接,不过不智能,我们可以使用 langchaintrim_messages 智能截取消息历史。
先定义截取方法:

def get_trimmer(model_name,max_tokens):
    """
    重要:请务必在在加载之前的消息之后,并且在提示词模板之前使用它。
    """
    model = ChatOllama(model=model_name,temperature=0.3,verbose=True)
    trimmer = trim_messages(
        max_tokens=max_tokens,  #设置裁剪后消息列表中允许的最大 token 数量
        strategy="last",        #指定裁剪策略为保留最后的消息,即从消息列表的开头开始裁剪,直到满足最大 token 数量限制。
        token_counter=model,    #通过model来计算消息中的 token 数量。
        include_system=True,    #在裁剪过程中包含系统消息(SystemMessage)
        allow_partial=False,    #不允许裁剪出部分消息,即要么保留完整的消息,要么不保留,不会出现只保留消息的一部分的情况。
        start_on="human",   #从人类消息(HumanMessage)开始进行裁剪,即裁剪时会从第一个HumanMessage开始计算 token 数量,之前的系统消息等也会被包含在内进行整体裁剪考量。
    )
    return trimmer

然后我们在 MessageHistory 中使用这个 trimmer 就好:

class MessageHistory(ChatMessageHistory):
    """
    扩展的聊天历史记录类。可以限制聊天记录的最大长度。

    Args:
        max_size: 设置为偶数。因为User和AI的消息会分别记录为1条,设置为偶数后,User和AI才会成对。
    """

    def __init__(self, model_name:str,max_size: int):        
        super().__init__()   
        self._model_name = model_name    
        self._max_size = max_size 

    def add_message(self, message: BaseMessage):
        super().add_message(message)  

        # 保持聊天记录在限制范围内
        t = get_trimmer(self._model_name,self._max_size)
        messages_trimed = t.invoke(self.messages)
        self.messages.clear()
        self.messages = messages_trimed

trim_messages 的具体使用方法可参见:本地大模型编程实战(09)自制聊天机器人(3)

总结

到现在为止,我们操练制作的智能体已经可以调度两个工具,并且具有自动保持消息历史的功能。显然,它可以扩展到支持更多工具。

代码

本文涉及的所有代码以及相关资源都已经共享,参见:

为便于找到代码,程序文件名称最前面的编号与本系列文章的文档编号相同。

🪐感谢您观看,祝好运🪐