In real-world projects, we often run into graph-database query requirements like the following:

  1. Query the root nodes; there may be many of them, so pagination is needed
  2. On clicking a root node, fetch its child nodes, the edges attached to them, and the next level of children below

This article implements these queries against the HugeGraph graph database using pyHugeGraph.

About HugeGraph

HugeGraph is an open-source distributed graph database from Baidu, designed for very large graphs (tens of billions of vertices and edges) with a focus on high throughput, low latency, and scalability.

A better-known graph database is Neo4j. The main differences between HugeGraph and Neo4j are:

  • License: HugeGraph uses Apache 2.0 (fully open source, no restrictions on commercial use); Neo4j's Community Edition is GPLv3 (open source, but commercial use requires payment) and its Enterprise Edition is a closed-source commercial license.
  • Query language: HugeGraph supports Gremlin (a general-purpose graph query language, portable across graph databases); Neo4j primarily uses Cypher (a declarative query language specific to Neo4j, with concise syntax well suited to complex relationship queries).

Given this comparison, there is no reason to hesitate about using HugeGraph. Unfortunately, its community still seems fairly small, so finding documentation and answers can take some digging.

For how to use Gremlin, see the TinkerPop documentation (tinkerpop). Two one-liners are enough to get started:

  • g.V() works with vertices
  • g.E() works with edges
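A few concrete traversal strings (with made-up labels, property names, and ids, not real data) show how those two entry points compose into the queries this article sends to the server:

```python
# Gremlin traversal strings of the kind used later in this article.
queries = {
    # all vertices with a given label
    "all_case_vertices": "g.V().hasLabel('slice_case')",
    # vertices filtered by a property value list
    "vertices_by_property": "g.V().hasLabel('slice_case').has('case_number', within('A-1','A-2'))",
    # edges leaving / entering one vertex
    "outgoing_edges_of_vertex": "g.V('some-id').outE()",
    "incoming_edges_of_vertex": "g.V('some-id').inE()",
}

for name, query in queries.items():
    print(f"{name}: {query}")
```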

Install the dependency and initialize the client and cache

pip install hugegraph-python
import json
from cachetools import TTLCache
from pyhugegraph.client import PyHugeClient

# Depending on the pyhugegraph version and server configuration,
# user/password arguments may also be required here.
client = PyHugeClient(ip="10.10.145.103", port=11003, graph="final")

# Keep up to 100 results, each for 5 minutes.
cache = TTLCache(maxsize=100, ttl=300)

In this usage scenario, users will most likely click around repeatedly, so caching should noticeably cut response times and improve the experience.

Define mappings based on the graph structure

These variables are mappings derived from the graph's schema; they keep the rest of the code generic:

# field name -> (vertex label, property key, display name)
FIELD_MAPPING = {
    "case_number": ("slice_case", "case_number", "案号"),      # case number
    "case_name": ("slice_case", "case_name", "案件名称"),      # case name
    "case_level": ("slice_case", "case_level", "审理阶段"),    # trial stage
    "cause_type": ("slice_cause", "cause_type", "案由"),       # cause of action
    "court_content": ("slice_court", "court_content", "法院")  # court
}

# vertex label -> category display name
SLICE_MAPPING = {
    "slice_case": "案件",    # case
    "slice_cause": "案由",   # cause of action
    "slice_court": "法院"    # court
}

Query conditions

The class below defines the query condition `condition`; having it in hand makes the later code easier to follow:

from typing import List, Literal
from pydantic import BaseModel, Field

class SearchField(BaseModel):
    field: Literal["case_number", "case_name", "cause_type", "court_content"] \
        = Field(..., description="case_number:案号,case_name:案件名称,cause_type:案由,court_content:法院")
    value: List[str] = Field(..., description="值。")
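A quick usage sketch of the model (the field value here is made up; the class is repeated so the snippet is self-contained):

```python
from typing import List, Literal
from pydantic import BaseModel, Field

class SearchField(BaseModel):
    field: Literal["case_number", "case_name", "cause_type", "court_content"] = Field(...)
    value: List[str] = Field(...)

# pydantic validates `field` against the Literal choices on construction.
condition = SearchField(field="case_name", value=["example case"])
print(condition.model_dump())
```

Passing an unknown `field` value raises a `ValidationError`, so bad requests fail before any Gremlin query is built.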

Query the root nodes

The code below queries the root nodes:

def execute_single_condition(condition):
    """Query the first-level vertices that match the given condition."""
    field = condition.field
    value = condition.value

    if field not in FIELD_MAPPING:
        raise ValueError(f"Unsupported field: {field}")

    label, prop_key, _ = FIELD_MAPPING[field]

    g = client.gremlin()
    # Build a within(...) filter from the value list. Note that the values
    # are interpolated directly into the query string, so they must not
    # contain single quotes.
    val_conditions = ",".join(f"'{v}'" for v in value)
    g_query = f"g.V().hasLabel('{label}').has('{prop_key}', within({val_conditions}))"

    results = g.exec(g_query)
    if results:
        return results["data"]
    return []
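To see what this function actually sends to the server, here is the query-string construction in isolation (the values are examples, and `build_root_query` is a name introduced just for this sketch):

```python
FIELD_MAPPING = {
    "case_number": ("slice_case", "case_number", "案号"),
}

def build_root_query(field, values, mapping=FIELD_MAPPING):
    # Mirrors the query-building step: label filter plus a within(...)
    # filter over the property values.
    label, prop_key, _ = mapping[field]
    vals = ",".join(f"'{v}'" for v in values)
    return f"g.V().hasLabel('{label}').has('{prop_key}', within({vals}))"

print(build_root_query("case_number", ["A-1", "A-2"]))
# g.V().hasLabel('slice_case').has('case_number', within('A-1','A-2'))
```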

Query the graph data

The code below queries the graph data, with pagination and transparent caching:

def search_child(vertices):
    """Query the next level of edges and vertices."""
    g = client.gremlin()

    child_edges = []
    child_vertex_ids = set()
    child_vertices = []

    for vertex in vertices:
        vid = vertex["id"]
        # Outgoing edges: the child is the edge's head vertex (inV).
        g_query_out_edges = f"g.V('{vid}').outE()"
        out_edges = g.exec(g_query_out_edges)
        if out_edges:
            out_edges_data = out_edges["data"]
            child_edges.extend(out_edges_data)
            for edge in out_edges_data:
                child_vertex_ids.add(edge["inV"])

        # Incoming edges: the child is the edge's tail vertex (outV).
        g_query_in_edges = f"g.V('{vid}').inE()"
        in_edges = g.exec(g_query_in_edges)
        if in_edges:
            in_edges_data = in_edges["data"]
            child_edges.extend(in_edges_data)
            for edge in in_edges_data:
                child_vertex_ids.add(edge["outV"])

    # Split child_vertex_ids into chunks so a single g.V(...) call does not
    # exceed Gremlin's request-size limit.
    chunk_size = 100
    id_list = list(child_vertex_ids)
    for i in range(0, len(id_list), chunk_size):
        chunk = id_list[i:i + chunk_size]
        val_conditions = ",".join(f"'{v}'" for v in chunk)
        g_query = f"g.V({val_conditions})"
        result = g.exec(g_query)
        if result:
            child_vertices.extend(result["data"])

    return child_edges, child_vertices
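The chunking step can be factored out and tested on its own; a minimal standalone version (`chunked` is a helper name introduced here, not part of the article's code):

```python
def chunked(items, size=100):
    """Split a collection into lists of at most `size` elements, so each
    g.V(id1, id2, ...) request stays under the server's size limit."""
    items = list(items)
    return [items[i:i + size] for i in range(0, len(items), size)]

print(chunked(range(7), size=3))  # [[0, 1, 2], [3, 4, 5], [6]]
```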

def search_graph_with_cache_and_pagination(condition, page=1, limit=10):

    request_json = {
        "condition": condition.model_dump(),
        "page": page,
        "limit": limit
    }

    # Return the cached result if the same request was seen recently.
    cache_key = generate_cache_key(request_json)
    if cache_key in cache:
        cached = cache[cache_key]
        if cached:
            return cached

    # Step 1: fetch the first-level vertices
    first_vertices = execute_single_condition(condition)

    if not first_vertices:
        return build_echarts_data([], [])

    # Pagination (slicing past the end of a list is safe in Python)
    total = len(first_vertices)
    start_idx = (page - 1) * limit
    if start_idx >= total:
        return build_echarts_data([], [])
    first_vertices_page = first_vertices[start_idx:start_idx + limit]

    # Step 2: fetch the edges and second-level vertices
    second_edges, second_vertices = search_child(first_vertices_page)

    # Step 3: fetch the third-level vertices
    # Going one level deeper would typically pull in around 10,000
    # vertices and edges, which is far too much to render.
    #third_edges, third_vertices = search_child(second_vertices)

    # Merge all vertices and edges
    all_vertices = first_vertices_page + second_vertices #+ third_vertices
    all_edges = second_edges #+ third_edges

    echarts_data = build_echarts_data(all_vertices, all_edges)

    # Cache the result
    cache[cache_key] = echarts_data

    return echarts_data
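The `generate_cache_key` helper is not shown in this article; one reasonable implementation (an assumption, not the original code) hashes the canonical JSON of the request:

```python
import hashlib
import json

def generate_cache_key(request_json):
    # sort_keys gives a canonical string, so equal requests map to the
    # same key regardless of dict insertion order.
    canonical = json.dumps(request_json, sort_keys=True, ensure_ascii=False)
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()

key = generate_cache_key({"page": 1, "limit": 10})
print(key)
```

Any stable hash works here; MD5 is fine because the key is a cache index, not a security boundary.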

Convert to JSON that ECharts can consume

The function below converts the vertex and edge data into a JSON structure that is convenient for the front-end ECharts graph to render:

def build_echarts_data(vertices, edges):
    nodes = []
    links = []

    node_set = set()

    for v in vertices:
        vid = v["id"]
        if vid not in node_set:
            node_set.add(vid)
            # Translate property keys to display names; HugeGraph returns
            # property values as single-element lists.
            properties_zh = {}
            for k, v_list in v["properties"].items():
                if k in FIELD_MAPPING:
                    zh_key = FIELD_MAPPING[k][2]
                    properties_zh[zh_key] = v_list[0] if isinstance(v_list, list) and len(v_list) > 0 else v_list

            nodes.append({
                "id": vid,
                # Use the first mapped property as the display name,
                # falling back to the vertex id if none was mapped.
                "name": next(iter(properties_zh.values()), vid),
                #"symbolSize": 50,
                "category": SLICE_MAPPING[v["label"]],
                "properties": properties_zh
            })

    for e in edges:
        links.append({
            "source": e["outV"],
            "target": e["inV"],
            "name": e["label"],
            #"value": 1
        })

    categories = list({SLICE_MAPPING[v["label"]] for v in vertices})

    return {
        "nodes": nodes,
        "links": links,
        "categories": [{"name": c} for c in categories]
    }
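The returned structure plugs almost directly into an ECharts graph series. A minimal sketch of how a front end might consume it (the sample ids and edge label are made up):

```python
import json

# Sample output in the shape produced by build_echarts_data.
echarts_data = {
    "nodes": [{"id": "1:A", "name": "案件A", "category": "案件", "properties": {}}],
    "links": [{"source": "1:A", "target": "2:B", "name": "relates_to"}],
    "categories": [{"name": "案件"}, {"name": "案由"}],
}

# A force-layout graph series option built from it.
option = {
    "series": [{
        "type": "graph",
        "layout": "force",
        "data": echarts_data["nodes"],
        "links": echarts_data["links"],
        "categories": echarts_data["categories"],
        "roam": True,
    }]
}
print(json.dumps(option, ensure_ascii=False)[:60])
```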

Summary

Complete code samples for querying HugeGraph with pyHugeGraph are not easy to find; I hope this one helps.
pyHugeGraph can also be used to create and modify a HugeGraph schema; see hugegraph-python for details.


🪐 Good luck! 🪐