集群选举问题:
Nacos支持集群模式,很显然。而一旦涉及到集群,就涉及到主从,那么nacos是一种什么样的机制来实现的集群呢?
Nacos的集群类似于zookeeper, 它分为leader角色和follower角色, 那么从这个角色的名字可以看出来,这个集群存在选举的机制。 因为如果自己不具备选举功能,角色的命名可能就是master/slave了.
选举算法 :
Nacos集群采用 raft 算法来实现,它是相对zookeeper的选举算法较为简单的一种。选举算法的核心在 RaftCore 中,包括数据的处理和数据同步。
raft 算法演示地址 :/raft/
在Raft中,节点有三种角色:
- Leader:负责接收客户端的请求
- Candidate:用于选举Leader的一种角色(竞选状态)
- Follower:负责响应来自Leader或者Candidate的请求
选举分为两个节点
- 服务启动的时候
- leader挂了的时候
所有节点启动的时候,都是follower状态。 如果在一段时间内如果没有收到leader的心跳(可能是没有leader,也可能是leader挂了),那么follower会变成Candidate。然后发起选举,选举之前,会增加 term,这个 term 和 zookeeper 中的 epoch 的道理是一样的。
follower会投自己一票,并且给其他节点发送票据vote,等到其他节点回复在这个过程中,可能出现几种情况
- 收到过半的票数通过,则成为leader
- 被告知其他节点已经成为leader,则自己切换为follower
- 一段时间内没有收到过半的投票,则重新发起选举
约束条件在任一term中,单个节点最多只能投一票
选举的几种情况 :
-
第一种情况,赢得选举之后,leader会给所有节点发送消息,避免其他节点触发新的选举
-
第二种情况,比如有三个节点A B C。A B同时发起选举,而A的选举消息先到达C,C给A投了一票,当B的消息到达C时,已经不能满足上面提到的约束条件,即C不会给B投票,而A和B显然都不会给对方投票。A胜出之后,会给B,C发心跳消息,节点B发现节点A的term不低于自己的term,知道有已经有Leader了,于是转换成follower
-
第三种情况, 没有任何节点获得majority投票,可能是平票的情况。加入总共有四个节点(A/B/C/D),Node C、Node D同时成为了candidate,但Node A投了NodeD一票,NodeB投了Node C一票,这就出现了平票 split vote的情况。这个时候大家都在等啊等,直到超时后重新发起选举。如果出现平票的情况,那么就延长了系统不可用的时间,因此raft引入了 randomizedelection timeouts来尽量避免平票情况.
源码分析 :
RaftCore 初始化 :Raft选举算法,是在RaftCore这个类中实现的。
/** * Init raft core. * * @throws Exception any exception during init */ @PostConstruct public void init() throws Exception { ("initializing Raft sub-system"); //开启一个notifier监听,这个线程中会遍历listeners,根据ApplyAction执行相应的逻辑 (notifier); final long start = (); //遍历/nacos/data/naming/data/文件件,也就是从磁盘中加载Datum到内存,用来做数据恢复。(数据同步采用2pc协议,leader收到请求会写写入到磁盘日志,然后再进行数据同步) (notifier, datums); //从/nacos_home/data/naming/文件中读取term,term表示当前的时钟周期。 setTerm((().getProperty("term"), 0L)); ("cache loaded, datum count: {}, current term: {}", (), ()); while (true) { if (() <= 0) { break; } (1000L); } initialized = true; ("finish to load data from disk, cost: {} ms.", (() - start)); //开启定时任务,每500ms执行一次,用来判断是否需要发起leader选举,每500ms发起一次心跳 (new MasterElection()); (new HeartBeat()); ("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); }
这里我们重点关注 MasterElection 选举:
public class MasterElection implements Runnable { @Override public void run() { try { //判断本机是否具准备就绪 if (!()) { return; } //获取本机的节点信息 RaftPeer local = (); //leader选举触发间隔时间,第一次进来,会生成(0~15000毫秒)之间的一个随机数-500. // //后面由于500ms调度一次,所以每次该线程被调起,会将该leaderDueMs减去TICK_PERIOD_MS(500ms),直到小于0的时候会触发选举 //后面每次收到一次leader的心跳就会重置leaderDueMs = 15s+(随机0-5s) -= GlobalExecutor.TICK_PERIOD_MS; //当间隔时间>0,直接返回,等到下一次500ms后再调用 if ( > 0) { return; } //重新设置本地的leaderDueMs // reset timeout (); ();//设置心跳间隔5s //发起投票 sendVote(); } catch (Exception e) { ("[RAFT] error while master election {}", e); } } //发送票据数据 private void sendVote() { RaftPeer local = (()); ("leader timeout, start voting,leader: {}, term: {}", (getLeader()), ); //重置peer (); //每一次投票,都累加一次term,表示当前投票的轮数 (); //选自己,此时peers中有一个votefor就是自己 = ; //本地server状态设置为CANDIDATE = ; Map<String, String> params = new HashMap<>(1); ("vote", (local));//设置参数 //遍历除了本机ip之外的其他节点,把自己的票据发送给所有节点 for (final String server : ()) { final String url = buildUrl(server, API_VOTE);//API_VOTE接口路径:/raft/vote try { (url, null, params, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (() != HttpURLConnection.HTTP_OK) { .error("NACOS-RAFT vote failed: {}, url: {}", (), url); return 1; } //获取其他server的响应 RaftPeer peer = ((), ); ("received approve from peer: {}", (peer)); //计算leader (peer); return 0; } }); } catch (Exception e) { ("error while sending vote to server: {}", server); } } } }
RaftController : 我们先来看一下,其他节点收到投票请求后,如何处理呢?在没有看代码之前,不难猜测到,他应该要做票据的判断,到底是不是赞同你作为leader。
@PostMapping("/vote") public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception { //接受到投票请求 RaftPeer peer = (((request, "vote"), )); //返回结果 return (peer); }
处理逻辑非常简单。
-
判断收到的请求的term是不是过期的数据,如果是,则认为对方的这个票据无效,直接告诉发送这个票据的节点,你应该选择当前收到请求的节点。
-
否则,当前收到请求的节点会自动接受对方的票据,并把自己设置成follower
public synchronized RaftPeer receivedVote(RaftPeer remote) { if (!(remote)) { throw new IllegalStateException("can not find peer: " + ); } //得到本机节点信息 RaftPeer local = (()); //判断周期是否过期,如果收到的票据是过期状态 if (() <= ()) { String msg = "received illegitimate vote" + ", voter-term:" + + ", votee-term:" + ; (msg); //如果voteFor为空,表示在此之前没有收到其他节点的票据。则把remote节点的票据设置到自己的节点上 if (()) { = ; } return local; } //如果上面if不成立,说明remote机器率先发起的投票,那么就认同他的投票 //重置选举间隔时间 (); //设置为follower = ; = ; (());//同步term ("vote {} as leader, term: {}", , ); return local; }
(peer) 表示用来决策谁能成为leader
public RaftPeer decideLeader(RaftPeer candidate) { //假设3个节点:A,B,C。local节点为A,假设A,B,C第一轮同时发起选举请求 //第一轮:处理B,C节点返回结果:peers{"ip_a":"candidate_a","ip_b":"candidate_b","ip_C":"candidate_C"} (, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; /**第一轮投票结果: * 第一次for循环是a自己的投票: * maxApproveCount = 1,maxApprovePeer = A
* 第二次for循环是B服务器返回的投票,该投票投向B:数据同步addInstance
* 比如我们在注册服务时,调用addInstance之后,最后会调用 (key,instances); 这个方法,来实现数据一致性的同步。 * if (() > maxApproveCount) 条件不成立,maxApproveCount = 1,maxApprovePeer = A * * 第三次for循环是C服务器返回的投票,该投票投向C: * if (() > maxApproveCount) 条件不成立,maxApproveCount = 1,maxApprovePeer = A */ for (RaftPeer peer : ()) { if (()) { continue; } (); if (() > maxApproveCount) { maxApproveCount = (); maxApprovePeer = ; } } //majorityCount():2(假设3个节点) //第一轮:maxApproveCount = 1 if条件不成立,返回leader,此时leader为null,没有选举成功 if (maxApproveCount >= majorityCount()) { //找到得票最多的那个peer RaftPeer peer = (maxApprovePeer); //设置这个peer为leader = ; if (!(leader, peer)) { leader = peer; (new LeaderElectFinishedEvent(this, leader, local())); ("{} has become the LEADER", ); } } return leader; }
通过上面的分析想必大家应该很清楚 Nacos 的集群选举实现了 。如果还有不明白的小伙伴可以对着 Raft 的演示地址进行理解。
数据同步:
在注册服务时,调用addInstance之后,最后会调用 (key,instances); 这个方法,来实现数据一致性的同步。
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = (namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); (instanceList); (key, instances); } }
调用 用来发布类容,也就是实现数据的一致性同步。
@Override public void put(String key, Record value) throws NacosException { mapConsistencyService(key).put(key, value); } private ConsistencyService mapConsistencyService(String key) { return (key) ? ephemeralConsistencyService : persistentConsistencyService; }
这里会走后面 persistentConsistencyService ,由于 public class RaftConsistencyServiceImpl implements PersistentConsistencyService 所以这里走 RaftConsistencyServiceImpl :
@Override public void put(String key, Record value) throws NacosException { try { (key, value); } catch (Exception e) { ("Raft put failed.", e); throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e); } } public void signalPublish(String key, Record value) throws Exception { //如果自己不是leader,则找到leader节点,把数据转发到leader节点 if (!isLeader()) { ObjectNode params = (); ("key", key); ("value", (value)); Map<String, String> parameters = new HashMap<>(1); ("key", key); final RaftPeer leader = getLeader(); //代理转发 (, API_PUB, (), parameters); return; } //如果自己是leader,则向所有节点发送onPublish请求。这个所有节点包含自己。 try { OPERATE_LOCK.lock();//先上个锁 final long start = (); final Datum datum = new Datum(); = key; = value; if (getDatum(key) == null) { (1L); } else { (getDatum(key).()); } ObjectNode json = (); ("datum", (datum)); ("source", (())); //onPublish可以当做是一次心跳了,更新选举检查时间,然后一个重点就是term增加100了 //当然还是就是更新内容了,先写文件,再更新内存缓存。(也就是先记录本地日志) onPublish(datum, ());//发送数据到所有节点 final String content = (); final CountDownLatch latch = new CountDownLatch(()); //遍历所有节点,发送事务提交请求,把记录在本地日志中的数据进行提交 for (final String server : ()) { if (isLeader(server)) {//再次判断是否leader (); continue; }//构建发布同步接口地址 final String url = buildUrl(server, API_ON_PUB); (url, ("key=" + key), content, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (() != HttpURLConnection.HTTP_OK) { .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", , server, ()); return 1; } (); return 0; } @Override public STATE onContentWriteCompleted() { return ; } }); } // ..... }
其中 onPublish(datum, ()); 发送数据到所有节点:
public void onPublish(Datum datum, RaftPeer source) throws Exception { RaftPeer local = (); // 本地数据信息 if ( == null) { //这个value就是服务注册的服务,为空报错 ("received empty datum"); throw new IllegalStateException("received empty datum"); } // 判断是否事leader 不是报错 if (!()) { .warn("peer {} tried to publish data but wasn't leader, leader: {}", (source), (getLeader())); throw new IllegalStateException("peer(" + + ") tried to publish " + "data but wasn't leader"); } // 判断 term if (() < ()) { ("out of date publish, pub-term: {}, cur-term: {}", (source), (local)); throw new IllegalStateException( "out of date publish, pub-term:" + () + ", cur-term: " + ()); } //重置事件 (); // if data should be persisted, usually this is true: if (()) { (datum);// 发送持久化数据,完成数据同步 } (, datum); if (isLeader()) {//如果是leader ,则增加 term (PUBLISH_TERM_INCREASE_COUNT); } else { if (() + PUBLISH_TERM_INCREASE_COUNT > ()) { //set leader term: getLeader().(()); (getLeader().()); } else { (PUBLISH_TERM_INCREASE_COUNT); } } //更新/nacos_home/data/naming/文件 (()); (, ); ("data added/updated, key={}, term={}", , ); }
通过以上的操作就完成了数据的同步。