SOFAJRaft源码阅读(伍)-初识RheaKV

时间:2023-02-03 18:07:54

SOFAJRaft的SOFAJRaft-RheaKV 是基于 SOFAJRaft 和 RocksDB 实现的嵌入式、分布式、高可用、强一致的 KV 存储类库。SOFAJRaft-RheaKV 集群主要包括三个核心组件:PD,Store 和 Region。
@Author:Akai-yuan
@更新时间:2023/2/3

SOFAJRaft-RheaKV 存储类库主要包括** PD,Store 和 Region 三个核心组件**,支持轻量级的状态/元信息存储以及集群同步,分布式锁服务使用场景

(1)调度

PD 是全局的中心总控节点,负责整个集群的调度管理、Region ID 生成、维护 RegionRouteTable 路由表。一个 PDServer 管理多个集群,集群之间基于 clusterId 隔离;PD Server 需要单独部署,很多场景其实并不需要自管理,RheaKV 也支持不启用 PD,不需要自管理的集群可不启用 PD,设置 PlacementDriverOptions 的 fake选项为 true 即可。PD 一般通过 Region 的心跳返回信息进行对 Region 调度,Region 处理完后,PD 则会在下一个心跳返回中收到 Region 的变更信息来更新路由及状态表。

(2)存储

Store 是集群中的一个物理存储节点,一个 Store 包含一个或多个 Region。通常一个 Node 负责一个 Store,Store 可以被看作是 Region 的容器,里面存储着多个分片数据。Store 会向 PD 主动上报 StoreHeartbeatRequest 心跳,心跳交由 PD 的 handleStoreHeartbeat 处理,里面包含该 Store 的基本信息,比如,包含多少 Region,有哪些 Region 的 Leader 在该 Store 等。

(3)数据

Region 是最小的 KV 数据单元,可理解为一个数据分区或者分片,每个 Region 都有一个左闭右开的区间 [startKey, endKey),能够根据请求流量/负载/数据量大小等指标自动分裂以及自动副本搬迁。Region 有多个副本 Replication 构建 Raft Groups 存储在不同的 Store 节点,通过 Raft 协议日志复制功能数据同步到同 Group 的全部节点。Region对应的是 Store 里某个实际的数据区间。每个 Region 会有多个副本,每个副本存储在不同的 Store,一起组成一个Raft Group。Region 中的 Leader 会向 PD 主动上报 RegionHeartbeatRequest 心跳,交由 PD 的 handleRegionHeartbeat 处理,而 PD 是通过 Region的Epoch 感知 Region 是否有变化。
为了让大家更清楚PD,Store 和 Region 三个核心组件的功能,这里放一张官方图片以便于理解:
SOFAJRaft源码阅读(伍)-初识RheaKV

2.初始化

我们从JRaft-Example模块的RheaKV部分开始,首先看到com.alipay.sofa.jraft.example.rheakv.Server1的main方法。

  1. 声明了PlacementDriverOptions、StoreEngineOptions两个配置选项实体
  2. 定义了RheaKVStoreOptions,并将PDOptions和SEOptions装配到属性中,并初始化声明集群名、是否开启并行压缩、服务的IP:端口列表("127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183")。
  3. 声明一个Node节点
  4. 添加一个钩子函数,实现优雅停机。(作者曾经分析过钩子函数的作用,具体参照:SOFAJRaft源码阅读(叁)-ShutdownHook如何优雅的停机
public static void main(final String[] args) {
    final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
        .withFake(true) // use a fake pd
        .config();
    final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured() //
        //StoreEngine 存储引擎支持 MemoryDB 和 RocksDB 两种实现
        .withStorageType(StorageType.RocksDB)
        .withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(Configs.DB_PATH).config())
        .withRaftDataPath(Configs.RAFT_DATA_PATH)
        .withServerAddress(new Endpoint("127.0.0.1", 8181))
        .config();
    final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
        .withClusterName(Configs.CLUSTER_NAME) //
        .withUseParallelCompress(true) //
        .withInitialServerList(Configs.ALL_NODE_ADDRESSES)
        .withStoreEngineOptions(storeOpts) //
        .withPlacementDriverOptions(pdOpts) //
        .config();
    System.out.println(opts);
    final Node node = new Node(opts);
    node.start();
    Runtime.getRuntime().addShutdownHook(new Thread(node::stop));
    System.out.println("server1 start OK");
}

关于Node节点的实现:
里面维护了一个RheaKVStoreOptions、RheaKVStore。

public class Node {
    private final RheaKVStoreOptions options;
    private RheaKVStore              rheaKVStore;
    public Node(RheaKVStoreOptions options) {
        this.options = options;
    }
    public void start() {
        this.rheaKVStore = new DefaultRheaKVStore();
        this.rheaKVStore.init(this.options);
    }
    public void stop() {
        this.rheaKVStore.shutdown();
    }
    public RheaKVStore getRheaKVStore() {
        return rheaKVStore;
    }
}

(1)DefaultRheaKVStore的初始化

可以看到调用了DefaultRheaKVStore,还调用了他的init方法进行初始化。

public synchronized boolean init(final RheaKVStoreOptions opts) {
    	//判断是否已经启动   
    	if (this.started) {
            LOG.info("[DefaultRheaKVStore] already started.");
            return true;
        }
        DescriberManager.getInstance().addDescriber(RouteTable.getInstance());
        this.opts = opts;
        //根据PlacementDriverOptions初始化PD
        final PlacementDriverOptions pdOpts = opts.getPlacementDriverOptions();
        final String clusterName = opts.getClusterName();
        Requires.requireNonNull(pdOpts, "opts.placementDriverOptions");
        Requires.requireNonNull(clusterName, "opts.clusterName");
        if (Strings.isBlank(pdOpts.getInitialServerList())) {
            // 如果为空,则继承父级的值
            pdOpts.setInitialServerList(opts.getInitialServerList());
        }
        //这里不启用 PD,就实例化一个FakePlacementDriverClient
        if (pdOpts.isFake()) {
            this.pdClient = new FakePlacementDriverClient(opts.getClusterId(), clusterName);
        //启用 PD,就实例化一个RemotePlacementDriverClient
        } else {
            this.pdClient = new RemotePlacementDriverClient(opts.getClusterId(), clusterName);
        }
    	//初始化FakePlacementDriverClient/RemotePlacementDriverClient
        if (!this.pdClient.init(pdOpts)) {
            LOG.error("Fail to init [PlacementDriverClient].");
            return false;
        }
        // 初始化压缩策略
        ZipStrategyManager.init(opts);
        // 初始化存储引擎
        final StoreEngineOptions stOpts = opts.getStoreEngineOptions();
        if (stOpts != null) {
            stOpts.setInitialServerList(opts.getInitialServerList());
            this.storeEngine = new StoreEngine(this.pdClient, this.stateListenerContainer);
            if (!this.storeEngine.init(stOpts)) {
                LOG.error("Fail to init [StoreEngine].");
                return false;
            }
        }
   		//获取当前节点的ip和端口号
        final Endpoint selfEndpoint = this.storeEngine == null ? null : this.storeEngine.getSelfEndpoint();
        final RpcOptions rpcOpts = opts.getRpcOptions();
        Requires.requireNonNull(rpcOpts, "opts.rpcOptions");
        //初始化一个RpcService,并重写getLeader方法
    	this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {
            @Override
            public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
                final Endpoint leader = getLeaderByRegionEngine(regionId);
                if (leader != null) {
                    return leader;
                }
                return super.getLeader(regionId, forceRefresh, timeoutMillis);
            }
        };
        if (!this.rheaKVRpcService.init(rpcOpts)) {
            LOG.error("Fail to init [RheaKVRpcService].");
            return false;
        }
        //获取重试次数,默认重试两次
        this.failoverRetries = opts.getFailoverRetries();
        //默认5000
        this.futureTimeoutMillis = opts.getFutureTimeoutMillis();
        //是否只从leader读取数据,默认为true
        this.onlyLeaderRead = opts.isOnlyLeaderRead();
        //初始化kvDispatcher, 这里默认为true
        if (opts.isUseParallelKVExecutor()) {
            final int numWorkers = Utils.cpus();
            //乘以16
            final int bufSize = numWorkers << 4;
            final String name = "parallel-kv-executor";
            final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED
                    ? new AffinityNamedThreadFactory(name, true) : new NamedThreadFactory(name, true);
            //初始化Dispatcher
            this.kvDispatcher = new TaskDispatcher(bufSize, numWorkers, WaitStrategyType.LITE_BLOCKING_WAIT, threadFactory);
        }
        this.batchingOpts = opts.getBatchingOptions();
        //默认是true
        if (this.batchingOpts.isAllowBatching()) {
            this.getBatching = new GetBatching(KeyEvent::new, "get_batching",
                    new GetBatchingHandler("get", false));
            this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
                    new GetBatchingHandler("get_only_safe", true));
            this.putBatching = new PutBatching(KVEvent::new, "put_batching",
                    new PutBatchingHandler("put"));
        }
        LOG.info("[DefaultRheaKVStore] start successfully, options: {}.", opts);
        return this.started = true;
    }

(2)StoreEngine初始化

其中有些代码操作与DefaultRheaKVStore的init方法中的某些代码一致,就不再重复赘述,其余的用注释的方式标注在以下代码块中。

public synchronized boolean init(final StoreEngineOptions opts) {
        if (this.started) {
            LOG.info("[StoreEngine] already started.");
            return true;
        }
        DescriberManager.getInstance().addDescriber(this);
        this.storeOpts = Requires.requireNonNull(opts, "opts");
        Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress");
        //获取ip和端口
    	final int port = serverAddress.getPort();
        final String ip = serverAddress.getIp();
        //如果传入的IP为空,那么就设置启动机器ip作为serverAddress的ip
        if (ip == null || Utils.IP_ANY.equals(ip)) {
            serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port);
            opts.setServerAddress(serverAddress);
        }
        //获取度量上报时间
        final long metricsReportPeriod = opts.getMetricsReportPeriod();
        // 初始化RegionEngineOptions
        List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        //如果RegionEngineOptions为空,则初始化一个
        if (rOptsList == null || rOptsList.isEmpty()) {
            // -1 region
            final RegionEngineOptions rOpts = new RegionEngineOptions();
            rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
            rOptsList = Lists.newArrayList();
            rOptsList.add(rOpts);
            opts.setRegionEngineOptionsList(rOptsList);
        }
        //获取集群名
        final String clusterName = this.pdClient.getClusterName();
        //遍历rOptsList集合,为其中的RegionEngineOptions对象设置参数
        for (final RegionEngineOptions rOpts : rOptsList) {
            //用集群名+“-”+RegionId 拼接设置为RaftGroupId
            rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
            rOpts.setServerAddress(serverAddress);
            if (Strings.isBlank(rOpts.getInitialServerList())) {
                // if blank, extends parent's value
                rOpts.setInitialServerList(opts.getInitialServerList());
            }
            if (rOpts.getNodeOptions() == null) {
                // copy common node options
                rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
                    .getCommonNodeOptions().copy());
            }
            //如果原本没有设置度量上报时间,那么就重置一下
            if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
                // extends store opts
                rOpts.setMetricsReportPeriod(metricsReportPeriod);
            }
        }
        // 初始化Store和Store里面的region
        final Store store = this.pdClient.getStoreMetadata(opts);
        if (store == null || store.getRegions() == null || store.getRegions().isEmpty()) {
            LOG.error("Empty store metadata: {}.", store);
            return false;
        }
        this.storeId = store.getId();
        this.partRocksDBOptions = SystemPropertyUtil.getBoolean(PART_ROCKSDB_OPTIONS_KEY, false);
        // 初始化执行器
        if (this.readIndexExecutor == null) {
            this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
        }
        if (this.raftStateTrigger == null) {
            this.raftStateTrigger = StoreEngineHelper.createRaftStateTrigger(opts.getLeaderStateTriggerCoreThreads());
        }
        if (this.snapshotExecutor == null) {
            this.snapshotExecutor = StoreEngineHelper.createSnapshotExecutor(opts.getSnapshotCoreThreads(),
                opts.getSnapshotMaxThreads());
        }
        // init rpc executors
        final boolean useSharedRpcExecutor = opts.isUseSharedRpcExecutor();
        // 初始化rpc远程执行器,用来执行RPCServer的Processors
        if (!useSharedRpcExecutor) {
            if (this.cliRpcExecutor == null) {
                this.cliRpcExecutor = StoreEngineHelper.createCliRpcExecutor(opts.getCliRpcCoreThreads());
            }
            if (this.raftRpcExecutor == null) {
                this.raftRpcExecutor = StoreEngineHelper.createRaftRpcExecutor(opts.getRaftRpcCoreThreads());
            }
            if (this.kvRpcExecutor == null) {
                this.kvRpcExecutor = StoreEngineHelper.createKvRpcExecutor(opts.getKvRpcCoreThreads());
            }
        }
        // 初始化指标度量
        startMetricReporters(metricsReportPeriod);
        // 初始化rpcServer,供其他服务调用
        this.rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverAddress, this.raftRpcExecutor,
            this.cliRpcExecutor);
    	//为server加入各种processor
        StoreEngineHelper.addKvStoreRequestProcessor(this.rpcServer, this);
        if (!this.rpcServer.init(null)) {
            LOG.error("Fail to init [RpcServer].");
            return false;
        }
        // init db store
    	// 根据不同的类型选择db
        if (!initRawKVStore(opts)) {
            return false;
        }
        if (this.rawKVStore instanceof Describer) {
            DescriberManager.getInstance().addDescriber((Describer) this.rawKVStore);
        }
        // init all region engine
    	// 为每个region初始化RegionEngine
        if (!initAllRegionEngine(opts, store)) {
            LOG.error("Fail to init all [RegionEngine].");
            return false;
        }
        // heartbeat sender
        // 如果开启了自管理的集群,那么需要初始化心跳发送器
        if (this.pdClient instanceof RemotePlacementDriverClient) {
            HeartbeatOptions heartbeatOpts = opts.getHeartbeatOptions();
            if (heartbeatOpts == null) {
                heartbeatOpts = new HeartbeatOptions();
            }
            this.heartbeatSender = new HeartbeatSender(this);
            if (!this.heartbeatSender.init(heartbeatOpts)) {
                LOG.error("Fail to init [HeartbeatSender].");
                return false;
            }
        }
        this.startTime = System.currentTimeMillis();
        LOG.info("[StoreEngine] start successfully: {}.", this);
        return this.started = true;
    }

(3)RegionEngineOptions初始化

在上述StoreEngine的初始化中,可以看到有对regionEngine进行初始化,接下来我们单独再拿出该部分代码进行分析。

  1. 首先对opts.getRegionEngineOptionsList()判空,若是空则初始化一个RegionEngineOptions()
  2. 遍历新生成的每一个RegionEngineOptions,并设置参数(RaftGroupId、ServerAddress、InitialServerList、NodeOptions、MetricsReportPeriod)
        // init region options
        List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        if (rOptsList == null || rOptsList.isEmpty()) {
            // -1 region
            final RegionEngineOptions rOpts = new RegionEngineOptions();
            rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
            rOptsList = Lists.newArrayList();
            rOptsList.add(rOpts);
            opts.setRegionEngineOptionsList(rOptsList);
        }
        final String clusterName = this.pdClient.getClusterName();
        for (final RegionEngineOptions rOpts : rOptsList) {
            rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
            rOpts.setServerAddress(serverAddress);
            if (Strings.isBlank(rOpts.getInitialServerList())) {
                // if blank, extends parent's value
                rOpts.setInitialServerList(opts.getInitialServerList());
            }
            if (rOpts.getNodeOptions() == null) {
                // copy common node options
                rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
                    .getCommonNodeOptions().copy());
            }
            if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
                // extends store opts
                rOpts.setMetricsReportPeriod(metricsReportPeriod);
            }
        }

(4)Store初始化

调用pdClient的getStoreMetadata方法进行初始化:

final Store store = this.pdClient.getStoreMetadata(opts);

当调用FakePlacementDriverClient#getStoreMetadata时:

  1. 获取之前初始化得到的RegionEngineOptions链表
  2. 构造一个与RegionEngineOptions链表相同大小的Region链表,因为一个Store里面会有多个region,之前在那张图中能直观看到。
  3. RegionEngineOptions链表中的每个元素执行getLocalRegionMetadata方法,并将结果添加到region链表中。
    public Store getStoreMetadata(final StoreEngineOptions opts) {
        final Store store = new Store();
        final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        final List<Region> regionList = Lists.newArrayListWithCapacity(rOptsList.size());
        store.setId(-1);
        store.setEndpoint(opts.getServerAddress());
        for (final RegionEngineOptions rOpts : rOptsList) {
            regionList.add(getLocalRegionMetadata(rOpts));
        }
        store.setRegions(regionList);
        return store;
    }

我们来看AbstractPlacementDriverClient#getLocalRegionMetadata方法:

  1. 保证regionId在合理的范围内
  2. 设置key的范围(左闭右开
  3. 根据initialServerList转换成peer对象
  4. 将Region添加到regionRouteTable路由表
    protected Region getLocalRegionMetadata(final RegionEngineOptions opts) {
        final long regionId = Requires.requireNonNull(opts.getRegionId(), "opts.regionId");
        Requires.requireTrue(regionId >= Region.MIN_ID_WITH_MANUAL_CONF, "opts.regionId must >= "
                                                                         + Region.MIN_ID_WITH_MANUAL_CONF);
        Requires.requireTrue(regionId < Region.MAX_ID_WITH_MANUAL_CONF, "opts.regionId must < "
                                                                        + Region.MAX_ID_WITH_MANUAL_CONF);
        final byte[] startKey = opts.getStartKeyBytes();
        final byte[] endKey = opts.getEndKeyBytes();
        final String initialServerList = opts.getInitialServerList();
        final Region region = new Region();
        final Configuration conf = new Configuration();
        // region
        region.setId(regionId);
        region.setStartKey(startKey);
        region.setEndKey(endKey);
        region.setRegionEpoch(new RegionEpoch(-1, -1));
        // peers
        Requires.requireTrue(Strings.isNotBlank(initialServerList), "opts.initialServerList is blank");
        conf.parse(initialServerList);
        region.setPeers(JRaftHelper.toPeerList(conf.listPeers()));
        this.regionRouteTable.addOrUpdateRegion(region);
        return region;
    }

RegionEpoch涉及到两个版本号:
(1)confVer:Conf 变化的版本号, 当增加或者移除一个peer时,版本号自增
(2)version:Region 版本号, 分裂或合并时,版本号自增

    public RegionEpoch(long confVer, long version) {
        this.confVer = confVer
        this.version = version;
    }

关于RegionRouteTable#addOrUpdateRegion:

    public void addOrUpdateRegion(final Region region) {
        Requires.requireNonNull(region, "region");
        Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
        final long regionId = region.getId();
        final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey());
        final StampedLock stampedLock = this.stampedLock;
        final long stamp = stampedLock.writeLock();
        try {
            this.regionTable.put(regionId, region.copy());
            this.rangeTable.put(startKey, regionId);
        } finally {
            stampedLock.unlockWrite(stamp);
        }
    }

我们看看RegionRouteTable的几个字段:

  • keyBytesComparator:是一个LexicographicByteArrayComparator字典序比较器
  • stampedLock:比读写锁性能更高的锁
  • rangeTable:是一个TreeMap,它实现了NavigableMap,并按照指定的keyBytesComparator排序,键值对为<regionId, region>
  • regionTable:是一个hashMap,键值对为<startKey, regionId>
    private static final Comparator<byte[]>  keyBytesComparator = BytesUtil.getDefaultByteArrayComparator();
    private final StampedLock                stampedLock        = new StampedLock();
    private final NavigableMap<byte[], Long> rangeTable         = new TreeMap<>(keyBytesComparator);
    private final Map<Long, Region>          regionTable        = Maps.newHashMap();

(5)RegionEngine初始化

StoreEngine#initAllRegionEngine

    private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store store) {
        Requires.requireNonNull(opts, "opts");
        Requires.requireNonNull(store, "store");
        //获取主目录
        String baseRaftDataPath = opts.getRaftDataPath();
        if (Strings.isNotBlank(baseRaftDataPath)) {
            try {
                FileUtils.forceMkdir(new File(baseRaftDataPath));
            } catch (final Throwable t) {
                LOG.error("Fail to make dir for raftDataPath: {}.", baseRaftDataPath);
                return false;
            }
        } else {
            baseRaftDataPath = "";
        }
        final Endpoint serverAddress = opts.getServerAddress();
        //获取RegionEngineOptions和region
        final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        final List<Region> regionList = store.getRegions();
        Requires.requireTrue(rOptsList.size() == regionList.size());
        for (int i = 0; i < rOptsList.size(); i++) {
            final RegionEngineOptions rOpts = rOptsList.get(i);
            if (!inConfiguration(rOpts.getServerAddress().toString(), rOpts.getInitialServerList())) {
                continue;
            }
            final Region region = regionList.get(i);
            //检验region路径是否为空,为空则重新设值
            if (Strings.isBlank(rOpts.getRaftDataPath())) {
                final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort();
                rOpts.setRaftDataPath(Paths.get(baseRaftDataPath, childPath).toString());
            }
            Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
            //根据Region初始化RegionEngine
            final RegionEngine engine = new RegionEngine(region, this);
            if (engine.init(rOpts)) {
                // 每个 RegionKVService 对应一个 Region,只处理本身 Region 范畴内的请求
                final RegionKVService regionKVService = new DefaultRegionKVService(engine);
                registerRegionKVService(regionKVService);
                //放入到ConcurrentMap<Long, RegionEngine> regionKVServiceTable 中
                this.regionEngineTable.put(region.getId(), engine);
            } else {
                LOG.error("Fail to init [RegionEngine: {}].", region);
                return false;
            }
        }
        return true;
    }

**

    public synchronized boolean init(final RegionEngineOptions opts) {
        if (this.started) {
            LOG.info("[RegionEngine: {}] already started.", this.region);
            return true;
        }
        this.regionOpts = Requires.requireNonNull(opts, "opts");
        //实例化状态机
        this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);

        // node options
        NodeOptions nodeOpts = opts.getNodeOptions();
        if (nodeOpts == null) {
            nodeOpts = new NodeOptions();
        }
        //如果度量间隔时间大于零,那么开启度量
        final long metricsReportPeriod = opts.getMetricsReportPeriod();
        if (metricsReportPeriod > 0) {
            // metricsReportPeriod > 0 means enable metrics
            nodeOpts.setEnableMetrics(true);
        }
        final Configuration initialConf = new Configuration();
        if (!initialConf.parse(opts.getInitialServerList())) {
            LOG.error("Fail to parse initial configuration {}.", opts.getInitialServerList());
            return false;
        }
        //初始化集群配置
        nodeOpts.setInitialConf(initialConf);
        nodeOpts.setFsm(this.fsm);
        //初始化各种日志的路径
        final String raftDataPath = opts.getRaftDataPath();
        try {
            FileUtils.forceMkdir(new File(raftDataPath));
        } catch (final Throwable t) {
            LOG.error("Fail to make dir for raftDataPath {}.", raftDataPath);
            return false;
        }
        if (Strings.isBlank(nodeOpts.getLogUri())) {
            final Path logUri = Paths.get(raftDataPath, "log");
            nodeOpts.setLogUri(logUri.toString());
        }
        if (Strings.isBlank(nodeOpts.getRaftMetaUri())) {
            final Path meteUri = Paths.get(raftDataPath, "meta");
            nodeOpts.setRaftMetaUri(meteUri.toString());
        }
        if (Strings.isBlank(nodeOpts.getSnapshotUri())) {
            final Path snapshotUri = Paths.get(raftDataPath, "snapshot");
            nodeOpts.setSnapshotUri(snapshotUri.toString());
        }
        LOG.info("[RegionEngine: {}], log uri: {}, raft meta uri: {}, snapshot uri: {}.", this.region,
            nodeOpts.getLogUri(), nodeOpts.getRaftMetaUri(), nodeOpts.getSnapshotUri());
        final Endpoint serverAddress = opts.getServerAddress();
        final PeerId serverId = new PeerId(serverAddress, 0);
        final RpcServer rpcServer = this.storeEngine.getRpcServer();
        this.raftGroupService = new RaftGroupService(opts.getRaftGroupId(), serverId, nodeOpts, rpcServer, true);
        this.node = this.raftGroupService.start(false);
        //初始化node节点
        RouteTable.getInstance().updateConfiguration(this.raftGroupService.getGroupId(), nodeOpts.getInitialConf());
        if (this.node != null) {
            final RawKVStore rawKVStore = this.storeEngine.getRawKVStore();
            final Executor readIndexExecutor = this.storeEngine.getReadIndexExecutor();
            //RaftRawKVStore 是 RheaKV 基于 Raft 复制状态机 KVStoreStateMachine 的 RawKVStore 接口 KV 存储实现
        	//RheaKV 的 Raft 入口,从这里开始 Raft 流程
            this.raftRawKVStore = new RaftRawKVStore(this.node, rawKVStore, readIndexExecutor);
            //拦截请求做指标度量
            this.metricsRawKVStore = new MetricsRawKVStore(this.region.getId(), this.raftRawKVStore);
            // metrics config
            if (this.regionMetricsReporter == null && metricsReportPeriod > 0) {
                final MetricRegistry metricRegistry = this.node.getNodeMetrics().getMetricRegistry();
                if (metricRegistry != null) {
                    final ScheduledExecutorService scheduler = this.storeEngine.getMetricsScheduler();
                    // start raft node metrics reporter
                    this.regionMetricsReporter = Slf4jReporter.forRegistry(metricRegistry) //
                        .prefixedWith("region_" + this.region.getId()) //
                        .withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) //
                        .outputTo(LOG) //
                        .scheduleOn(scheduler) //
                        .shutdownExecutorOnStop(scheduler != null) //
                        .build();
                    this.regionMetricsReporter.start(metricsReportPeriod, TimeUnit.SECONDS);
                }
            }
            this.started = true;
            LOG.info("[RegionEngine] start successfully: {}.", this);
        }
        return this.started;
    }