zookeeper 启动过程/leader 选举


前言

本篇博文结合zookeeper源码食用更佳哦!zookeeper 源码可以直接从github下载编译!

zookeeper 集群模式下做了哪些事情?

  1. 初始化通信方式 netty/NIO,并在后续启动
  2. 初始化zookeeper保存自身信息的zookeeper节点
  3. 启动内嵌的 jetty 服务器(通过jetty的start方法,本文不作赘述)
  4. 选举出 leader *

接下来让我们顺着源码图,从源码出发看看zookeeper到底是如何启动的。
zookeeper 启动 + leader选举.png

zookeeper 集群启动入口:QuorumPeerMain.main();

main方法中调用 initializeAndRun() 解析zookeeper配置文件,如果配置文件中配置了集群,则执行 runFromConfig() 方法;

所有关键性的代码都在这个方法中,仔细研究这个方法后我们发现以下关键性代码

1、 初始化通信方式 netty/NIO,并在后续启动

zookeeper 作为一个分布式应用的高性能协调服务,必然需要和其他分布式应用相互通信,nio/netty等基于socket的通信方式成为了他的不二之选。

zookeeper 默认使用NIO做为通信方式,但是官方推荐使用netty。

【QuorumPeerMain.runFromConfig() 中代码】

if (config.getClientPortAddress() != null) {
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}

【ServerCnxnFactory.createFactory() 中代码】

public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

public static ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                                                                       .getDeclaredConstructor()
                                                                       .newInstance();
        return serverCnxnFactory;
    } catch (Exception e) {}
}

zookeeper 从系统配置中尝试获取 zookeeper.serverCnxnFactory ,如果能获取到则使用配置的通信方法,否则使用NIOServerCnxnFactory;然后通过反射实例化。此时netty还没有设置监听端口,没有开始事件监听

我们可以通过系统配置zookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory启用 netty;

ServerCnxnFactory 有4种实现,默认使用nio,推荐使用netty;

image.png

2、 初始化zookeeper保存自身信息的zookeeper节点

使用过zookeeper的同学应该会发现,zookeeper在启动后,默认会有一个zookeeper节点,这个节点是zookeeper用于储存自身信息的;

【QuorumPeerMain.runFromConfig() 中代码】

quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));

ZKDatabase 中通过createDataTree();调用 new DataTree() 新建初始化节点

【new DataTree() 中代码】

private static final String rootZookeeper = "/";
public static final String procZookeeper = "/zookeeper";
public static final String quotaZookeeper = "/zookeeper/quota";
private final DataNode procDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
DataTree(DigestCalculator digestCalculator) {
    this.digestCalculator = digestCalculator;
    nodes = new NodeHashMapImpl(digestCalculator);
    nodes.put("", root);
    nodes.putWithoutDigest(rootZookeeper, root);
    root.addChild(procChildZookeeper);
    nodes.put(procZookeeper, procDataNode);
    procDataNode.addChild(quotaChildZookeeper);
    nodes.put(quotaZookeeper, quotaDataNode);
    addConfigNode();
    nodeDataSize.set(approximateDataSize());
    try {
        dataWatches = WatchManagerFactory.createWatchManager();
        childWatches = WatchManagerFactory.createWatchManager();
    } catch (Exception e) {
    }
}

4、选举出 leader

第一轮:zookeeper集群中的节点在启动时,会将自身推举为leader【生成一张自身信息的选票(myid+zxid+选举周期)】,发送给所有其他参与选举的节点,这是第一轮,这一轮肯定无法选举出leader;

第二轮:在接受到其他节点的选票时,当前节点会将接收到的选票 和 自己发出的选票 进行对比【对比规则:选举周期最大 > 数据最新 > id最大】,根据选举规则判断两张选票的优先级,优先级高的选票会再次被发送出去,并记录在当前节点本地缓存中,直到集群半数以上的节点的选票一致;

【QuorumPeerMain.runFromConfig() 中代码】

quorumPeer.start();

【quorumPeer.start() 中代码】

public synchronized void start() {
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {}
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}

startServerCnxnFactory() 方法会调用【1】中初始化的通信类的 start 方法,官方推荐netty,所以我们看下 NettyServerCnxnFactory

【NettyServerCnxnFactory.statrt() 中代码】

netty开启监听端口,接受客户端/其他节点的传输的数据

public void start() {
    if (listenBacklog != -1) {
        bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
    }
    parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
    localAddress = (InetSocketAddress) parentChannel.localAddress();
}

【startLeaderElection()】

将自己做为leader,新建一张选票【不是真正发送出去的选票,这是当前节点存储的自己认为的 leader信息】,可以看出选票的信息【myid+zxid+选举周期】

public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {}
    this.electionAlg = createElectionAlgorithm(electionType);
}

本质上 quorumPeer 是一个线程,所以 super.start(); 其实调用的是 quorumPeer的 run() 方法

让我们看看run 方法到底做了什么,这个方法比较长,此处我们只看 LOOKING 状态的重点代码【省略了其他状态的代码以及异常处理代码】

【quorumPeer.run() 中代码】

case LOOKING:
    ······
    setCurrentVote(makeLEStrategy().lookForLeader());
    ······

【FastLeaderElection.lookForLeader() 中代码】

这段代码加上注释长达230行,此处也只看重点部分。

updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());是将自身的 myid+zxid+选举周期 保存到 FastLeaderElection 类变量中;

sendNotifications(); 将数据保存到一个 LinkedBlockingQueue 中,然后有其他线程去发送给其他节点;

Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); 中的 recvqueue 【是实例化 quorumPeer 是注册的一个WorkerReceiver 线程去接受其他节点发送过来的选票,保存在这个队列中】

n.electionEpoch > logicalclock.get() 比较收到的选票的选举周期和自己本地记录的逻辑时钟;

  1. 如果接受到的选票 选举周期比较大,更新本地的逻辑时钟,然后对比两个选票
  2. 如果接收到的选票 选举周期小,直接跳过这个选票
  3. 如果相等,对比两个选票

totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch()) 将接收到的选票和当前选票进行对比【对比规则:选举周期最大 > 数据最新 > id最大】,将最合适做leader的选票,保存到 FastLeaderElection 类变量中;

image.png

voteSet.hasAllQuorums() 直到接收到所有节点的选票,此时 FastLeaderElection 类变量中保存的数据就是最终的leader节点数据,返回最终节点数据

Map<Long, Vote> recvset = new HashMap<Long, Vote>();
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

synchronized (this) {
    logicalclock.incrementAndGet();
    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();

Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
switch (n.state) {
    case LOOKING:
        if (n.electionEpoch > logicalclock.get()) {
            logicalclock.set(n.electionEpoch);
            recvset.clear();
            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                updateProposal(n.leader, n.zxid, n.peerEpoch);
            } else {
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }
            sendNotifications();
        } else if (n.electionEpoch < logicalclock.get()) {
            break;
        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
            sendNotifications();
        }
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
        if (voteSet.hasAllQuorums()) {                      
            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                    recvqueue.put(n);
                    break;
                }
            }
            if (n == null) {
                setPeerState(proposedLeader, voteSet);
                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                leaveInstance(endVote);
                return endVote;
            }
        }
        break;
}

image.png


文章作者: zhouxh-z
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 zhouxh-z !
  目录