前言
本篇博文结合zookeeper源码食用更佳哦!zookeeper 源码可以直接从github下载编译!
zookeeper 集群模式下做了哪些事情?
- 初始化通信方式 netty/NIO,并在后续启动
- 初始化zookeeper保存自身信息的zookeeper节点
- 启动内嵌的 jetty 服务器(通过jetty的start方法,本文不作赘述)
- 选举出 leader *
接下来让我们顺着源码图,从源码出发看看zookeeper到底是如何启动的。
zookeeper 集群启动入口:QuorumPeerMain.main();
main方法中调用 initializeAndRun() 解析zookeeper配置文件,如果配置文件中配置了集群,则执行 runFromConfig() 方法;
所有关键性的代码都在这个方法中,仔细研究这个方法后我们发现以下关键性代码
1、 初始化通信方式 netty/NIO,并在后续启动
zookeeper 作为一个分布式应用的高性能协调服务,必然需要和其他分布式应用相互通信,nio/netty等基于socket的通信方式成为了他的不二之选。
zookeeper 默认使用NIO做为通信方式,但是官方推荐使用netty。
【QuorumPeerMain.runFromConfig() 中代码】
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
【ServerCnxnFactory.createFactory() 中代码】
public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
public static ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor()
.newInstance();
return serverCnxnFactory;
} catch (Exception e) {}
}
zookeeper 从系统配置中尝试获取 zookeeper.serverCnxnFactory
,如果能获取到则使用配置的通信方法,否则使用NIOServerCnxnFactory;然后通过反射实例化。此时netty还没有设置监听端口,没有开始事件监听
我们可以通过系统配置zookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
启用 netty;
ServerCnxnFactory 有4种实现,默认使用nio,推荐使用netty;
2、 初始化zookeeper保存自身信息的zookeeper节点
使用过zookeeper的同学应该会发现,zookeeper在启动后,默认会有一个zookeeper节点,这个节点是zookeeper用于储存自身信息的;
【QuorumPeerMain.runFromConfig() 中代码】
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
ZKDatabase 中通过createDataTree();调用 new DataTree() 新建初始化节点
【new DataTree() 中代码】
private static final String rootZookeeper = "/";
public static final String procZookeeper = "/zookeeper";
public static final String quotaZookeeper = "/zookeeper/quota";
private final DataNode procDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
DataTree(DigestCalculator digestCalculator) {
this.digestCalculator = digestCalculator;
nodes = new NodeHashMapImpl(digestCalculator);
nodes.put("", root);
nodes.putWithoutDigest(rootZookeeper, root);
root.addChild(procChildZookeeper);
nodes.put(procZookeeper, procDataNode);
procDataNode.addChild(quotaChildZookeeper);
nodes.put(quotaZookeeper, quotaDataNode);
addConfigNode();
nodeDataSize.set(approximateDataSize());
try {
dataWatches = WatchManagerFactory.createWatchManager();
childWatches = WatchManagerFactory.createWatchManager();
} catch (Exception e) {
}
}
4、选举出 leader
第一轮:zookeeper集群中的节点在启动时,会将自身推举为leader【生成一张自身信息的选票(myid+zxid+选举周期)】,发送给所有其他参与选举的节点,这是第一轮,这一轮肯定无法选举出leader;
第二轮:在接受到其他节点的选票时,当前节点会将接收到的选票 和 自己发出的选票 进行对比【对比规则:选举周期最大 > 数据最新 > id最大】,根据选举规则判断两张选票的优先级,优先级高的选票会再次被发送出去,并记录在当前节点本地缓存中,直到集群半数以上的节点的选票一致;
【QuorumPeerMain.runFromConfig() 中代码】
quorumPeer.start();
【quorumPeer.start() 中代码】
public synchronized void start() {
loadDataBase();
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {}
startLeaderElection();
startJvmPauseMonitor();
super.start();
}
startServerCnxnFactory()
方法会调用【1】中初始化的通信类的 start 方法,官方推荐netty,所以我们看下 NettyServerCnxnFactory
【NettyServerCnxnFactory.statrt() 中代码】
netty开启监听端口,接受客户端/其他节点的传输的数据
public void start() {
if (listenBacklog != -1) {
bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
}
parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
localAddress = (InetSocketAddress) parentChannel.localAddress();
}
【startLeaderElection()】
将自己做为leader,新建一张选票【不是真正发送出去的选票,这是当前节点存储的自己认为的 leader信息】,可以看出选票的信息【myid+zxid+选举周期】
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {}
this.electionAlg = createElectionAlgorithm(electionType);
}
本质上 quorumPeer 是一个线程,所以 super.start(); 其实调用的是 quorumPeer的 run() 方法
让我们看看run 方法到底做了什么,这个方法比较长,此处我们只看 LOOKING 状态的重点代码【省略了其他状态的代码以及异常处理代码】
【quorumPeer.run() 中代码】
case LOOKING:
······
setCurrentVote(makeLEStrategy().lookForLeader());
······
【FastLeaderElection.lookForLeader() 中代码】
这段代码加上注释长达230行,此处也只看重点部分。
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
是将自身的 myid+zxid+选举周期 保存到 FastLeaderElection 类变量中;
sendNotifications();
将数据保存到一个 LinkedBlockingQueue 中,然后有其他线程去发送给其他节点;
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
中的 recvqueue 【是实例化 quorumPeer 是注册的一个WorkerReceiver 线程去接受其他节点发送过来的选票,保存在这个队列中】
n.electionEpoch > logicalclock.get()
比较收到的选票的选举周期和自己本地记录的逻辑时钟;
- 如果接受到的选票 选举周期比较大,更新本地的逻辑时钟,然后对比两个选票
- 如果接收到的选票 选举周期小,直接跳过这个选票
- 如果相等,对比两个选票
totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())
将接收到的选票和当前选票进行对比【对比规则:选举周期最大 > 数据最新 > id最大】,将最合适做leader的选票,保存到 FastLeaderElection 类变量中;
voteSet.hasAllQuorums()
直到接收到所有节点的选票,此时 FastLeaderElection 类变量中保存的数据就是最终的leader节点数据,返回最终节点数据
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
synchronized (this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
switch (n.state) {
case LOOKING:
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
}