在实际项目中,我们经常会遇到类似下面的查询图数据库的需求:
- 查询根节点,根节点可能有多个,需要分页
- 点击根节点,查询其子节点以及子节点关联的边和更下一层的子节点
本文使用 pyHugeGraph 实现上述查询 HugeGraph 图数据库的功能。
关于 HugeGraph
HugeGraph 是百度开源的分布式图数据库,专为处理超大规模图数据(百亿级顶点和边)设计,注重高吞吐、低延迟和可扩展性。
我们还知道一种名气更大的图数据库Neo4j,HugeGraph 与它的主要区别有:
对比维度 | HugeGraph | Neo4j |
---|---|---|
开源协议 | Apache 2.0(完全开源,商业使用无限制) | 社区版:GPLv3(开源,商业使用需付费);企业版:商业许可(闭源) |
查询语言 | 支持 Gremlin(通用图查询语言,跨图数据库兼容) | 主推 Cypher(专用于 Neo4j 的声明式查询语言,语法简洁,适合复杂关系查询) |
从上面的对比来看,使用 HugeGraph 不用有任何心理压力,可惜貌似 HugeGraph 的社区不太完善,查询资料比较吃力。
Gremlin 的使用方法可以参见:tinkerpop ,可以通过下面的两句话入门:
- g.V() 用于处理顶点(Vertex)
- g.E() 用于处理边(Edge)
安装依赖并初始化客户端和缓存
pip install hugegraph-python
import json
from cachetools import TTLCache
from pyhugegraph.client import PyHugeClient
client = PyHugeClient(ip="10.10.145.103",port=11003,graph="final")
cache = TTLCache(maxsize=100, ttl=300)
设想这种使用场景下,用户大概率会反复的点来点去,所以使用缓存应该能大幅减少响应速度,提升使用体验。
根据图结构定义变量
这些变量是根据图结构定义的映射,可以提高代码的通用性:
FIELD_MAPPING = {
"case_number": ("slice_case", "case_number", "案号"),
"case_name": ("slice_case", "case_name", "案件名称"),
"case_level": ("slice_case", "case_level", "审理阶段"),
"cause_type": ("slice_cause", "cause_type", "案由"),
"court_content": ("slice_court", "court_content", "法院")
}
SLICE_MAPPING={
"slice_case":"案件",
"slice_cause":"案由",
"slice_court":"法院"
}
查询条件
下面的类定义了查询条件 condition,有了它便于理解后面的代码:
from typing import List,Literal
from pydantic import BaseModel,Field
class SearchField(BaseModel):
field: Literal["case_number", "case_name","cause_type","court_content"] \
= Field(..., description="case_number:案号,case_name:案件名称,cause_type:案由,court_content:法院")
value: List[str] = Field(..., description="值。")
查询根节点
下面的代码用于查询根节点:
def execute_single_condition(condition):
"""根据查询条件查询符合条件的一级顶点"""
field = condition.field
value = condition.value
if field not in FIELD_MAPPING:
raise ValueError(f"Unsupported field: {field}")
label, prop_key, _ = FIELD_MAPPING[field]
g = client.gremlin()
val_conditions = ",".join([f"'{v}'" for v in value])
g_query = f"g.V().hasLabel('{label}').has('{prop_key}', within({val_conditions}))"
results = g.exec(g_query)
if results:
return results["data"]
return []
查询图数据
下面的代码用于查询图数据,并进行分页和自动缓存:
def search_child(vertices:set):
"""查询下一级边和顶点"""
g = client.gremlin()
child_edges = []
child_vertex_ids = set()
child_vertices = []
for vertice in vertices:
vid = vertice["id"]
g_query_out_edges = f"g.V('{vid}').outE()"
out_edges = g.exec(g_query_out_edges)
if out_edges:
out_edges_data = out_edges["data"]
child_edges.extend(out_edges_data)
for edge in out_edges_data:
child_vertex_ids.add(edge["inV"])
g_query_in_edges = f"g.V('{vid}').inE()"
in_edges = g.exec(g_query_in_edges)
if in_edges:
in_edges_data = in_edges["data"]
child_edges.extend(in_edges_data)
for edge in in_edges_data:
child_vertex_ids.add(edge["outV"])
# 下面对child_vertex_ids拆分,防止元素太多超出Gremlin的请求大小限制
chunk_size = 100
small_lists = []
big_list = list(child_vertex_ids)
for i in range(0, len(big_list), chunk_size):
small_lists.append(big_list[i:i + chunk_size])
for small_list in small_lists:
val_conditions = ",".join([f"'{v}'" for v in small_list])
g_query = f"g.V({val_conditions})"
verticles = g.exec(g_query)
if verticles:
for v in verticles["data"]:
child_vertices.append(v)
return child_edges,child_vertices
def search_graph_with_cache_and_pagination(condition, page=1, limit=10):
request_json = {
"condition": condition.model_dump(),
"page": page,
"limit": limit
}
cache_key = generate_cache_key(request_json)
if cache_key in cache:
cached = cache[cache_key]
if cached:
return cached
# Step 1: 获取一级顶点
first_vertices = execute_single_condition(condition)
if not first_vertices:
return build_echarts_data([], [])
# 分页
total = len(first_vertices)
start_idx = (page - 1) * limit
if start_idx >= total:
return build_echarts_data([], [])
end_idx = start_idx + limit
if total >= end_idx:
first_vertices_page = first_vertices[start_idx:end_idx]
else:
first_vertices_page = first_vertices[start_idx:]
# Step 2: 获取边和第二层顶点
second_edges,second_vertices = search_child(first_vertices_page)
# Step 3: 获取第三层顶点
# 如果获取第三层,大概率能找到1万左右的顶点和边,太大了。
#third_edges,third_vertices = search_child(second_vertices)
# 合并所有顶点和边
all_vertices = first_vertices_page + second_vertices #+ third_vertices
all_edges = second_edges #+ third_edges
echarts_data = build_echarts_data(all_vertices, all_edges)
# 缓存
cache[cache_key] = echarts_data
return echarts_data
转化为echart方便使用的json数据
下面是的方法将顶点和边数据转换为方便前端echart显示的json格式:
def build_echarts_data(vertices, edges):
nodes = []
links = []
node_set = set()
for v in vertices:
vid = v["id"]
if vid not in node_set:
node_set.add(vid)
properties_zh = {}
for k, v_list in v["properties"].items():
zh_key = k
if k in FIELD_MAPPING:
zh_key = FIELD_MAPPING[k][2]
properties_zh[zh_key] = v_list[0] if isinstance(v_list, list) and len(v_list) > 0 else v_list
nodes.append({
"id": vid,
"name": next(iter(properties_zh.values())),
#"symbolSize": 50,
"category": SLICE_MAPPING[v["label"]],
"properties": properties_zh
})
for e in edges:
source_id = e["outV"]
target_id = e["inV"]
links.append({
"source": source_id,
"target": target_id,
"name": e["label"],
#"value": 1
})
categories = list({SLICE_MAPPING[v["label"]] for v in vertices})
return {
"nodes": nodes,
"links": links,
"categories": [{"name": c} for c in categories]
}
总结
使用 pyHugeGraph 实现上述查询 HugeGraph 图数据库的完整代码样例不大好找,希望对您有帮助。
使用 pyHugeGraph 也可以创建修改 HugeGraph 的 schema ,详见 :hugegraph-python 。
🪐祝好运🪐