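The previous article, "Querying the Neo4j graph database with a large language model (LLM) (1)", showed how to query Neo4j with GraphCypherQAChain. That approach is quick and simple to implement, but it is hard to customize, which can cause problems in production.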

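In this article we use the LangGraph framework to query the Neo4j graph database with an LLM (large language model). LangGraph lets you define clear yet complex workflows, so it can cope with more demanding application scenarios.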

Here is a visualization of the LangGraph workflow we are about to build:

Figure: LangGraph workflow for querying the Neo4j graph database

Defining the state

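We start by defining the input, output, and overall state of the LangGraph application.
You can think of the state as the data format that nodes use to exchange data with each other. All three classes inherit from TypedDict.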

from operator import add
from typing import Annotated, List
from typing_extensions import TypedDict

class InputState(TypedDict):
    """Input: the user's question."""
    question: str

class OverallState(TypedDict):
    """Overall state shared by all nodes."""
    question: str
    next_action: str
    cypher_statement: str
    cypher_errors: List[str]
    database_records: List[dict]
    steps: Annotated[List[str], add]

class OutputState(TypedDict):
    """Output returned to the caller."""
    answer: str
    steps: List[str]
    cypher_statement: str
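The `Annotated[List[str], add]` annotation on `steps` tells LangGraph to combine updates to that key with `operator.add` (list concatenation) instead of overwriting it, so each node only needs to return its own step name. The sketch below is a simplified, stdlib-only model of that merge behaviour, not LangGraph's actual implementation; `merge_update` and `REDUCERS` are hypothetical names.

```python
from operator import add
from typing import Any, Callable, Dict

# Hypothetical reducer table: only "steps" accumulates, mirroring
# Annotated[List[str], add] in OverallState. Other keys are overwritten.
REDUCERS: Dict[str, Callable[[Any, Any], Any]] = {"steps": add}


def merge_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new state with a node's partial update applied."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # e.g. list + list
        else:
            merged[key] = value  # plain keys: last write wins
    return merged


state = {"question": "What was the cast of the Casino?", "steps": []}
state = merge_update(state, {"next_action": "movie", "steps": ["guardrail"]})
state = merge_update(state, {"cypher_statement": "MATCH ...", "steps": ["generate_cypher"]})
state = merge_update(state, {"next_action": "end", "steps": ["execute_cypher"]})
print(state["steps"])        # ['guardrail', 'generate_cypher', 'execute_cypher']
print(state["next_action"])  # end
```

This is why every node in this article returns `"steps": ["..."]` with a single element: the full history is assembled by the reducer, while keys such as `next_action` simply take the latest value.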

The first node: guardrails

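The first node, guardrails, is a simple "guardrail" step: it checks whether the question is about movies or their cast. If it is not, we tell the user that we cannot answer other kinds of questions; otherwise, we move on to the Cypher generation node.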

from typing import Literal

from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

guardrails_system = """
As an intelligent assistant, your primary objective is to decide whether a given question is related to movies or not. 
If the question is related to movies, output "movie". Otherwise, output "end".
To make this decision, assess the content of the question and determine if it refers to any movie, actor, director, film industry, 
or related topics. Provide only the specified output: "movie" or "end".
"""
guardrails_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            guardrails_system,
        ),
        (
            "human",
            ("{question}"),
        ),
    ]
)


class GuardrailsOutput(BaseModel):
    decision: Literal["movie", "end"] = Field(
        description="Decision on whether the question is related to movies"
    )

from langchain_ollama import ChatOllama
llm_llama = ChatOllama(model="llama3.1",temperature=0, verbose=True)

guardrails_chain = guardrails_prompt | llm_llama.with_structured_output(GuardrailsOutput)


def guardrails(state: InputState) -> OverallState:
    """
    Decides if the question is related to movies or not.
    """
    guardrails_output = guardrails_chain.invoke({"question": state.get("question")})
    database_records = None
    if guardrails_output.decision == "end":
        database_records = "This question is not about movies or their cast. Therefore I cannot answer this question."
    return {
        "next_action": guardrails_output.decision,
        "database_records": database_records,
        "steps": ["guardrail"],
    }

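This node uses llama3.1 and a prompt to decide whether the question is about movies. If it is, the node returns movie, and the workflow then generates Cypher and queries the Neo4j graph database. If it is not, the node returns end and stores a refusal message in database_records; the workflow then skips straight to the final-answer node, where the LLM writes the reply.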
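To exercise the node's routing logic without running Ollama, one option is to swap the LLM chain for a stub. The sketch below uses a hypothetical keyword-based stand-in (`fake_guardrails`, `MOVIE_KEYWORDS` and `FakeGuardrailsOutput` are illustrative, not part of the article's code). It returns the same `decision` shape as `GuardrailsOutput` and reuses the node's state-update logic; it is far cruder than the LLM and meant for offline tests only.

```python
from dataclasses import dataclass
from typing import Literal

# Hypothetical keyword list standing in for the llama3.1 classifier.
MOVIE_KEYWORDS = ("movie", "film", "actor", "actress", "cast", "director")


@dataclass
class FakeGuardrailsOutput:
    """Same shape as GuardrailsOutput: a single 'decision' field."""
    decision: Literal["movie", "end"]


def fake_guardrails(question: str) -> FakeGuardrailsOutput:
    """Crude stand-in for guardrails_chain.invoke(); substring matching only."""
    q = question.lower()
    hit = any(word in q for word in MOVIE_KEYWORDS)
    return FakeGuardrailsOutput("movie" if hit else "end")


def guardrails_node(state: dict) -> dict:
    """Same update logic as the guardrails node, with the stub classifier."""
    output = fake_guardrails(state["question"])
    database_records = None
    if output.decision == "end":
        database_records = (
            "This question is not about movies or their cast. "
            "Therefore I cannot answer this question."
        )
    return {
        "next_action": output.decision,
        "database_records": database_records,
        "steps": ["guardrail"],
    }


print(guardrails_node({"question": "What's the weather in Spain?"})["next_action"])  # end
print(guardrails_node({"question": "What was the cast of the Casino?"})["next_action"])  # movie
```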

Node: generate_cypher (generating the Cypher statement that queries Neo4j)

Enhancing the prompt with few-shot examples

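Translating natural language into accurate Cypher queries is very challenging. One way to improve the process is to give the LLM relevant few-shot examples that guide its query generation. To do this, we use SemanticSimilarityExampleSelector to dynamically select the most relevant examples for each question.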
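Conceptually, the selector embeds each example's `question`, embeds the user's question, and keeps the `k` examples whose vectors are closest. The sketch below illustrates that idea with a toy bag-of-words vector and cosine similarity in plain Python. The real setup uses `nomic-embed-text` embeddings stored in `Neo4jVector`, so `embed` and `select_examples` here are hypothetical simplifications, not LangChain's code.

```python
import math
import re
from collections import Counter
from typing import Dict, List


def embed(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts (stand-in for nomic-embed-text)."""
    return Counter(re.findall(r"[a-z']+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def select_examples(question: str, examples: List[Dict[str, str]], k: int = 2) -> List[Dict[str, str]]:
    """Return the k examples whose 'question' is most similar to the input."""
    q = embed(question)
    ranked = sorted(examples, key=lambda ex: cosine(q, embed(ex["question"])), reverse=True)
    return ranked[:k]


examples = [
    {"question": "How many artists are there?", "query": "..."},
    {"question": "Which actors played in the movie Casino?", "query": "..."},
    {"question": "How many movies has Tom Hanks acted in?", "query": "..."},
]
for ex in select_examples("Which actors played in Heat?", examples, k=1):
    print(ex["question"])  # Which actors played in the movie Casino?
```

Real embeddings capture meaning rather than shared words (for example, "artists" and "actors" end up close), which is why the article uses an embedding model instead of word overlap.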

# Few-shot prompting
from langchain_core.example_selectors import SemanticSimilarityExampleSelector
from langchain_neo4j import Neo4jVector

examples = [
    {
        "question": "How many artists are there?",
        "query": "MATCH (a:Person)-[:ACTED_IN]->(:Movie) RETURN count(DISTINCT a)",
    },
    {
        "question": "Which actors played in the movie Casino?",
        "query": "MATCH (m:Movie {title: 'Casino'})<-[:ACTED_IN]-(a) RETURN a.name",
    },
    {
        "question": "How many movies has Tom Hanks acted in?",
        "query": "MATCH (a:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie) RETURN count(m)",
    },
    {
        "question": "List all the genres of the movie Schindler's List",
        # Double-quoted Cypher string, because the title itself contains an apostrophe
        "query": "MATCH (m:Movie {title: \"Schindler's List\"})-[:IN_GENRE]->(g:Genre) RETURN g.name",
    },
    {
        "question": "Which actors have worked in movies from both the comedy and action genres?",
        "query": "MATCH (a:Person)-[:ACTED_IN]->(:Movie)-[:IN_GENRE]->(g1:Genre), (a)-[:ACTED_IN]->(:Movie)-[:IN_GENRE]->(g2:Genre) WHERE g1.name = 'Comedy' AND g2.name = 'Action' RETURN DISTINCT a.name",
    },
    {
        "question": "Which directors have made movies with at least three different actors named 'John'?",
        "query": "MATCH (d:Person)-[:DIRECTED]->(m:Movie)<-[:ACTED_IN]-(a:Person) WHERE a.name STARTS WITH 'John' WITH d, COUNT(DISTINCT a) AS JohnsCount WHERE JohnsCount >= 3 RETURN d.name",
    },
    {
        "question": "Identify movies where directors also played a role in the film.",
        "query": "MATCH (p:Person)-[:DIRECTED]->(m:Movie), (p)-[:ACTED_IN]->(m) RETURN m.title, p.name",
    },
    {
        "question": "Find the actor with the highest number of movies in the database.",
        # Actors are Person nodes in this schema (as in the other examples), not Actor nodes
        "query": "MATCH (a:Person)-[:ACTED_IN]->(m:Movie) RETURN a.name, COUNT(m) AS movieCount ORDER BY movieCount DESC LIMIT 1",
    },
]

from langchain_ollama import OllamaEmbeddings
embeddings = OllamaEmbeddings(model="nomic-embed-text")

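# Embed each example's "question" and return the k=5 most similar examples per query.
# Neo4jVector reads NEO4J_URI, NEO4J_USERNAME and NEO4J_PASSWORD from the environment,
# so they must already be set when this runs (this article sets them in
# create_enhanced_graph() below; call it first if you run the code top to bottom).
example_selector = SemanticSimilarityExampleSelector.from_examples(
    examples, embeddings, Neo4jVector, k=5, input_keys=["question"]
)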

Generating Cypher with a prompt

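Now we implement the Cypher generation chain. The prompt contains the graph schema, the dynamically selected few-shot examples, and the user's question. Together, these let the LLM generate a Cypher query that retrieves the relevant information from the graph database.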
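To see roughly what the model receives, the three ingredients can be filled into an abbreviated copy of the human-message template with plain `str.format`. This is only an illustration: `TEMPLATE` is a shortened copy of the prompt, the `schema` string is a hypothetical placeholder rather than real `enhanced_graph.schema` output, and the few-shot block is joined the same way `generate_cypher` does it.

```python
NL = "\n"

# Abbreviated version of the human message used in text2cypher_prompt.
TEMPLATE = """You are a Neo4j expert. Given an input question, create a syntactically correct Cypher query to run.
Here is the schema information
{schema}

Below are a number of examples of questions and their corresponding Cypher queries.

{fewshot_examples}

User input: {question}
Cypher query:"""

# Hypothetical placeholder; the real text comes from Neo4jGraph(enhanced_schema=True).schema
schema = "Node properties: Movie {title: STRING}, Person {name: STRING}\nRelationships: (:Person)-[:ACTED_IN]->(:Movie)"

selected = [
    {"question": "Which actors played in the movie Casino?",
     "query": "MATCH (m:Movie {title: 'Casino'})<-[:ACTED_IN]-(a) RETURN a.name"},
    {"question": "How many movies has Tom Hanks acted in?",
     "query": "MATCH (a:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie) RETURN count(m)"},
]
# Same formatting as in generate_cypher: examples separated by a blank line.
fewshot_examples = (NL * 2).join(f"Question: {el['question']}{NL}Cypher:{el['query']}" for el in selected)

prompt_text = TEMPLATE.format(schema=schema, fewshot_examples=fewshot_examples,
                              question="What was the cast of the Casino?")
print(prompt_text)
```

Only the braces in the template itself are placeholders; braces inside the substituted values (the Cypher property maps) are inserted verbatim.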

import os

def create_enhanced_graph():
    """Create the Neo4j graph object."""

    os.environ["NEO4J_URI"] = "bolt://localhost:7687"
    os.environ["NEO4J_USERNAME"] = "neo4j"
    os.environ["NEO4J_PASSWORD"] = "neo4j"


    from langchain_neo4j import Neo4jGraph

    enhanced_graph = Neo4jGraph(enhanced_schema=True)
    #print(enhanced_graph.schema)
    return enhanced_graph
enhanced_graph = create_enhanced_graph()

from langchain_core.output_parsers import StrOutputParser

text2cypher_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            (
                "Given an input question, convert it to a Cypher query. No pre-amble. "
                "Do not wrap the response in any backticks or anything else. Respond with a Cypher statement only!"
            ),
        ),
        (
            "human",
            (
                """You are a Neo4j expert. Given an input question, create a syntactically correct Cypher query to run.
Do not wrap the response in any backticks or anything else. Respond with a Cypher statement only!
Here is the schema information
{schema}

Below are a number of examples of questions and their corresponding Cypher queries.

{fewshot_examples}

User input: {question}
Cypher query:"""
            ),
        ),
    ]
)

llm_qwen = ChatOllama(model="qwen2.5",temperature=0, verbose=True)

text2cypher_chain = text2cypher_prompt | llm_qwen | StrOutputParser()


def generate_cypher(state: OverallState) -> OverallState:
    """
    Generates a cypher statement based on the provided schema and user input
    """
    NL = "\n"
    fewshot_examples = (NL * 2).join(
        [
            f"Question: {el['question']}{NL}Cypher:{el['query']}"
            for el in example_selector.select_examples(
                {"question": state.get("question")}
            )
        ]
    )
    generated_cypher = text2cypher_chain.invoke(
        {
            "question": state.get("question"),
            "fewshot_examples": fewshot_examples,
            "schema": enhanced_graph.schema,
        }
    )
    return {"cypher_statement": generated_cypher, "steps": ["generate_cypher"]}

Node: executing the Cypher query

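Next we add a node that executes the generated Cypher statement. If the graph database returns no results, we should tell the LLM so explicitly, because an empty context can sometimes lead the LLM to hallucinate.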

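You can add nodes such as "validate query" and "correct query" before this node to improve the accuracy of the results. Such nodes do not always deliver the expected improvement, however, because they can make mistakes themselves, so use them with care.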
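As an example of such a check, the sketch below flags write clauses (this application only needs reads) and node labels that do not appear in the schema, storing them in the `cypher_errors` list that OverallState already declares. A routing function then sends the statement to a correction step at most `MAX_CORRECTIONS` times before executing anyway, so a misbehaving corrector cannot loop forever. `validate_cypher`, `KNOWN_LABELS`, `validation_condition`, the `correct_cypher` node name and the `correction_attempts` counter are all hypothetical. The regex checks are deliberately crude: they do not parse Cypher and would also match keywords inside string literals (a title like 'Set It Up' triggers the write check).

```python
import re
from typing import Dict, List, Literal

# Hypothetical allow-list of labels, e.g. copied from enhanced_graph.schema.
KNOWN_LABELS = {"Person", "Movie", "Genre"}
WRITE_CLAUSES = re.compile(r"\b(CREATE|MERGE|DELETE|DETACH|SET|REMOVE|DROP)\b", re.IGNORECASE)
LABEL_PATTERN = re.compile(r"\(\s*\w*\s*:\s*(\w+)")  # matches "(m:Movie" and "(:Movie"
MAX_CORRECTIONS = 2


def validate_cypher(statement: str) -> List[str]:
    """Return a list of problems found in the statement (empty if none)."""
    errors = []
    if WRITE_CLAUSES.search(statement):
        errors.append("Statement contains a write clause; only read queries are allowed.")
    for label in LABEL_PATTERN.findall(statement):
        if label not in KNOWN_LABELS:
            errors.append(f"Unknown label: {label}")
    return errors


def validation_condition(state: Dict) -> Literal["correct_cypher", "execute_cypher"]:
    """Route to a (hypothetical) correction node, but cap the number of retries."""
    if state.get("cypher_errors") and state.get("correction_attempts", 0) < MAX_CORRECTIONS:
        return "correct_cypher"
    return "execute_cypher"


bad = "MATCH (a:Actor)-[:ACTED_IN]->(m:Movie) RETURN a.name"
print(validate_cypher(bad))  # ['Unknown label: Actor']
print(validation_condition({"cypher_errors": validate_cypher(bad), "correction_attempts": 2}))  # execute_cypher
```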

no_results = "I couldn't find any relevant information in the database"

def execute_cypher(state: OverallState) -> OverallState:
    """
    Executes the given Cypher statement.
    """

    records = enhanced_graph.query(state.get("cypher_statement"))
    return {
        "database_records": records if records else no_results,
        "next_action": "end",
        "steps": ["execute_cypher"],
    }

Generating the final answer

The last step is to generate the answer. It combines the original question with the graph database output to produce a relevant response.
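The `results` placeholder receives whatever is in `database_records`: a list of dicts from `enhanced_graph.query()`, the `no_results` message, or the guardrails refusal string. If you want a more compact and predictable context for the LLM, a small formatter like the hypothetical `format_results` below (not part of the article's code) could render one line per record and cap the number of rows.

```python
from typing import List, Union

NO_RESULTS = "I couldn't find any relevant information in the database"


def format_results(records: Union[List[dict], str, None], max_rows: int = 20) -> str:
    """Render database_records as short 'key: value' lines for the answer prompt."""
    if not records:
        return NO_RESULTS
    if isinstance(records, str):  # refusal or no-results message: pass through
        return records
    lines = [", ".join(f"{k}: {v}" for k, v in row.items()) for row in records[:max_rows]]
    if len(records) > max_rows:
        lines.append(f"... ({len(records) - max_rows} more rows omitted)")
    return "\n".join(lines)


rows = [{"a.name": "Robert De Niro"}, {"a.name": "Sharon Stone"}]
print(format_results(rows))
# a.name: Robert De Niro
# a.name: Sharon Stone
```

It would be called inside generate_final_answer as `"results": format_results(state.get("database_records"))`; capping rows keeps large query results from crowding out the question in the prompt.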

generate_final_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant",
        ),
        (
            "human",
            (
                """Use the following results retrieved from a database to provide
a succinct, definitive answer to the user's question.

Respond as if you are answering the question directly.

Results: {results}
Question: {question}"""
            ),
        ),
    ]
)

generate_final_chain = generate_final_prompt | llm_llama | StrOutputParser()


def generate_final_answer(state: OverallState) -> OutputState:
    """
    Generates the final answer from the question and the database results.
    """
    final_answer = generate_final_chain.invoke(
        {"question": state.get("question"), "results": state.get("database_records")}
    )
    return {"answer": final_answer, "steps": ["generate_final_answer"]}

Building the workflow

Now we implement the LangGraph workflow.

First, define the conditional edge function:

def guardrails_condition(
    state: OverallState,
) -> Literal["generate_cypher", "generate_final_answer"]:
    if state.get("next_action") == "end":
        return "generate_final_answer"
    elif state.get("next_action") == "movie":
        return "generate_cypher"

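This function is attached after the guardrails node as a conditional edge. It reads the next_action value that guardrails wrote to the state and routes movie questions to generate_cypher and everything else directly to generate_final_answer.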

The following code connects the nodes and edges above into a complete workflow:

from langgraph.graph import END, START, StateGraph

langgraph = StateGraph(OverallState, input=InputState, output=OutputState)
langgraph.add_node(guardrails)
langgraph.add_node(generate_cypher)
langgraph.add_node(execute_cypher)
langgraph.add_node(generate_final_answer)

langgraph.add_edge(START, "guardrails")
langgraph.add_conditional_edges(
    "guardrails",
    guardrails_condition,
)

langgraph.add_edge("generate_cypher","execute_cypher")
langgraph.add_edge("execute_cypher","generate_final_answer")

langgraph.add_edge("generate_final_answer", END)

langgraph = langgraph.compile()
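The wiring above boils down to a small routing table: fixed edges plus one conditional edge after `guardrails`. The plain-Python walk below is a hypothetical simulation (it does not call LangGraph; `EDGES` and `visited_nodes` are illustrative names) that follows the same edges to show which nodes run for each guardrails decision. Its `guardrails_condition` is a simplified copy that treats any value other than "end" as "movie".

```python
from typing import Callable, Dict, List, Union

START, END = "__start__", "__end__"


def guardrails_condition(state: dict) -> str:
    # Simplified copy of the article's conditional edge.
    if state.get("next_action") == "end":
        return "generate_final_answer"
    return "generate_cypher"  # "movie"


# Fixed edges as strings; the conditional edge as a callable on the state.
EDGES: Dict[str, Union[str, Callable[[dict], str]]] = {
    START: "guardrails",
    "guardrails": guardrails_condition,
    "generate_cypher": "execute_cypher",
    "execute_cypher": "generate_final_answer",
    "generate_final_answer": END,
}


def visited_nodes(decision: str) -> List[str]:
    """Walk the edges for a given guardrails decision and list the nodes run."""
    state = {"next_action": decision}
    path, node = [], START
    while True:
        target = EDGES[node]
        node = target(state) if callable(target) else target
        if node == END:
            return path
        path.append(node)


print(visited_nodes("movie"))  # ['guardrails', 'generate_cypher', 'execute_cypher', 'generate_final_answer']
print(visited_nodes("end"))    # ['guardrails', 'generate_final_answer']
```

These two paths match the steps list that the compiled graph accumulates in its output state for movie and non-movie questions.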

Seeing it in action

With everything in place, let's ask the compiled LangGraph workflow two questions and see how it performs:

def ask(question:str):
    response = langgraph.invoke({"question": question})
    print(f'response:\n{response["answer"]}')

ask("What's the weather in Spain?")
ask("What was the cast of the Casino?")

The first question has nothing to do with movies, so Neo4j was not queried and the LLM answered directly:

I'm happy to help with that! Unfortunately, I don't have access to real-time weather information for specific locations like Spain. However, I can suggest checking a reliable weather website or app, such as AccuWeather or Weather.com, for the most up-to-date forecast.

Would you like me to provide some general information about Spain's climate instead?

The second question took longer to run; the final answer was:

The cast of the movie "Casino" included James Woods, Joe Pesci, Robert De Niro, and Sharon Stone.

Nice!

Summary

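This article showed how to use LangGraph, a more involved approach, to build a graph-structured workflow that handles queries against graph data.
In my view, the drawback of this approach is that it takes more work. The benefits are a clear structure that is easy to customize and modify, which makes it better suited to building complex AI applications or agents in production.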


Code

All the code and related resources for this article have been shared; see:

To make the code easy to find, each program file name starts with the same number as the corresponding article in this series.


🪐 Thanks for reading, and good luck! 🪐