在执行从文本中提取摘要的任务时,难免遇到大文本的情况:文本的长度超出了 LLM(大语言模型) 的token限制。
LangGraph 提供了 map-reduce 工作流,它先把大文本拆成小文档分别提取摘要,然后再提炼/压缩提取的摘要,直到最后提取的摘要的token长度满足既定要求。

LangGraph的map-reduce工作流

本次演练使用 qwen2.5:7b 开源大模型。

准备文档

我们先准备一个大文档,然后把它切分成小文档,为后面的演练做准备:

import os
os.environ['USER_AGENT'] = 'summarize'

llm_model_name = "qwen2.5"

from langchain_ollama import ChatOllama
llm = ChatOllama(model=llm_model_name,temperature=0.3, verbose=True)

def load_document(url):
    """加载文档"""

    from langchain_community.document_loaders import WebBaseLoader
    loader = WebBaseLoader(url,encoding='utf-8')
    doc = loader.load()

    return doc

def split_document(url):
    """分割文档,为Map做准备"""

    doc = load_document(url)
    
    from langchain_text_splitters import CharacterTextSplitter
    text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=1000, chunk_overlap=0
    )
    split_docs = text_splitter.split_documents(doc)
    print(f"Generated {len(split_docs)} documents.")

    return split_docs

split_docs = split_document("http://www.wfcoding.com/articles/practice/0325/")

Map-Reduce提示词

map_prompt 用于在map时对分割后的小文档提取摘要; reduce_prompt 用于在reduce阶段对小文档中提取出来的摘要进行进一步提炼/压缩,直到最终提炼出token数量小于 token_max 的摘要。

# Map时使用
from langchain_core.prompts import ChatPromptTemplate
map_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "请简明扼要地概括以下内容:\\n\\n{context}")
    ]
)

# Reduce时使用
reduce_template = """
以下是一组摘要:
{docs}
请将这些内容提炼成最终的、综合的主题摘要。
"""
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])

确定token数量

token数量是决定是否继续提炼/压缩摘要的标准,这里我们用 jieba 来确定token数量。

这种办法不是特别准确,但是在绝大部分场合应该够用了。

from typing import List
from langchain_core.documents import Document
import jieba
def count_tokens(text):
    """返回token数量"""

    tokens = jieba.lcut(text)
    return len(tokens)

def length_function(documents: List[Document]) -> int:
    """获取输入内容的token数量。"""

    return sum(count_tokens(doc.page_content) for doc in documents)

定义状态

这些状态state是各个节点交换的数据规范,下面的代码注释有更加详细的说明:

import operator
from typing import Annotated, Literal, TypedDict

class OverallState(TypedDict):
    """主体状态
    
    这里我们使用了operator.add,这是因为我们想将所有从各个节点生成的摘要合并到一个列表中。
    """

    contents: List[str]     # 分割后的原始文档列表
    summaries: Annotated[list, operator.add]   # 由原始文档列表生成的摘要列表 
    collapsed_summaries: List[Document]     # 折叠/压缩的文档列表
    final_summary: str      # 最终提取的摘要


class SummaryState(TypedDict):
    """将所有文档“映射”到的节点的状态,以便生成摘要"""

    content: str

定义节点/步骤

下面我们实现一个简单的应用程序,它将摘要节点/步骤映射/map到文档列表上,然后使用上述提示词对其进行缩减/reduce。

当文本长度超过 LLM 的上下文窗口时,Map-Reduce 流程尤为有用。对于长文本,我们需要一种机制来确保在缩减步骤中要摘要的上下文不超过模型的上下文窗口大小。在这里,我们实现了摘要的递归折叠/压缩:根据token限制对输入进行分区,并根据分区生成摘要。重复此步骤,直到摘要的总长度达到所需限制,从而实现输提取任意长度文本的摘要。

接下来,我们定义一个langgraph工作流。为了演示折叠/压缩步骤,我们人为地将最大标记长度设置为 1,000 个。

from langchain.chains.combine_documents.reduce import (
    acollapse_docs,
    split_list_of_docs,
)
from langgraph.graph import END, START, StateGraph
from langgraph.constants import Send

token_max = 1000

async def generate_summary(state: SummaryState):
    """提取一个文档的摘要"""

    prompt = map_prompt.invoke(state["content"])
    response = await llm.ainvoke(prompt)
    return {"summaries": [response.content]}

def map_summaries(state: OverallState):
    """
    【边】把文档列表中的每一个文档map出去

    返回一个 `Send` 对象列表。每个 `Send` 对象包含图中一个节点的名称以及要发送到该节点的状态。
    """

    return [
        Send("generate_summary", {"content": content}) for content in state["contents"]
    ]


def collect_summaries(state: OverallState):
    """收集从map出去的文档提取的摘要
    
    所有摘要放在 collapsed_summaries 中,后面可以对它折叠/压缩,直到摘要小于token_max。
    """

    return {
        "collapsed_summaries": [Document(summary) for summary in state["summaries"]]
    }


async def _reduce(input: dict) -> str:
    prompt = reduce_prompt.invoke(input)
    response = await llm.ainvoke(prompt)
    return response.content


async def collapse_summaries(state: OverallState):
    """折叠/压缩摘要"""

    doc_lists = split_list_of_docs(
        state["collapsed_summaries"], length_function, token_max
    )

    # 使用reduce提示词折叠/压缩摘要列表
    results = []
    for doc_list in doc_lists:
        results.append(await acollapse_docs(doc_list, _reduce))

    return {"collapsed_summaries": results}


def should_collapse(
    state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
    """【边】确定我们是否应该折叠/压缩摘要"""

    num_tokens = length_function(state["collapsed_summaries"])
    if num_tokens > token_max:
        return "collapse_summaries"
    else:
        return "generate_final_summary"


async def generate_final_summary(state: OverallState):
    """生成最终摘要"""

    response = await _reduce(state["collapsed_summaries"])
    return {"final_summary": response}

值得注意的是 collapse_summaries 函数,它对输入的文档列表进行分块和压缩,以确保每个块的大小不超过 token_max。通过调用 split_list_of_docs 函数将文档列表拆分成多个子列表,然后对每个子列表调用 acollapse_docs 函数进行压缩,最终返回压缩后的文档列表。

构建langgraph图

万事俱备,下面开始构建langgraph图,并定义测试方法:

def create_graph():
    """构建langgraph图"""

    graph = StateGraph(OverallState)
    graph.add_node("generate_summary", generate_summary)
    graph.add_node("collect_summaries", collect_summaries)
    graph.add_node("collapse_summaries", collapse_summaries)
    graph.add_node("generate_final_summary", generate_final_summary)

    # Edges:
    graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
    graph.add_edge("generate_summary", "collect_summaries")
    graph.add_conditional_edges("collect_summaries", should_collapse)
    graph.add_conditional_edges("collapse_summaries", should_collapse)
    graph.add_edge("generate_final_summary", END)

    app = graph.compile()
    return app

async def summarize():
    """提取摘要"""

    app = create_graph()

    async for step in app.astream(
        {"contents": [doc.page_content for doc in split_docs]},
        {"recursion_limit": 10},
    ):
        step_keys = list(step.keys())
        print(step_keys)
        if 'generate_final_summary' in step_keys:
            print(step['generate_final_summary'])

summarize运行时,我们可以流式输出以观察其步骤顺序。在这里,将简单地打印出步骤的名称,在最后一个步骤,会打印出最终的摘要结果。

请注意,由于图表中有一个循环,因此在其执行过程中指定 recursion_limit 会防止出现死循环等问题:当超出指定的限制时,这将引发特定的错误。

见证效果

我们调用 summarize()函数可以看到下面的输出:

Generated 8 documents.
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
...
['collect_summaries']
['collapse_summaries']
['generate_final_summary']
{'final_summary': '本文是“本地大模型编程实战”系列的一部分,主要介绍了如何利用语言模型生成SQL查询以构建问答系统,并通过具体的实例展示了其实现过程和效果。主要内容包括:\n\n1. **系统实现步骤**:从准备阶段开 始,涉及设置SQLite数据库、使用`langgraph`框架创建能够根据自然语言问题生成SQL查询的链路图、状态管理以及执行SQL查询与生成答案等关键步骤。\n\n2. **具体实例展示**:通过定义和调用特定函数(如`execute_query`和`generate_answer`)来运行由模型生成的SQL查询,并基于查询结果生成最终答案。同时,将这些功能整合成一个包含三个节点的链路图。\n\n3. **模型对比与测试**:对不同大语言模型(如Qwen2.5和Llama3.1)生成的SQL语句进行了比较分析,指出各自的优缺点,并展示了它们在回答特定问题时的表现。\n\n4. **系统功能与支持情况**:定义了用于回答用户问题的`ask`函数,并分享了相关代码及资源。讨论了该系统的应用范围及其相对于其他方法的优势和局限性。\n\n5. **结果展示与注意事项**:具体展示了针对员工数量、消费最多的国家以及PlaylistTrack表结构等问题,通过SQL查询获取的结果,并强调当前系统仅支持SQL数据库查询而不支持人工审核SQL执行结果。\n\n整体而 言,本文详细介绍了如何利用大语言模型构建基于SQL数据的问答系统的方法和技术细节。'}

从上面的输出可以看出,整篇文章被拆分成8个文档,通过映射/map方式生成了8个摘要,然后进行了2次折叠/压缩递归调用后,才生成了最终简洁的摘要。

总结

基于 langgraph 对长文本提炼摘要的过程有一点复杂,可能需要自己实践一下才能有比较深刻的了解。


代码

本文涉及的所有代码以及相关资源都已经共享,参见:

为便于找到代码,程序文件名称最前面的编号与本系列文章的文档编号相同。

参考

🪐感谢您观看,祝好运🪐