基于SpringBoot实现AOP+jdk/CGlib动态代理详解
客户端的启动流程#
upload/201912061149315774.png
看上面的客户端启动的脚本图,可以看到,zookeeper客户端脚本运行的入口ZookeeperMain.java的main()方法, 关于这个类可以理解成它是程序启动的辅助类,由它提供开始的位置,进而加载出zk client的上下文
创建ZooKeeperMain对象#
upload/201912061149310542.png
Copy
// todo zookeeper的入口方法
public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
// todo new ZK客户端
ZooKeeperMain main = new ZooKeeperMain(args);
// todo run方法的实现在下面
main.run();
}
跟踪ZooKeeperMain main = new ZooKeeperMain(args); 能往下追很长的代码,提前说main.run()的作用,就是对用户输入的命令进行下一步处理
如上是入口函数的位置,跟进这两个函数,可以找到我们在client端的命令行中可以输入命令和zookeeper服务端进行通信的原因(开起了新的线程),以及zookeeper的客户端所依赖的其他类
跟进ZooKeeperMain main = new ZooKeeperMain(args);
Copy
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 连接到客户端
connectToZK(cl.getOption("server"));
}
我们在命令行启动客户端时,输入命令zkCli.sh -server localhost:2181,其中的args数组, 就是我们在启动就是我们输入的参数,
构建zookeeperMain对象时,上面主要做了两件事
解析args参数数组
连接客户端
解析参数数组的逻辑就在下面, 很熟悉,就是我们在命令行启动zookeeper时输入的命令可选项
Copy
public boolean parseOptions(String[] args) {
List argList = Arrays.asList(args);
Iterator it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}
创建ZooKeeper客户端的对象#
upload/201912061149328356.png
接着看如果连接客户端, connectToZK(String newHost) 同样是本类方法,源码如下:
Copy
// todo 来到这里
protected void connectToZK(String newHost) throws InterruptedException, IOException {
if (zk != null && zk.getState().isAlive()) {
zk.close();
}
//todo 命令行中的server 后面跟着 host主机地址
host = newHost;
boolean readOnly = cl.getOption("readonly") != null;
// todo 创建zookeeper的实例
zk = new ZooKeeper(host,
Integer.parseInt(cl.getOption("timeout")),
new MyWatcher(), readOnly);
}
到这里算是个小高潮吧,毕竟看到了zookeeper client的封装类ZooKeeper, 这个类上的注解大概是这么介绍这个类的
它是个Zookeeper 客户端的封装类, 它的第一个参数是 host:port,host:port,host:port这种格式的字符串,逗号左右是不同的服务端的地址
会异步的创建session,通常这个session在构造函数执行完之间就已经创建完成了
watcher 是监听者,它被通知的时刻不确定,可能是构造方法执行完成前,也可能在这之后
只要没有连接成功, zookeeper客户端,会一直尝试从提供的服务地址串中选择出一个尝试链接
跟进ZooKeeper的构造方法
Copy
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws IOException{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
// todo 包装服务端的地址
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
//todo 将服务端的地址封装进 StaticHostProvider -> HostProvider中
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
// todo 创建客户端的上下文, 这个上下文对象的亮点就是它维护了一个客户端的socket
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
// todo 跟进这个方法,getClientCnxnSocket, 获取出客户端上下文中的socket
getClientCnxnSocket(), canBeReadOnly);
// todo 启动客户端
cnxn.start();
}
主要做了这么几件事
将服务端的地址解析封装进了StaticHostProvider类中, 可以把这个类理解成专门存放服务端地址的set 集合
创建出了客户端的上下文对象: ClientCnxn, 当然在这之前,入参位置还有一个getClientCnxnSocket()这个函数可以创建出客户端的NIO Socket
然后调用cnxn.start() 其实就是启动了客户端的另外两条线程sendThread和eventThread 下面会详细说
创建客户端的 NioSocket#
upload/201912061149326164.png
继续跟进源码getClientCnxnSocket()通过反射,zk客户端使用的socket对象是ClientCnxnSocketNIO
Copy
//todo 通过反射创建出客户端上下文中的 socket , 实际的ClientCnxnSocketNIO 是 ClientCnxnSocket的子类
// todo ---> zookeeper 封装的 NIO的逻辑都在 实际的ClientCnxnSocketNIO
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
// todo zookeeper.clientCnxnSocket
String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
// todo 上面String其实就是这个类的name, 根进去看一下它的属性
// todo 这个类维护了NioSocket使用到的 selector 选择器 , 已经发生的感兴趣的事件SelectionKey
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
// todo 可以看到客户端使用的 NioSocket
return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()
.newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
创建 ClientCnxn客户端的上下文#
upload/201912061149331512.png
创建上下文,构造函数中的诸多属性都是在前面读取配置文件或是新添加进来的,重点是最后两行,它创建了两条线程类,和zk客户端的IO息息相关
Copy
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId; // todo 刚才传递过来的值为0
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
// todo 添加read的超时时间
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
// todo 创建了一个seadThread 线程
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
创建SendThread#
upload/201912061149330485.png
sendThred是一个客户端的线程类,什么时候开启? 其实就在上面,当创建了ClientCnxn后,调用的cnxn.start()就是在开启它的run() , 它有什么作用? 它的run()是一个无限循环,除非运到了close的条件,否则他就会一直循环下去, 比如向服务端发送心跳,或者向服务端发送我们在控制台输入的数据以及接受服务端发送过来的响应
这是他的构造方法,可以看到它还是一个守护线程,并拥有客户端socket的引用,有了NIO Socket相关技能
Copy
//todo
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
// todo 设置状态 Connecting
state = States.CONNECTING;
// todo 就是在 Zookeeper new ClientCnxn 时, 在倒数第二个位置使传递进去一个函数实际的
this.clientCnxnSocket = clientCnxnSocket;
// todo 设置成守护线程
setDaemon(true);
}
它的Run方法, 真的是好长啊, 比我上面写的部分内容还长(大概两百行了), 大概它的流程 ,每次循环:
检查一下客户端的socket有没有和服务端的socket建立连接
没有建立连接
尝试选出其他的server地址进行连接
如果满足close的条件,直接break 跳出整个while循环
如果已经建立了连接
计算 to = 读取的超时时间 - 服务端的响应时间
未连接的状态
计算 to = 连接超时时间 - 服务端的响应时间
上面的两个to, 如果小于0, 说明客户端和服务端通信出现了异常, 很可能是server的session time out,于是抛出异常
如果连接状态是健康的,向服务端发送心跳
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);向服务端发送数据
在这个负责和服务端进行IO操作的线程中,只要不是close或其他重大错误,一般可以预知的异常都有try起来,然后记录日志,并没有其他操作,循环还是会进行
Copy
// todo introduce 介绍
clientCnxnSocket.introduce(this,sessionId); // todo this,sessionId == 0
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
// todo 这个while循环中存在建立连接的过程, 已经连接建立失败后不断重试的过程
//todo state.isAlive() 默认是 NOT_CONNECTED
while (state.isAlive()) {
try {
//todo 1111 如果socket还没有连接 /////////////////////////////////////////////////////////////////////////////////////////////////////////
//todo 如果socket还没有连接
if (!clientCnxnSocket.isConnected()) {
// todo 判断是不是第一次连接, 如果不是第一次进入下面try代码块, 随机产生一个小于一秒的时间
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
// todo 如果是closing 或者 已经关闭了, 直接退出这个循环
if (closing || !state.isAlive()) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// todo 连接失败时,来这里重试连接
// todo 从我们传递进来的host地址中选择一个地址
serverAddress = hostProvider.next(1000);
}
// todo client和server进行socket连接
// todo 跟进去 ,实现逻辑在上面
// todo 这个方法开始建立连接,并将 isFasterConnect改成了 false
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
//todo 2222 如果socket处于连接状态 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 下面的连接状态
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
// todo 连接成功的话执行to 为下面值
// todo to = 读取的超时时间 - 上一次的读取时间
// todo 如果预订的超时时间 - 上次读的时间 <= 0 说明超时了
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
// todo 如果没有连接成功, 就会来到这里, 给 to 赋值
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
//todo 3333 异常处理 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 下面抛出来了异常
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
// todo 这里抛出来了异常, 下面的try 就会把它抓住
throw new SessionTimeoutException(warnInfo);
}
//todo 44444 连接成功执行的逻辑 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 下面的是连接成功执行的逻辑
if (state.isConnected()) {
// todo 为了防止竞争状态丢失发送第二个ping, 同时也避免出现很多的ping
//1000(1 second) is to prevent(阻止) race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
// todo 客户端一直在这里循环, 如果连接成功的话, 每次循环都来到这个逻辑这里发送 ping
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
//todo 55555 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// If we are in read-only mode, seek for read/write server
// todo 只读状态 相关逻辑
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);