从客户端发送create消息到节点创建,源码分析


前言

本文较硬,结合zookeeper源码食用更佳哦!zookeeper 源码可以直接从github下载编译!

zookeeper 有两种通信方式:netty、Nio;本文是基于 Netty 源码进行的解读!

建议先阅读# 集群模式 zookeeper 启动时做了这些事!

zk 咋保证集群各副本的数据一致性?

zookeeper 基于 ZAB 协议 实现了一种主备模式的系统架构来保证集群各副本的数据一致性!

ZAB 协议:为 Zookeeper 专门设计的一种支持 崩溃恢复原子广播 的协议!

ZAB.png

客户端发送请求到zk服务端时,所有客户端写入数据都是写入到Leader节点(如果请求被发到 follower 节点,会被转发到 leader 节点),由leader节点复制到其他节点上,从而保证数据一致性;

从leader复制到follower的过程有点类似 两阶段提交(2PC),leader节点发送数据包给其他follower节点,follower节点确认接收后返回ACK,如果follow返回的ACK + leader本身的ACK,超过所有投票节点的一半,这个请求就会被commit,并写入zk内存中!

zk 原子广播详细流程

ZAB 协议的消息广播过程使用的是一个原子广播协议,类似一个 两阶段提交过程
zk 原子广播.png

源码分析是按代码顺序分析的,【】为本文的源码研读内容目录编号,可以根据业务逻辑跳转学习

  1. 客户端连接服务端时,所有客户端写入数据都是写入到Leader节点(如果请求被发到 follower 节点,会被转发到 leader 节点);
    【源码研读目录:一 <1、2> + 2.2.1 】
  2. leader 接收到请求,将请求封装成一个事务 proposal ,并发送给所有 follower 节点;
    【源码研读目录:2.1.1 + 2.1.2 + 2.1.3<2.1.3.1、2.1.3.2>】
  3. leader 将数据写入自身的本地文件;
    【源码研读目录:2.1.3.3】
  4. leader 给自己记一次ACK确认;
    【源码研读目录:2.1.3.3】
  5. follower 接收到 leader 发送的 proposal 后,将数据写入自身的本地文件中;
    【源码研读目录:2.2.1 + 2.2.2 + 2.2.3】
  6. follower 写入成功后给 leader 节点发送一个ACK确认请求;
    【源码研读目录:2.2.5】
  7. leader 接收到ACK确认请求时,判断是否收到了过半的投票节点(leader+follower)的ACK确认请求;如果过半,发送commit请求给follower节点;
    【源码研读目录:2.1.4】
  8. leader 节点将数据写入 observer 节点保存;
    【源码研读目录:2.1.4】
  9. leader 节点将数据写入内存中;
    【源码研读目录:2.1.4】
  10. follower 节点接收到 leader 的 commit 请求后,将数据写入内存;
    【源码研读目录:2.2.2 + 2.2.3】;

测试类

知道基本流程后,我们从如下单元测试方法开始分析源码,看看 zookeeper 从客户端发送create消息到节点创建到底怎么实现的!

@Test
public void test() throws IOException, InterruptedException, KeeperException {
    ZooKeeper zk = new ZooKeeper("127.0.0.1",2183, this);
    zk.create("/foo_2", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

一、客户端发送请求

1. 初始化 zk

zk 客户端发送数据到服务端 (1).png

ZooKeeper zk = new ZooKeeper("127.0.0.1",2183, this);
👇👇👇
public ZooKeeper(····) throws IOException {
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this.clientConfig,
        watcher,
        getClientCnxnSocket(),
        canBeReadOnly);
    cnxn.start();
}

zookeeper 初始化时,通过调用 createConnection初始化了一个 zk 连接对象ClientCnxn
ClientCnxn在初始化时 new 了两个线程(SendThread、EventThread);

并在 cnxn.start() 启动这两个线程;

this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();

2. 发送请求

zk 客户端发送数据到服务端.png

发送过程

  1. zookeeper 将create 方法的参数封装成 Request ;

    zk.create("/foo_2", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    👇👇👇
    public String create(···) throws KeeperException, InterruptedException {
        // 封装 CreateRequest
        RequestHeader h = new RequestHeader();
        h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create);
        CreateRequest request = new CreateRequest();
        CreateResponse response = new CreateResponse();
        request.setData(data);
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);
        request.setAcl(acl);
        // 通过连接器发送 CreateRequest
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        if (cnxn.chrootPath == null) {
            return response.getPath();
        } else {
            return response.getPath().substring(cnxn.chrootPath.length());
        }
    }
    
  2. 通过 submitRequest() 调用 queuePacket() 将 Request 加入到 ClientCnxn.outgoingQueue 队列中;

  3. 向管道写一个字节,从而唤醒阻塞在selector上的线程,该字节只为触发 netty/nio 的写事件,从而让 sendThread() 读取队列中的数据包【zk 底层使用netty/Nio通信,如果没有事件发生,netty/Nio 会阻塞在selector上等待事件发生,此处发送一个空字节,就是为了唤醒netty/Nio的selector】

    public ReplyHeader submitRequest(···) throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h,r,request,response, null,null,null,null,
            watchRegistration,watchDeregistration);
        // 省略非主线代码
        return r;
    }
    👇👇👇
    public Packet queuePacket(···) {
        Packet packet = null;
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        synchronized (state) {
            // 省略部分代码 
            // 将 Request 加入到 outgoingQueue 队列中;
           outgoingQueue.add(packet);
        }
        // 向channel 中写一个空字节,唤醒 SendThread 中的 selector
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }
    
  4. zookeeper 初始化中生成的 SendThread 线程会从这个队列中读取 Request 发送到服务端;

    SendThread.run() 方法中有比较多的分支代码,这里我们主要看这两个关键方法;

    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
    

    2中我们发送的请求被加入到 outgoingQueue 队列中,这个队列时属于 ClientCnxn 对象;
    introduce 方法将这个队列赋值给 clientCnxnSocket,以便后续在 doTransport 方法中调用;

    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    👇👇👇
    void introduce(ClientCnxn.SendThread sendThread, long sessionId, LinkedBlockingDeque<Packet> outgoingQueue) {
        this.sendThread = sendThread;
        this.sessionId = sessionId;
        this.outgoingQueue = outgoingQueue;
    }
    

    doTransport 方法,是真正将请求通过 Netty/NIO 写给服务端的方法;(以下是ClientCnxnSocketNetty源码)

    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
    👇👇👇
    void doTransport(int waitTimeOut,Queue<Packet> pendingQueue,ClientCnxn cnxn) throws IOException, InterruptedException {
        try {
            // 省略非主线代码
            if (head != null) {
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }
    👇👇👇
    private void doWrite(Queue<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
        updateNow();
        boolean anyPacketsSent = false;
        while (true) {
            if (p != WakeupPacket.getInstance()) {
                if ((p.requestHeader != null)
                    && (p.requestHeader.getType() != ZooDefs.OpCode.ping)
                    && (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                    // 给请求设置 Xid
                    p.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
                // 发送请求
                sendPktOnly(p);
                anyPacketsSent = true;
            }
        }
    }
    

    最终通过Netty的 channel 将数据发送到客户端
    image.png

二、服务端接收请求

1. 如何接收请求?

服务端接收消息.png
# 集群模式 zookeeper 启动时做了这些事!这篇文章中我们说过 zookeeper 服务端在启动时,初始化并启动了一个Netty服务器【NettyServerCnxnFactory】,用于接收客户端/其他节点的请求【Netty 处理请求类:CnxnChannelHandler】

  1. zk客户端发送的请求,通过服务端 CnxnChannelHandler.channelRead() 方法,通过cnxn.processMessage((ByteBuf) msg) → receiveMessage(buf) → zks.processPacket(this, bb) → submitRequest(si) → enqueueRequest(si) → requestThrottler.submitRequest(si);
    以上这段代码主要是对request的解析并没有过于复杂的逻辑,这边只做记录就不赘述了;
  2. 服务端获取请求之后,会通过 requestThrottler.submitRequest(si) 方法将解析出来的请求加入到 RequestThrottler.submittedRequests ;
    public void submitRequest(Request request) {
        if (stopping) {
            LOG.debug("Shutdown in progress. Request cannot be processed");
            dropRequest(request);
        } else {
            request.requestThrottleQueueTime = Time.currentElapsedTime();
            submittedRequests.add(request);
        }
    }
    
  3. RequestThrottler 线程的 run 方法会从2中设置的submittedRequests队列中获取请求,并通过submitRequestNow方法,将请求提交给zk请求处理责任链RequestProcessor RequestThrottler 线程:zk服务端在选举完 leader 之后,leader / follower节点 分别在 leader.lead() / follower.followLeader()方法中调用 zk.startup() 实例化并启动 RequestThrottler 线程;
    public void run() {
        try {
            while (true) {
                // 省略非主线代码
                Request request = submittedRequests.take();
                if (request != null) {
                    // 省略非主线代码
                    zks.submitRequestNow(request);
                }
            }
        } catch (InterruptedException e) {}
    }
    

2. 如何处理请求?

zookeeper 在接收到请求后,将请求提交到责任链RequestProcessor处理;且不同属性的节点(leader/follower)实例化了不同的责任链;

zk服务端在选举完 leader 之后,leader / follower节点 分别在 leader.lead() / follower.followLeader()方法中调用 **zk.startup()**,startup()调用了 ZooKeeperServer.setupRequestProcessors() ;

ZooKeeperServer 根据节点不同存在不同实现;
image.png

2.1_ leader 节点责任链接收请求如何处理?

LeaderZooKeeperServer
👇👇👇
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    setupContainerManager();
}

leader责任链.png

leader 节点构建了以上责任链,我们来看看责任链的每个节点都做了什么;

2.1.1_ LeaderRequestProcessor

LeaderRequestProcessor 的任务比较简单:

  1. 判断请求会话是否过期,过期的话抛出异常;

  2. 调用责任链的下一个方法;

    public void processRequest(Request request) throws RequestProcessorException {
     if (!lzks.authWriteRequest(request)) {
         return;
     }
     Request upgradeRequest = null;
     try {
         upgradeRequest = lzks.checkUpgradeSession(request);
     } catch (KeeperException ke) {
     } catch (IOException ie) {}
     if (upgradeRequest != null) {
         nextProcessor.processRequest(upgradeRequest);
     }
    
     nextProcessor.processRequest(request);
    }
    
    2.1.2_ PrepRequestProcessor

    未命名文件 (2).png
    责任链的 processRequest 方法,只是将请求加入到 submittedRequests 队列中;

    public void processRequest(Request request) {
     request.prepQueueStartTime = Time.currentElapsedTime();
     submittedRequests.add(request);
     ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
    }
    

    但是PrepRequestProcessor 本质是一个 Thread,在创建责任链时,启动了该线程;

PrepRequestProcessor.run() 将从 submittedRequests 队列中获取请求并处理;

public void run() {
    try {
        while (true) {
            // 省略了部分非主线代码
            Request request = submittedRequests.take();
            request.prepStartTime = Time.currentElapsedTime();
            pRequest(request);
        }
    } catch (Exception e) {}
}
👇👇👇
protected void pRequest(Request request) throws RequestProcessorException {
    request.setHdr(null);
    request.setTxn(null);
    if (!request.isThrottled()) {
      // 根据请求类型的不同(create/delete/update),对请求进行封装和特殊处理
      // 最终将请求放入zks.outstandingChanges
      pRequestHelper(request);
    }
    // 给请求设置一个 zxid 
    request.zxid = zks.getZxid();
    // 调用下一个接口
    nextProcessor.processRequest(request);
}

pRequestHelper:

  1. 根据请求类型的不同(create/delete/update),对请求进行封装
  2. 校验请求是否合理:未定义的请求、参数不合理
  3. 检查上级路径是否存在
  4. 检查ACL
  5. 检查路径是否合法
  6. 将请求装入 outstandingChanges 队列
    然后给请求设置Zxid,并调用下一个处理器
    2.1.3_ proposalProcessor (重点*)
    这个处理器是zookeeper数据文件同步和本地事务写入的关键(可以理解为两阶段提交的第一阶段)
    ProposalRequestProcessor (2).png
  7. 调用责任链的下一个处理器;
  8. 通过 LearnerHandler 将请求发送到 follower 节点
  9. 启用 SyncRequestProcessor -> AckRequestProcessor 责任链将数据写入文件系统,并判断是否半数以上节点返回ACK确认消息,如果超过半数,则提交数据写入内存;
public void processRequest(Request request) throws RequestProcessorException {
    // 判断是否是leader的同步请求
    // 由于本次我们客户端直接发送消息到leader节点,
    // 所以 request instanceof LearnerSyncRequest = false;
    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        // 是否需要调用下一个处理器
        if (shouldForwardToNextProcessor(request)) {
            nextProcessor.processRequest(request);
        }
        if (request.getHdr() != null) {
            try {
                // 处理请求
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {}
            // 写本地文件同步
            syncProcessor.processRequest(request);
        }
    }
}
2.1.3.1_ shouldForwardToNextProcessor(request)

shouldForwardToNextProcessor(request) 默认情况下为true;

// forwardLearnerRequestsToCommitProcessorDisabled 默认为false, 
// 需要配置 zookeeper.forward_learner_requests_to_commit_processor_disabled = true 才能更改
private boolean shouldForwardToNextProcessor(Request request) {
    if (!forwardLearnerRequestsToCommitProcessorDisabled) {
        return true;
    }
    if (request.getOwner() instanceof LearnerHandler) {
        ServerMetrics.getMetrics().REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR.add(1);
        return false;
    }
    return true;
}

nextProcessor.processRequest(request)调用下一个 CommitProcessor 处理器;

2.1.3.2_ zks.getLeader().propose(request);

通过 LearnerHandler 将请求发送到 follower 节点,并且将请求加入到 outstandingProposals 队列中;

  • 将请求封装成 QuorumPacket,并通过循环将请求发送到 LearnerHandler.queuedPackets 队列中等待处理;

    并将请求加入到 outstandingProposals 队列中

    public Proposal propose(Request request) throws XidRolloverException {
      byte[] data = SerializeUtils.serializeRequest(request);
      proposalStats.setLastBufferSize(data.length);
      QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
      Proposal p = new Proposal();
      p.packet = pp;
      p.request = request;
    
      synchronized (this) {
          p.addQuorumVerifier(self.getQuorumVerifier());
          // 省略部分非主线代码
          lastProposed = p.packet.getZxid();
          outstandingProposals.put(lastProposed, p);
          sendPacket(pp);
      }
      return p;
    }
    👇👇👇
    void sendPacket(QuorumPacket qp) {
      synchronized (forwardingFollowers) {
          for (LearnerHandler f : forwardingFollowers) {
              f.queuePacket(qp);
          }
      }
    }
    👇👇👇
    void queuePacket(QuorumPacket p) {
      queuedPackets.add(p);
    }
    
  • LearnerHandler 是一个线程,LearnerHandler.run() 方法中调用了 startSendingPackets() 将数据从 LearnerHandler.queuedPackets 队列取出,并发送到 follower 节点;

protected void startSendingPackets() {
    if (!sendingThreadStarted) {
        new Thread() {
            public void run() {
                try {
                    // 发送请求数据包到follower节点
                    sendPackets();
                } catch (InterruptedException e) {}
            }
        }.start();
        sendingThreadStarted = true;
    }
}
👇👇👇
private void sendPackets() throws InterruptedException {
    while (true) {
        try {
            QuorumPacket p;
            p = queuedPackets.poll();
            if (p == null) {
                bufferedOutput.flush();
                p = queuedPackets.take();
            }
            if (p.getType() == Leader.PROPOSAL) {
                syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
            }
            // 省略部分非主线代码
            if (p.getZxid() > 0) {
                lastZxid = p.getZxid();
            }
            oa.writeRecord(p, "packet");
            packetsSent.incrementAndGet();
            messageTracker.trackSent(p.getType());
        } catch (IOException e) {}
    }
}
2.1.3.3 syncProcessor.processRequest(request);

启用 SyncRequestProcessor -> AckRequestProcessor 责任链;

  • SyncRequestProcessor 将数据写入文件系统;

    • processRequest 方法将请求写入到 queuedRequests 队列中

      public void processRequest(final Request request) {
      queuedRequests.add(request);
      }
      
    • SyncRequestProcessor.run();

      zks.getZKDatabase().append(si) 将数据写入快照对象;

      zks.takeSnapshot() 将数据序列化到zk底层的DataTree对象中;

      flush() 中有个commit方法,真正将数据写入文件;然后调用下一个处理器 AckRequestProcessor;

      // 省略大量非主线代码
      public void run() {
          try {
              while (true) {
                  Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                  // zks.getZKDatabase().append(si) 把请求append 到快照对象中
                  if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                      if (shouldSnapshot()) {
                          resetSnapshotStats();
                          zks.getZKDatabase().rollLog();
                          if (!snapThreadMutex.tryAcquire()) {
                          } else {
                              new ZooKeeperThread("Snapshot Thread") {
                                  public void run() {
                                      try {
                                          // 这里将数据序列化到zk底层的DataTree对象中
                                          zks.takeSnapshot();
                                      } catch (Exception e) {
                                          LOG.warn("Unexpected exception", e);
                                      } finally {
                                          snapThreadMutex.release();
                                      }
                                  }
                              }.start();
                          }
                      }
                  }
                  toFlush.add(si);
                  if (shouldFlush()) {
                      flush();
                  }
              }
          }
      }
      👇👇👇
      private void flush() throws IOException, RequestProcessorException {
          // 提交写快照文件
          zks.getZKDatabase().commit();
          if (this.nextProcessor == null) {
              this.toFlush.clear();
          } else {
              while (!this.toFlush.isEmpty()) {
                  final Request i = this.toFlush.remove();
                  this.nextProcessor.processRequest(i);
              }
              if (this.nextProcessor instanceof Flushable) {
                  ((Flushable) this.nextProcessor).flush();
              }
          }
      }
      
  • AckRequestProcessor 判断是否半数以上节点返回ACK确认消息,如果超过半数,则提交数据写入内存

    public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        if (self != null) {
            leader.processAck(self.getId(), request.zxid, null);
        } else {}
    }
    

    从过程 2 设置的 outstandingProposals 中获取请求,尝试提交;

    判断收到的ACK消息是否超过集群节点的一半;

    如果超过一半则提交;

    public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
        Proposal p = outstandingProposals.get(zxid);
        p.addAck(sid);
        // 尝试提交
        boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
        if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
            long curZxid = zxid;
            while (allowedToCommit && hasCommitted && p != null) {
                curZxid++;
                p = outstandingProposals.get(curZxid);
                if (p != null) {
                    hasCommitted = tryToCommit(p, curZxid, null);
                }
            }
        }
    }
    👇👇👇
    public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
        // 判断收到的ACK消息是否超过集群节点的一半
        // 由于follower节点还没有返回ACK消息所以此处直接返回;
        if (!p.hasAllQuorums()) {
            return false;
        }
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            toBeApplied.add(p);
        }
        if (p.request == null) {
        } else if (p.request.getHdr().getType() == OpCode.reconfig) {
            // 省略部分非本次主线代码
        } else {
            p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
            commit(zxid);
            inform(p);
        }
        // 将请求发送到 commitProcessor 的 committedRequests 队列中;
        zk.commitProcessor.commit(p.request);
        return true;
    }
    

    此时由于 follower 节点收到 proposal 请求,但是还没有返回ACK消息,所以 tryToCommit 返回false;这边逻辑也就结束了;等待 follower 节点返回ACK后会再次调用该方法;

    如果 follower 节点+ leader 节点 超过半数,则 p.hasAllQuorums() = true,后续逻辑继续执行,我们发送的是create 请求,所以会执行 commit(zxid); inform(p) 以及 zk.commitProcessor.commit(p.request);

    commit(zxid):leader 会发送一个 commit 请求,携带zxid,到 follower 节点,follower节点接收到请求时,会将之前在2. zks.getLeader().propose(request); 中 leader 发送给 follower 节点的数据写入到自己的内存中(该逻辑在后续follower节点处理过程中会讲到)

    public void commit(long zxid) {
        synchronized (this) {
            lastCommitted = zxid;
        }
        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
        sendPacket(qp);
        ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
    }
    

    inform(p):会将请求发送到obsever节点保存起来;

    public void inform(Proposal proposal) {
        QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, 
            proposal.packet.getData(), null);
        sendObserverPacket(qp);
    }
    

    zk.commitProcessor.commit(p.request):将请求发送到 commitProcessor 的 committedRequests 队列中;该队列会在 commitProcessor的 run 方法中消费;

    2.1.4_ CommitProcessor

    CommitProcessor.png
    总的来说 CommitProcessor 功能比较简单,CommitProcessor 接收到 leader 认为可以提交的请求,将请求进行简单处理后,转发给下一个处理器;

但是由于 zk 追求效率,代码中存在大量的线程和队列,导致比较难读;

  1. LearnerHandler 线程的 run 方法 中存在一个死循环,无限等待接收 SendAckRequestProcessor 发送的 ACK 消息;

    当接收到 ACK 请求时,会调用 processAck(); 后续逻辑与 2.1.3.3 中的 AckRequestProcessor 处理器相同;

    while (true) {
        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        messageTracker.trackReceived(qp.getType());
        ByteBuffer bb;
        long sessionId;
        int cxid;
        int type;
        switch (qp.getType()) {
        case Leader.ACK:
            if (this.learnerType == LearnerType.OBSERVER) {
                LOG.debug("Received ACK from Observer {}", this.sid);
            }
            syncLimitCheck.updateAck(qp.getZxid());
            learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
            break;
        // 省略其他不相干的 case
        }
    }
    

    在半数以上的节点接收到请求并发送 ACK 后,会将请求通过 zk.commitProcessor.commit(p.request); 写入到 CommitProcessor.committedRequests 队列中;

  2. CommitProcessor.processRequest();

    将请求加入 queuedRequests 和 queuedWriteRequests 队列

    public void processRequest(Request request) {
        queuedRequests.add(request);
        // 如果是写请求
        if (needCommit(request)) {
            queuedWriteRequests.add(request);
            numWriteQueuedRequests.incrementAndGet();
        } else {
            numReadQueuedRequests.incrementAndGet();
        }
        wakeup();
    }
    
  3. CommitProcessor 也是一个线程,run() 方法比较长,篇幅原因直接给出大致逻辑;

    1. 队列为空,线程 wait();【CommitProcessor.processRequest() 中的 wakeup() 唤醒】
    2. 请求为写请求,将请求写入 pendingRequests 中;【类型Map<sessionId,Queue>】
      否则直接调用下一个处理器;
    3. 根据committedRequests 是否空,设置 commitIsWaiting 属性;
      空 false,非空 true;
    4. 从 committedRequests 和 queuedWriteRequests 中取出一个请求,判断是否是同一个请求;
      是继续,否 break;
    5. 从 pendingRequests 中根据 request.sessionId 获取一个队列,并从该队列头部取出一个 request;
    6. 最后通过 processWrite(request) 将请求发送给下一个处理器;
2.1.5_ ToBeAppliedRequestProcessor

该处理器主要功能就是,将处理过的请求从队列 toBeApplied 中删除;并调用下一个处理器;

toBeApplied 队列中的请求是在 tryCommit() 方法中,leader判断可以提交后写入的;

public void processRequest(Request request) throws RequestProcessorException {
    // 调用下一个处理器
    next.processRequest(request);
    if (request.getHdr() != null) {
        long zxid = request.getHdr().getZxid();
        Iterator<Proposal> iter = leader.toBeApplied.iterator();
        if (iter.hasNext()) {
            Proposal p = iter.next();
            if (p.request != null && p.request.zxid == zxid) {
                iter.remove();
                return;
            }
        }
    }
}
2.1.6_ FinalRequestProcessor

将请求写入到zk 自带的数据结构 DataTree 中,并根据请求类型返回不一样的响应内容;

2.2_ follower 节点责任链接收请求如何处理?

follower 节点初始化了两个调用链;
未命名文件 (3).png

未命名文件 (4).png

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}
2.2.1_ FollowerRequestProcessor
  1. FollowerRequestProcessor.processRequest() 将请求写入 queuedRequests 队列中

    void processRequest(Request request, boolean checkForUpgrade) {
        if (!finished) {
            if (checkForUpgrade) {
                Request upgradeRequest = null;
                try {
                    upgradeRequest = zks.checkUpgradeSession(request);
                } catch (KeeperException ke) {
                } catch (IOException ie) {}
                if (upgradeRequest != null) {
                    queuedRequests.add(upgradeRequest);
                }
            }
            queuedRequests.add(request);
        }
    }
    
  2. FollowerRequestProcessor.run()

    follower 节点在调用下一个处理器前,会判断是否是 leader 节点发送过来的消息;如果不是,将请求转发给 leader 节点;否则调用下一个处理器 CommitProcessor

    public void run() {
        try {
            while (!finished) {
                Request request = queuedRequests.take();
                // 尝试调用下一个处理器
                maybeSendRequestToNextProcessor(request);
                // 根据请求的类型,将请求转发给 leader
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getFollower().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    if (!request.isLocalSession()) {
                        zks.getFollower().request(request);
                    }
                    break;
                }
            }
        } catch (RuntimeException e) {
        } catch (Exception e) {}
    }
    👇👇👇
    private void maybeSendRequestToNextProcessor(Request request) throws RequestProcessorException {
        if (skipLearnerRequestToNextProcessor && request.isFromLearner()) {
            ServerMetrics.getMetrics().SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT.add(1);
        } else {
            nextProcessor.processRequest(request);
        }
    }
    
    2.2.2_ CommitProcessor

    同 leader 节点的 CommitProcessor 一致

2.2.3_ FinalRequestProcessor

同 leader 节点的 FinalRequestProcessor 一致

2.2.4_ SyncRequestProcessor

同 leader 节点的 SyncRequestProcessor 一致

2.2.5_ SendAckRequestProcessor

leader 节点的AckRequestProcessor 是直接调用leader 节点的tryToCommit() 方法;

follower 节点 SendAckRequestProcessor 需要将Ack消息发送到leader节点进行统计处理;

public void processRequest(Request si) {
    if (si.type != OpCode.sync) {
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
        try {
            learner.writePacket(qp, false);
        } catch (IOException e) {
            // 省略异常处理代码
        }
    }
}

最后

在线蹲赞环节

老板!点个赞呗!!
1631955135(1).png


  目录