Preface
This article is on the dense side; it is best read alongside the ZooKeeper source code, which you can download and build directly from GitHub!
ZooKeeper supports two transports: Netty and NIO; this article walks through the Netty-based code path!
It is recommended to first read "What ZooKeeper does when starting in cluster mode".
How does ZooKeeper keep the cluster's replicas consistent?
ZooKeeper implements a leader/follower architecture on top of the ZAB protocol to keep every replica in the cluster consistent.
ZAB (ZooKeeper Atomic Broadcast) is a protocol designed specifically for ZooKeeper that supports both crash recovery and atomic broadcast.
When a client sends a request to the ZooKeeper server, all writes go to the Leader node (a request that arrives at a follower is forwarded to the leader); the leader then replicates the data to the other nodes, which is how consistency is guaranteed.
Replication from leader to followers resembles a two-phase commit (2PC): the leader sends the data packet to every follower; each follower confirms receipt by returning an ACK; once the followers' ACKs plus the leader's own ACK exceed half of all voting nodes, the request is committed and written into ZooKeeper's in-memory state.
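To make the "more than half" rule concrete, here is a minimal hypothetical sketch of counting ACKs against a voting set. This is not ZooKeeper's actual code (the real logic lives in Proposal.hasAllQuorums() and QuorumVerifier); it only illustrates the arithmetic:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the majority-ACK rule used during broadcast.
public class QuorumSketch {
    private final int votingNodes;            // leader + followers (observers don't vote)
    private final Set<Long> acks = new HashSet<>();

    QuorumSketch(int votingNodes) { this.votingNodes = votingNodes; }

    void addAck(long serverId) { acks.add(serverId); }

    // committed once ACKs exceed half of the voting nodes
    boolean hasQuorum() { return acks.size() > votingNodes / 2; }

    public static void main(String[] args) {
        QuorumSketch p = new QuorumSketch(5);   // 5 voting nodes, quorum = 3
        p.addAck(1);                            // the leader ACKs itself first
        System.out.println(p.hasQuorum());      // false: 1 of 5
        p.addAck(2);
        p.addAck(3);                            // the third ACK tips the majority
        System.out.println(p.hasQuorum());      // true: 3 of 5
    }
}
```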
The atomic broadcast flow in detail
ZAB's message broadcast uses an atomic broadcast protocol that works much like a two-phase commit.
The source-code walkthrough follows the order of the code; the 【】 tags give the section numbers of this article's source-reading table of contents, so you can jump around by business logic.
- When a client connects to the server, all client writes go to the Leader node (a request that lands on a follower is forwarded to the leader); 【reading guide: Part I <1, 2> + 2.2.1】
- The leader receives the request, wraps it into a transaction proposal, and sends it to all follower nodes; 【reading guide: 2.1.1 + 2.1.2 + 2.1.3 <2.1.3.1, 2.1.3.2>】
- The leader writes the data to its own local file; 【reading guide: 2.1.3.3】
- The leader records one ACK for itself; 【reading guide: 2.1.3.3】
- A follower, on receiving the leader's proposal, writes the data to its own local file; 【reading guide: 2.2.1 + 2.2.2 + 2.2.3】
- After a successful write, the follower sends an ACK back to the leader; 【reading guide: 2.2.5】
- When the leader receives an ACK, it checks whether ACKs from more than half of the voting nodes (leader + followers) have arrived; if so, it sends a commit request to the followers; 【reading guide: 2.1.4】
- The leader sends the data to the observer nodes for storage; 【reading guide: 2.1.4】
- The leader applies the data to its own memory; 【reading guide: 2.1.4】
- A follower, on receiving the leader's commit request, applies the data to its memory; 【reading guide: 2.2.2 + 2.2.3】
The test class
Now that we know the basic flow, let's start the source analysis from the following unit-test method and see how ZooKeeper actually goes from the client sending a create message to the znode being created!
@Test
public void test() throws IOException, InterruptedException, KeeperException {
ZooKeeper zk = new ZooKeeper("127.0.0.1:2183", 3000, this);
zk.create("/foo_2", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
Part I. The client sends the request
1. Initializing the ZooKeeper client
ZooKeeper zk = new ZooKeeper("127.0.0.1:2183", 3000, this);
👇👇👇
public ZooKeeper(····) throws IOException {
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this.clientConfig,
watcher,
getClientCnxnSocket(),
canBeReadOnly);
cnxn.start();
}
During initialization, ZooKeeper calls createConnection to create a connection object, ClientCnxn.
ClientCnxn's constructor creates two threads (SendThread and EventThread),
and cnxn.start() starts both of them:
this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
2. Sending the request
The send path
ZooKeeper packs the arguments of the create method into a Request;
zk.create("/foo_2", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
👇👇👇
public String create(···) throws KeeperException, InterruptedException {
    // build the CreateRequest
    RequestHeader h = new RequestHeader();
    h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create);
    CreateRequest request = new CreateRequest();
    CreateResponse response = new CreateResponse();
    request.setData(data);
    request.setFlags(createMode.toFlag());
    request.setPath(serverPath);
    request.setAcl(acl);
    // send the CreateRequest through the connection
    ReplyHeader r = cnxn.submitRequest(h, request, response, null);
    if (cnxn.chrootPath == null) {
        return response.getPath();
    } else {
        return response.getPath().substring(cnxn.chrootPath.length());
    }
}
submitRequest() calls queuePacket() to add the Request to the ClientCnxn.outgoingQueue queue;
it then writes a single byte into the channel to wake the thread blocked on the selector. That byte exists only to trigger a netty/NIO write event so that SendThread picks up the packet from the queue. (ZooKeeper's transport is netty/NIO; when no events are pending, the I/O thread blocks on the selector waiting for one, so this dummy byte is sent purely to wake the selector.)
public ReplyHeader submitRequest(···) throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);
    // non-essential code omitted
    return r;
}
👇👇👇
public Packet queuePacket(···) {
    Packet packet = null;
    packet = new Packet(h, r, request, response, watchRegistration);
    packet.cb = cb;
    packet.ctx = ctx;
    packet.clientPath = clientPath;
    packet.serverPath = serverPath;
    packet.watchDeregistration = watchDeregistration;
    synchronized (state) {
        // some code omitted
        // add the Request to the outgoingQueue
        outgoingQueue.add(packet);
    }
    // write one dummy byte to the channel to wake the selector in SendThread
    sendThread.getClientCnxnSocket().packetAdded();
    return packet;
}
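The wakeup trick can be reproduced in a few lines. In the plain NIO client the same effect is achieved with Selector.wakeup(); the sketch below is not ZooKeeper code, just a minimal demonstration of a sender thread parked in select() being released by another thread:

```java
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of the "wake the I/O thread so it notices the queue" pattern.
public class WakeupSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        CountDownLatch released = new CountDownLatch(1);
        Thread sendThread = new Thread(() -> {
            try {
                selector.select();       // blocks: no channels, no events
                released.countDown();    // reached only after wakeup()
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        sendThread.start();
        Thread.sleep(200);               // let the thread park inside select()
        selector.wakeup();               // plays the role of ZooKeeper's dummy byte
        System.out.println(released.await(2, TimeUnit.SECONDS));
        selector.close();
    }
}
```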
The SendThread created during ZooKeeper initialization reads Requests from this queue and sends them to the server;
SendThread.run() contains quite a few branches; here we focus on these two key calls:
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
The request we sent in step 2 was added to the outgoingQueue, a queue owned by the ClientCnxn object;
the introduce method hands this queue to clientCnxnSocket so that doTransport can use it later:
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
👇👇👇
void introduce(ClientCnxn.SendThread sendThread, long sessionId, LinkedBlockingDeque<Packet> outgoingQueue) {
    this.sendThread = sendThread;
    this.sessionId = sessionId;
    this.outgoingQueue = outgoingQueue;
}
The doTransport method is where the request is actually written to the server via Netty/NIO (the code below is from ClientCnxnSocketNetty):
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
👇👇👇
void doTransport(int waitTimeOut, Queue<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {
    try {
        // non-essential code omitted
        if (head != null) {
            doWrite(pendingQueue, head, cnxn);
        }
    } finally {
        updateNow();
    }
}
👇👇👇
private void doWrite(Queue<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
    updateNow();
    boolean anyPacketsSent = false;
    while (true) {
        if (p != WakeupPacket.getInstance()) {
            if ((p.requestHeader != null)
                    && (p.requestHeader.getType() != ZooDefs.OpCode.ping)
                    && (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                // assign the request an Xid
                p.requestHeader.setXid(cnxn.getXid());
                synchronized (pendingQueue) {
                    pendingQueue.add(p);
                }
            }
            // send the request
            sendPktOnly(p);
            anyPacketsSent = true;
        }
        // take the next packet; stop once the queue is drained
        if (outgoingQueue.isEmpty()) {
            break;
        }
        p = outgoingQueue.remove();
    }
    // flush the channel once for all packets written in this batch
    if (anyPacketsSent) {
        channel.flush();
    }
}
The data finally reaches the server through the Netty channel.
Part II. The server receives the request
1. How is the request received?
In "What ZooKeeper does when starting in cluster mode" we saw that at startup the ZooKeeper server initializes and starts a Netty server (NettyServerCnxnFactory) to receive requests from clients and other nodes (the Netty handler class is CnxnChannelHandler).
- A request from the zk client enters the server via CnxnChannelHandler.channelRead() and then flows through cnxn.processMessage((ByteBuf) msg) → receiveMessage(buf) → zks.processPacket(this, bb) → submitRequest(si) → enqueueRequest(si) → requestThrottler.submitRequest(si);
This call chain mostly parses the request and contains nothing particularly complicated, so it is only recorded here. - Once the server has the parsed request, requestThrottler.submitRequest(si) adds it to RequestThrottler.submittedRequests;
public void submitRequest(Request request) {
    if (stopping) {
        LOG.debug("Shutdown in progress. Request cannot be processed");
        dropRequest(request);
    } else {
        request.requestThrottleQueueTime = Time.currentElapsedTime();
        submittedRequests.add(request);
    }
}
- The RequestThrottler thread's run method takes requests from the submittedRequests queue populated above and hands them to the zk RequestProcessor chain via the submitRequestNow method.
(About the RequestThrottler thread: after leader election, the leader and follower nodes call zk.startup() from leader.lead() and follower.followLeader() respectively, which instantiates and starts the RequestThrottler thread.)
public void run() {
    try {
        while (true) {
            // non-essential code omitted
            Request request = submittedRequests.take();
            if (request != null) {
                // non-essential code omitted
                zks.submitRequestNow(request);
            }
        }
    } catch (InterruptedException e) {}
}
2. How is the request processed?
After receiving a request, ZooKeeper submits it to a RequestProcessor chain; nodes in different roles (leader/follower) instantiate different chains.
After leader election, the leader and follower nodes call zk.startup() from leader.lead() and follower.followLeader() respectively; startup() calls ZooKeeperServer.setupRequestProcessors();
ZooKeeperServer has different subclasses depending on the node's role.
2.1_ How does the leader's processor chain handle a request?
LeaderZooKeeperServer
👇👇👇
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
setupContainerManager();
}
The leader builds the chain above; let's look at what each processor in the chain does.
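Before diving in, note that every processor follows the same chain-of-responsibility shape: processRequest() does its own work and then hands the request to the next stage. A stripped-down, hypothetical model of that pattern (not the real RequestProcessor interface, which takes a Request and may throw):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal chain-of-responsibility model of the leader's processor pipeline.
public class ChainSketch {
    interface Processor { void process(String request); }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // build the chain back to front, just like setupRequestProcessors()
        Processor finalP    = req -> log.add("final:" + req);
        Processor commitP   = req -> { log.add("commit:" + req); finalP.process(req); };
        Processor proposalP = req -> { log.add("proposal:" + req); commitP.process(req); };
        Processor prepP     = req -> { log.add("prep:" + req); proposalP.process(req); };

        prepP.process("create /foo_2");
        System.out.println(log);
    }
}
```

In the real server most stages additionally decouple through a queue plus a worker thread, which is why the code is harder to follow than this sketch.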
2.1.1_ LeaderRequestProcessor
LeaderRequestProcessor's job is fairly simple:
it checks whether the request's session needs upgrading (throwing if the session check fails);
then it calls the next processor in the chain:
public void processRequest(Request request) throws RequestProcessorException {
    if (!lzks.authWriteRequest(request)) {
        return;
    }
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
    } catch (IOException ie) {}
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }
    nextProcessor.processRequest(request);
}
2.1.2_ PrepRequestProcessor
This processor's processRequest method merely adds the request to the submittedRequests queue:
public void processRequest(Request request) {
    request.prepQueueStartTime = Time.currentElapsedTime();
    submittedRequests.add(request);
    ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
PrepRequestProcessor, however, is itself a Thread, started when the chain was built;
PrepRequestProcessor.run() takes requests from the submittedRequests queue and processes them:
public void run() {
try {
while (true) {
// some non-essential code omitted
Request request = submittedRequests.take();
request.prepStartTime = Time.currentElapsedTime();
pRequest(request);
}
} catch (Exception e) {}
}
👇👇👇
protected void pRequest(Request request) throws RequestProcessorException {
request.setHdr(null);
request.setTxn(null);
if (!request.isThrottled()) {
// wrap and special-case the request according to its type (create/delete/update),
// finally putting the change record into zks.outstandingChanges
pRequestHelper(request);
}
// assign the request a zxid
request.zxid = zks.getZxid();
// invoke the next processor
nextProcessor.processRequest(request);
}
pRequestHelper:
- wraps the request according to its type (create/delete/update);
- validates the request: undefined request types, illegal arguments;
- checks that the parent path exists;
- checks the ACL;
- checks that the path is legal;
- puts the change record into the outstandingChanges queue.
The request is then given a zxid and handed to the next processor.
2.1.3_ proposalProcessor (the key step *)
This processor is the heart of ZooKeeper's data-file sync and local transaction writes (you can think of it as phase one of a two-phase commit). It:
- calls the next processor in the chain;
- sends the request to the follower nodes through LearnerHandler;
- kicks off the SyncRequestProcessor -> AckRequestProcessor chain, which writes the data to the file system and then checks whether more than half of the nodes have returned ACK confirmations; if they have, the data is committed into memory.
public void processRequest(Request request) throws RequestProcessorException {
// is this a sync request from the leader?
// since in this walkthrough the client sent the message straight to the leader node,
// request instanceof LearnerSyncRequest == false here
if (request instanceof LearnerSyncRequest) {
zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
// should the next processor be invoked?
if (shouldForwardToNextProcessor(request)) {
nextProcessor.processRequest(request);
}
if (request.getHdr() != null) {
try {
// process the request (broadcast the proposal)
zks.getLeader().propose(request);
} catch (XidRolloverException e) {}
// write the local file (sync path)
syncProcessor.processRequest(request);
}
}
}
2.1.3.1_ shouldForwardToNextProcessor(request)
shouldForwardToNextProcessor(request) returns true by default;
// forwardLearnerRequestsToCommitProcessorDisabled defaults to false,
// and changes only when zookeeper.forward_learner_requests_to_commit_processor_disabled = true is configured
private boolean shouldForwardToNextProcessor(Request request) {
if (!forwardLearnerRequestsToCommitProcessorDisabled) {
return true;
}
if (request.getOwner() instanceof LearnerHandler) {
ServerMetrics.getMetrics().REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR.add(1);
return false;
}
return true;
}
nextProcessor.processRequest(request) invokes the next processor, CommitProcessor;
2.1.3.2_ zks.getLeader().propose(request);
propose() wraps the request into a QuorumPacket and, looping over the forwarding followers, pushes it onto each LearnerHandler.queuedPackets queue to await sending; it also records the proposal in the outstandingProposals map:
public Proposal propose(Request request) throws XidRolloverException {
    byte[] data = SerializeUtils.serializeRequest(request);
    proposalStats.setLastBufferSize(data.length);
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    synchronized (this) {
        p.addQuorumVerifier(self.getQuorumVerifier());
        // some non-essential code omitted
        lastProposed = p.packet.getZxid();
        outstandingProposals.put(lastProposed, p);
        sendPacket(pp);
    }
    return p;
}
👇👇👇
void sendPacket(QuorumPacket qp) {
    synchronized (forwardingFollowers) {
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}
👇👇👇
void queuePacket(QuorumPacket p) {
    queuedPackets.add(p);
}
LearnerHandler is a thread; LearnerHandler.run() calls startSendingPackets(), which drains LearnerHandler.queuedPackets and sends the packets to the follower node:
protected void startSendingPackets() {
if (!sendingThreadStarted) {
new Thread() {
public void run() {
try {
// send the queued packets to the follower node
sendPackets();
} catch (InterruptedException e) {}
}
}.start();
sendingThreadStarted = true;
}
}
👇👇👇
private void sendPackets() throws InterruptedException {
while (true) {
try {
QuorumPacket p;
p = queuedPackets.poll();
if (p == null) {
bufferedOutput.flush();
p = queuedPackets.take();
}
if (p.getType() == Leader.PROPOSAL) {
syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
}
// some non-essential code omitted
if (p.getZxid() > 0) {
lastZxid = p.getZxid();
}
oa.writeRecord(p, "packet");
packetsSent.incrementAndGet();
messageTracker.trackSent(p.getType());
} catch (IOException e) {}
}
}
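The propose → sendPacket → queuePacket path is a classic per-peer fan-out: one queue and one dedicated sender thread per follower, so a slow follower never blocks the leader's broadcast loop. A hypothetical sketch of that shape (class and field names invented, loosely mirroring LearnerHandler):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// One queue + dedicated sender thread per follower (hypothetical model).
public class FanoutSketch {
    static class PeerSender {
        final BlockingQueue<String> queuedPackets = new LinkedBlockingQueue<>();
        final List<String> sentPackets = new CopyOnWriteArrayList<>();
        final Thread sender = new Thread(() -> {
            try {
                while (true) {
                    sentPackets.add(queuedPackets.take()); // "write to the socket"
                }
            } catch (InterruptedException e) { /* shutdown */ }
        });
        PeerSender() { sender.start(); }
    }

    public static void main(String[] args) throws Exception {
        List<PeerSender> followers = List.of(new PeerSender(), new PeerSender());
        // like sendPacket(): enqueue the same proposal on every follower's queue
        for (PeerSender f : followers) { f.queuedPackets.add("PROPOSAL zxid=0x1"); }
        Thread.sleep(200);                 // give the sender threads time to drain
        for (PeerSender f : followers) { System.out.println(f.sentPackets); }
        for (PeerSender f : followers) { f.sender.interrupt(); }
    }
}
```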
2.1.3.3_ syncProcessor.processRequest(request)
This starts the SyncRequestProcessor -> AckRequestProcessor chain;
SyncRequestProcessor writes the data to the file system;
its processRequest method simply puts the request on the queuedRequests queue:
public void processRequest(final Request request) {
    queuedRequests.add(request);
}
SyncRequestProcessor.run():
zks.getZKDatabase().append(si) appends the request to the transaction log;
zks.takeSnapshot() serializes ZooKeeper's underlying DataTree into a snapshot file;
flush() contains a commit call that actually pushes the data to disk, then invokes the next processor, AckRequestProcessor:
// large amounts of non-essential code omitted
public void run() {
    try {
        while (true) {
            Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
            // zks.getZKDatabase().append(si) appends the request to the txn log
            if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                if (shouldSnapshot()) {
                    resetSnapshotStats();
                    zks.getZKDatabase().rollLog();
                    if (!snapThreadMutex.tryAcquire()) {
                    } else {
                        new ZooKeeperThread("Snapshot Thread") {
                            public void run() {
                                try {
                                    // serialize the DataTree into a snapshot file
                                    zks.takeSnapshot();
                                } catch (Exception e) {
                                    LOG.warn("Unexpected exception", e);
                                } finally {
                                    snapThreadMutex.release();
                                }
                            }
                        }.start();
                    }
                }
            }
            toFlush.add(si);
            if (shouldFlush()) {
                flush();
            }
        }
    } catch (Throwable t) {
        // exception handling omitted
    }
}
👇👇👇
private void flush() throws IOException, RequestProcessorException {
    // commit the buffered log writes to disk
    zks.getZKDatabase().commit();
    if (this.nextProcessor == null) {
        this.toFlush.clear();
    } else {
        while (!this.toFlush.isEmpty()) {
            final Request i = this.toFlush.remove();
            this.nextProcessor.processRequest(i);
        }
        if (this.nextProcessor instanceof Flushable) {
            ((Flushable) this.nextProcessor).flush();
        }
    }
}
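The append-then-flush structure is a form of group commit: many requests are appended to the log buffer, and a single commit() flushes the whole batch, amortizing the cost of a disk sync. A hypothetical sketch of the batching rule (the hard-coded MAX_BATCH is a stand-in; the real thresholds in SyncRequestProcessor are configurable):

```java
import java.util.ArrayList;
import java.util.List;

// Group-commit sketch: buffer appends, flush the whole batch at a threshold.
public class GroupCommitSketch {
    static final int MAX_BATCH = 3;       // hypothetical batch threshold
    final List<String> toFlush = new ArrayList<>();
    int diskSyncs = 0;                    // how many times we "hit the disk"

    void processRequest(String req) {
        toFlush.add(req);                 // append(si): cheap, buffered
        if (toFlush.size() >= MAX_BATCH) {
            flush();
        }
    }

    void flush() {                        // commit(): one sync covers the batch
        diskSyncs++;
        toFlush.clear();
    }

    public static void main(String[] args) {
        GroupCommitSketch s = new GroupCommitSketch();
        for (int i = 0; i < 6; i++) {
            s.processRequest("req-" + i);
        }
        System.out.println(s.diskSyncs);  // 6 requests, only 2 disk syncs
    }
}
```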
AckRequestProcessor checks whether more than half the nodes have returned ACK confirmations; if they have, the data is committed to memory:
public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null) {
        // the leader records an ACK for itself
        leader.processAck(self.getId(), request.zxid, null);
    }
}
processAck:
- fetches the proposal from the outstandingProposals map populated in step 2 and tries to commit it;
- checks whether the ACKs received exceed half of the cluster's voting nodes;
- commits if they do:
public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
    Proposal p = outstandingProposals.get(zxid);
    p.addAck(sid);
    // try to commit
    boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
    if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
        long curZxid = zxid;
        while (allowedToCommit && hasCommitted && p != null) {
            curZxid++;
            p = outstandingProposals.get(curZxid);
            if (p != null) {
                hasCommitted = tryToCommit(p, curZxid, null);
            }
        }
    }
}
👇👇👇
public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
    // have ACKs arrived from more than half of the voting nodes?
    // since the followers have not ACKed yet at this point, we return here
    if (!p.hasAllQuorums()) {
        return false;
    }
    outstandingProposals.remove(zxid);
    if (p.request != null) {
        toBeApplied.add(p);
    }
    if (p.request == null) {
    } else if (p.request.getHdr().getType() == OpCode.reconfig) {
        // some non-essential code omitted
    } else {
        p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
        commit(zxid);
        inform(p);
    }
    // hand the request to commitProcessor's committedRequests queue
    zk.commitProcessor.commit(p.request);
    return true;
}
At this point the followers have received the proposal but not yet returned their ACKs, so tryToCommit returns false and the logic ends here; the method is called again once the followers' ACKs arrive.
Once follower + leader ACKs pass the half mark, p.hasAllQuorums() becomes true and the rest of the method runs. Since we sent a create request, it executes commit(zxid), inform(p), and zk.commitProcessor.commit(p.request).
commit(zxid): the leader sends a commit request, carrying the zxid, to the follower nodes. On receiving it, a follower writes into its own memory the data the leader previously sent in step 2 via zks.getLeader().propose(request) (this logic is covered later, in the follower's processing path).
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp);
    ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
}
inform(p): sends the request to the observer nodes so that they can store it:
public void inform(Proposal proposal) {
    QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);
    sendObserverPacket(qp);
}
zk.commitProcessor.commit(p.request): puts the request on commitProcessor's committedRequests queue, which is consumed by commitProcessor's run method.
2.1.4_ CommitProcessor
On the whole CommitProcessor is functionally simple: it receives requests the leader has deemed committable, does some light processing, and forwards them to the next processor.
But because ZooKeeper chases throughput, the code is full of threads and queues, which makes it hard to read.
The LearnerHandler thread's run method contains an endless loop that waits to receive the ACK messages sent by SendAckRequestProcessor;
when an ACK arrives, it calls processAck(); from there the logic is the same as in the AckRequestProcessor covered in 2.1.3.3:
while (true) {
    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");
    messageTracker.trackReceived(qp.getType());
    ByteBuffer bb;
    long sessionId;
    int cxid;
    int type;
    switch (qp.getType()) {
    case Leader.ACK:
        if (this.learnerType == LearnerType.OBSERVER) {
            LOG.debug("Received ACK from Observer {}", this.sid);
        }
        syncLimitCheck.updateAck(qp.getZxid());
        learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
        break;
    // other unrelated cases omitted
    }
}
Once more than half the nodes have received the request and sent their ACKs, the request is placed on the CommitProcessor.committedRequests queue via zk.commitProcessor.commit(p.request);
CommitProcessor.processRequest()
adds the request to the queuedRequests and queuedWriteRequests queues:
public void processRequest(Request request) {
    queuedRequests.add(request);
    // is it a write request?
    if (needCommit(request)) {
        queuedWriteRequests.add(request);
        numWriteQueuedRequests.incrementAndGet();
    } else {
        numReadQueuedRequests.incrementAndGet();
    }
    wakeup();
}
CommitProcessor is also a thread; its run() method is fairly long, so for brevity here is the gist:
- if the queues are empty, the thread wait()s (it is woken by the wakeup() call in CommitProcessor.processRequest());
- a write request is stored into pendingRequests (of type Map<sessionId, Queue>); any other request goes straight to the next processor;
- commitIsWaiting is set according to whether committedRequests is non-empty (empty → false, non-empty → true);
- one request is taken from committedRequests and one from queuedWriteRequests, and they are checked to be the same request; if they match, processing continues, otherwise the loop breaks;
- the queue for request.sessionId is fetched from pendingRequests and a request is taken from its head;
- finally processWrite(request) hands the request to the next processor.
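The head-matching step can be modeled in a few lines. In the hypothetical sketch below, zxids stand in for Request objects: a committed entry is applied only while it matches the head of the local pending-write queue, which preserves the order of writes:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Sketch of CommitProcessor-style head-of-queue matching (zxids as requests).
public class CommitMatchSketch {
    static Deque<Long> applyCommits(Deque<Long> queuedWrites, Deque<Long> committed) {
        Deque<Long> applied = new ArrayDeque<>();
        // apply only while the committed head matches our next pending write
        while (!committed.isEmpty() && !queuedWrites.isEmpty()
                && committed.peek().equals(queuedWrites.peek())) {
            committed.poll();
            applied.add(queuedWrites.poll());
        }
        return applied;
    }

    public static void main(String[] args) {
        Deque<Long> pending = new ArrayDeque<>(List.of(1L, 2L, 3L));
        Deque<Long> committed = new ArrayDeque<>(List.of(1L, 2L));
        System.out.println(applyCommits(pending, committed)); // zxid 3 still waits
    }
}
```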
2.1.5_ ToBeAppliedRequestProcessor
This processor's main job is to remove already-handled requests from the toBeApplied queue, then call the next processor;
the requests in toBeApplied were added inside tryToCommit(), once the leader decided they could be committed.
public void processRequest(Request request) throws RequestProcessorException {
// invoke the next processor
next.processRequest(request);
if (request.getHdr() != null) {
long zxid = request.getHdr().getZxid();
Iterator<Proposal> iter = leader.toBeApplied.iterator();
if (iter.hasNext()) {
Proposal p = iter.next();
if (p.request != null && p.request.zxid == zxid) {
iter.remove();
return;
}
}
}
}
2.1.6_ FinalRequestProcessor
Writes the request into ZooKeeper's built-in data structure, the DataTree, and returns a response whose content depends on the request type.
2.2_ How does the follower's processor chain handle a request?
The follower node initializes two processing chains:
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
syncProcessor.start();
}
2.2.1_ FollowerRequestProcessor
FollowerRequestProcessor.processRequest() puts the request on the queuedRequests queue:
void processRequest(Request request, boolean checkForUpgrade) {
    if (!finished) {
        if (checkForUpgrade) {
            Request upgradeRequest = null;
            try {
                upgradeRequest = zks.checkUpgradeSession(request);
            } catch (KeeperException ke) {
            } catch (IOException ie) {}
            if (upgradeRequest != null) {
                queuedRequests.add(upgradeRequest);
            }
        }
        queuedRequests.add(request);
    }
}
FollowerRequestProcessor.run()
In run(), the follower first (conditionally) hands the request to the next processor, CommitProcessor, and then, for write-type requests, forwards the request to the leader node:
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            // maybe invoke the next processor
            maybeSendRequestToNextProcessor(request);
            // forward the request to the leader depending on its type
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getFollower().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    zks.getFollower().request(request);
                }
                break;
            }
        }
    } catch (RuntimeException e) {
    } catch (Exception e) {}
}
👇👇👇
private void maybeSendRequestToNextProcessor(Request request) throws RequestProcessorException {
    if (skipLearnerRequestToNextProcessor && request.isFromLearner()) {
        ServerMetrics.getMetrics().SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT.add(1);
    } else {
        nextProcessor.processRequest(request);
    }
}
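The routing rule in run() boils down to "forward anything that mutates state to the leader". A hypothetical sketch of that decision, with the op names copied from the switch above (the class and method names are invented for illustration):

```java
import java.util.Set;

// Sketch of FollowerRequestProcessor's routing decision.
public class ForwardRuleSketch {
    // operations that a follower must hand to the leader
    static final Set<String> FORWARDED_OPS = Set.of(
            "create", "create2", "createTTL", "createContainer",
            "delete", "deleteContainer", "setData", "reconfig",
            "setACL", "multi", "check", "sync");

    static boolean mustForwardToLeader(String op) {
        return FORWARDED_OPS.contains(op);
    }

    public static void main(String[] args) {
        System.out.println(mustForwardToLeader("create"));   // write -> leader
        System.out.println(mustForwardToLeader("getData"));  // read served locally
    }
}
```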
2.2.2_ CommitProcessor
Identical to the CommitProcessor on the leader node.
2.2.3_ FinalRequestProcessor
Identical to the FinalRequestProcessor on the leader node.
2.2.4_ SyncRequestProcessor
Identical to the SyncRequestProcessor on the leader node.
2.2.5_ SendAckRequestProcessor
The leader's AckRequestProcessor calls the leader's tryToCommit() path directly;
the follower's SendAckRequestProcessor instead has to send the ACK message over the wire to the leader, which tallies it:
public void processRequest(Request si) {
if (si.type != OpCode.sync) {
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
try {
learner.writePacket(qp, false);
} catch (IOException e) {
// exception handling omitted
}
}
}
Finally
The part where I shamelessly fish for likes:
Boss! Drop a like, would you!!