hg-store-node-note
store-node阅读笔记
在看store-node的源码之前,首先需要知道这个包是做什么的:
- 这个包主要启动了Raft的节点服务,负责节点生命周期的管理、分区/Raft任务的提交(主要)、扫描/聚合查询流水线、TTL 清理任务、以及 RocksDB/JRaft/系统层指标采集等能力。
- 向外提供了grpc(供内部组件使用)和一些Rest API(用户来管理节点)
- 这个类不仅负责当前store实例中的raft peer的监控方法,还会负责将对应的grpc路由到正确的raft peer上。但是需要注意的是,raft peer的日志同步是由JRaft来负责的,而不是由这个类来负责的。

概括的来说:node包仅仅包含了当前store实例的peers的生命周期管理的Hook与任务提交的方法,并不负责底层的共识部分。
一些关键的包
- controller包
包含了对外部提供的的Rest API,用于监控节点状态、节点信息等等 - grpc包
grpc是node模块最重要的包,grpc/HgStoreNodeService.java封装了HgStoreEngine(和底层存储交互),以及HgStoreSessionImpl。其中addRaftTask()负责将Raft任务提交到对应Partition的Raft Group中,这里的invoke()方法会将涉及到 Raft状态改变的操作同步到各个的Raft Peer。一般的读请求在代码中也看不到(进一步证明了这段只是将状态同步到Raft Peer,而不是实际的读写处理,读写处理在先前core包下的BussinessImpl)。

同时如果当前的target peer不是leader,由底层的raft处理之后会通过Raft Closure返回`NOT_LEADER`的信息,让Client端进行选择另一store实例进行重试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
switch (methodId) {
case HgStoreNodeService.BATCH_OP:
hgStoreSession.doBatch(partId, (BatchReq) req, response);
break;
case HgStoreNodeService.TABLE_OP:
hgStoreSession.doTable(partId, (TableReq) req, response);
break;
case HgStoreNodeService.GRAPH_OP:
if (((GraphReq) req).getMethod() == GRAPH_METHOD_DELETE) {
storeEngine.deletePartition(partId, ((GraphReq) req).getGraphName());
}
hgStoreSession.doGraph(partId, (GraphReq) req, response);
break;
case HgStoreNodeService.CLEAN_OP:
hgStoreSession.doClean(partId, (CleanReq) req, response);
break;
case HgStoreNodeService.TTL_CLEAN_OP:
hgStoreSession.cleanTtl(partId, (TTLCleanRequest) req, response);
break;
default:
return false; // Unhandled
}
return true;
另外还有3.7中新增的一个子包
task,包含了几个新增的类DefaulTaskSubmitter、RaftTaskSubmitter、TaskInfo、TaskSubmitter以及TtlCleaner。TtlCleaner在run()的时候都做了什么事?- 初始化executor,KvClient(pd侧)
- 从pd侧搜索”HUGEGRAPH/hg/EXPIRED”这个key,获取拥有ttl配置的图信息,字符串的大致形式是 “graphName1:startTime1:isRaft1, graphName2:startTime2:isRaft2”
- 在
runAll()中解析字符串,判断后续是否需要进行raft提交,并加入到任务队列,交给executor执行,最后输出日志。
并且这个类是通过定时任务启动的,下边是他的初始化代码:
1
2
3
4public TtlCleaner(){
future =
scheduler.scheduleAtFixedRate(this, delay, 24 > * 3600, TimeUnit.SECONDS);
}如果是raft任务,会进一步调用前面说过的
StoreNodeService的addRaftTask()方法,进> 行raft任务的提交。
最后在runAll()方法中,通过while循环来获取到任务结果(Raft任务),并利用countdownlatch等待,直到所有任务完成/触发超时。
如果是直接调用DefaultTaskSubmitter的cleanTtl()则是同步的等待响应。listener包,
ContextClosedListener、PdConfigureListener、PlaceHolderListener,三个类是监听器,主要监听以下三种事件:- 上下文关闭事件
- pd配置变更事件
- 应用开始事件
PlaceHolderListener在应用准备就绪的时候会在dataPath创建占位文件,防止在RocksDB写入数据的时候触发磁盘已满的问题。ContextClosedListener在在应用关闭时,会尝试将正在运行的任务完成,最后调度Raft将Leader移交给其他的peer。PdConfigurationListener会监听(KvClient.listen)Pd的配置变化,底层是利用了grpc的监听机制。
这里3.7版本的ContextClosedListener并没有标记为Spring框架的Bean,因此成员变量的@AutoWired注解也没有生效,那么这里shutdownHook的结果似乎没有生效。
hg-store-node-note
http://example.com/2026/01/22/hg-store-node-note/