Zookeeper 通知更新可靠吗? 解读源码找答案!

 欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~

本文由特鲁门发表于云+社区专栏

导读:

遇到Keepper通知更新无法收到的问题,思考节点变更通知的可靠性,通过阅读源码解析了解到zk Watch的注册以及触发的机制,本地调试运行模拟zk更新的不可靠的场景以及得出相应的解决方案。

过程很曲折,但问题的根本原因也水落石出了,本文最后陈述了更新无法收到的根本原因,希望对其他人有所帮助。-----------------------------------------

通常Zookeeper是作为配置存储、分布式锁等功能被使用,配置读取如果每一次都是去Zookeeper server读取效率是非常低的,幸好Zookeeper提供节点更新的通知机制,只需要对节点设置Watch监听,节点的任何更新都会以通知的方式发送到Client端。

img

如上图所示:应用Client通常会连接上某个ZkServer,forPath不仅仅会读取Zk 节点zkNode的数据(通常存储读取到的数据会存储在应用内存中,例如图中Value),而且会设置一个Watch,当zkNode节点有任何更新时,ZkServer会发送notify,Client运行Watch来才走出相应的事件相应。这里假设操作为更新Client本地的数据。这样的模型使得配置异步更新到Client中,而无需Client每次都远程读取,大大提高了读的性能,(图中的re-regist重新注册是因为对节点的监听是一次性的,每一次通知完后,需要重新注册)。但这个Notify是可靠的吗?如果通知失败,那岂不是Client永远都读取的本地的未更新的值?

由于现网环境定位此类问题比较困难,因此本地下载源码并模拟运行ZkServer & ZkClient来看通知的发送情况。


1、git 下载源码 https://github.com/apache/zookeeper

2、cd 到路径下,运行ant eclipse 加载工程的依赖。

3、导入Idea中。

https://stackoverflow.com/questions/43964547/how-to-import-zookeeper-source-code-to-idea

查看相关问题和步骤。

首先运行ZkServer。QuorumPeerMain是Server的启动类。这个可以根据bin下ZkServer.sh找到入口。注意启动参数配置参数文件,指定例如启动端口等相关参数。

img

在此之前,需要设置相关的断点。

首先我们要看client设置监听后,server是如何处理的

ZkClient 是使用Nio的方式与ZkServer进行通信的,Zookeeper的线程模型中使用两个线程:

SendThread专门成立的请求的发送,请求会被封装为Packet(包含节点名称、Watch描述等信息)类发送给Sever。

EventThread则专门处理SendThread接收后解析出的Event。

ZkClient 的主要有两个Processor,一个是SycProcessor负责Cluster之间的数据同步(包括集群leader选取)。另一个是叫FinalRuestProcessor,专门处理对接受到的请求(Packet)进行处理。

    //ZookeeperServer 的processPacket方法专门对收到的请求进行处理。     public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {         // We have the request, now process and setup for next         InputStream bais = new ByteBufferInputStream(incomingBuffer);         BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);         RequestHeader h = new RequestHeader();         h.deserialize(bia, "header");         // Through the magic of byte buffers, txn will not be         // pointing         // to the start of the txn         incomingBuffer = incomingBuffer.slice();         //鉴权请求处理         if (h.getType() == OpCode.auth) {             LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());             AuthPacket authPacket = new AuthPacket();             ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);             String scheme = authPacket.getScheme();             ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);             Code authReturn = KeeperException.Code.AUTHFAILED;             if(ap != null) {                 try {                     authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());                 } catch(RuntimeException e) {                     LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);                     authReturn = KeeperException.Code.AUTHFAILED;                 }             }             if (authReturn == KeeperException.Code.OK) {                 if (LOG.isDebugEnabled()) {                     LOG.debug("Authentication succeeded for scheme: " + scheme);                 }                 LOG.info("auth success " + cnxn.getRemoteSocketAddress());                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0,                         KeeperException.Code.OK.intValue());                 cnxn.sendResponse(rh, null, null);             } else {                 if (ap == null) {                     LOG.warn("No authentication provider for scheme: "                             + scheme + " has "                             + ProviderRegistry.listProviders());                 } else {                     LOG.warn("Authentication failed for scheme: " + scheme);                 }                 // send a response...                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0,                         KeeperException.Code.AUTHFAILED.intValue());                 cnxn.sendResponse(rh, null, null);                 // ... and close connection                 cnxn.sendBuffer(ServerCnxnFactory.closeConn);                 cnxn.disableRecv();             }             return;         } else {                          if (h.getType() == OpCode.sasl) {                 Record rsp = processSasl(incomingBuffer,cnxn);                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());                 cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?                 return;             }             else {                 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),                   h.getType(), incomingBuffer, cnxn.getAuthInfo());                 si.setOwner(ServerCnxn.me);                 // Always treat packet from the client as a possible                 // local request.                 setLocalSessionFlag(si);                 //交给finalRequestProcessor处理                 submitRequest(si);             }         }         cnxn.incrOutstandingRequests(h);     }

FinalRequestProcessor 对请求进行解析,Client连接成功后,发送的exist命令会落在这部分处理逻辑。

img

关键字:
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信