Zookeeper系列三:Zookeeper客户端的使用(Zookeeper原生API如何进行调用、ZKClient、Curator)

一、Zookeeper原生API如何进行调用 准备工作: 首先在新建一个maven项目ZK-Demo,然后在pom.xml里面引入zk的依赖 复制代码 org.apache.zookeeper zookeeper 3.4.10 复制代码 1. 连接zk并监听事件 复制代码 package com.study.demo.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; //连接zk并监听事件 public class ZKDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); public static void main(String[] args) throws IOException { ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDemo()); System.out.println(zk.getState()); try { cdl.await(); } catch (Exception e) { System.out.println("ZK Session established."); } } //监听到事件时进行处理 public void process(WatchedEvent event) { System.out.println("Receive watched event:" + event); if (KeeperState.SyncConnected == event.getState()) { cdl.countDown(); } } } 复制代码 输出结果: CONNECTING Receive watched event:WatchedEvent state:SyncConnected type:None path:null 2. 创建znode并监听事件 复制代码 package com.study.demo.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; //创建znode并监听事件 public class ZKOperateDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKOperateDemo()); cdl.await(); String path1 = zk.create("/zk-test-", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Success create path: " + path1); String path2 = zk.create("/zk-test-", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Success create path: " + path2); } //监听到事件时进行处理 public void process(WatchedEvent event) { System.out.println("Receive watched event:" + event); if (KeeperState.SyncConnected == event.getState()) { cdl.countDown(); } } } 复制代码 输出结果: Receive watched event:WatchedEvent state:SyncConnected type:None path:null Success create path: /zk-test- Success create path: /zk-test-0000000011 3. 改变znode数据并监听事件 复制代码 package com.study.demo.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; //改变znode数据并监听事件 public class ZKDataDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); private static ZooKeeper zk = null; private static Stat stat = new Stat(); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDataDemo()); cdl.await(); zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(new String(zk.getData("/zk-test", true, stat))); zk.getData("/zk-test", true, stat); System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); zk.setData("/zk-test", "123".getBytes(), -1); Thread.sleep(Integer.MAX_VALUE); } //监听到事件时进行处理 public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { cdl.countDown(); } else if (event.getType() == EventType.NodeDataChanged) { try { System.out.println(new String(zk.getData(event.getPath(), true, stat))); System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); } catch (Exception e) { } } } } } 复制代码 输出结果: 123 4294967354, 4294967354, 0 123 4294967354, 4294967355, 1 4. 改变子节点并监听事件 复制代码 package com.study.demo.zk; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; //改变子节点并监听事件 public class ZKChildrenDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); private static ZooKeeper zk = null; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKChildrenDemo()); cdl.await(); zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); List list = zk.getChildren("/zk-test", true); for (String str : list) System.out.println(str); zk.create("/zk-test/c2", "789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(Integer.MAX_VALUE); } //监听到事件时进行处理 public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) if (EventType.None == event.getType() && null == event.getPath()) { cdl.countDown(); } else if (event.getType() == EventType.NodeChildrenChanged) { try { System.out.println("Child: " + zk.getChildren(event.getPath(), true)); } catch (Exception e) { } } } } 复制代码 输出结果: c1 Child: [c1, c2] 5. 异步调用并完成回调 复制代码 package com.study.demo.zk; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; //异步调用并完成回调 class ChildrenCallback implements AsyncCallback.Children2Callback { public void processResult(int rc, String path, Object ctx, List children, Stat stat) { System.out.println( "Child: " + rc + ", path: " + path + ", ctx: " + ctx + ", children: " + children + ", stat: " + stat); } } public class ZKChildrenAsyncDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); private static ZooKeeper zk = null; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKChildrenAsyncDemo()); cdl.await(); zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zk.getChildren("/zk-test", true, new ChildrenCallback(), "ok"); Thread.sleep(Integer.MAX_VALUE); } //监听到事件时进行处理 public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) if (EventType.None == event.getType() && null == event.getPath()) { cdl.countDown(); } else if (event.getType() == EventType.NodeChildrenChanged) { try { System.out.println("Child: " + zk.getChildren(event.getPath(), true)); } catch (Exception e) { } } } } 复制代码 输出结果: Child: 0, path: /zk-test, ctx: ok, children: [c1], stat: 4294967369,4294967369,1535536716381,1535536716381,0,1,0,0,3,1,4294967370 6. 连接后创建回调 复制代码 package com.study.demo.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; //连接后创建回调 class IStringCallback implements AsyncCallback.StringCallback { public void processResult(int rc, String path, Object ctx, String name) { System.out.println("create path result: [" + rc + ", " + path + "," + ctx + ", real path name: " + name); } } public class ZKAsyncDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKAsyncDemo()); cdl.await(); zk.create("/zk-test-", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), new String("I am context")); zk.create("/zk-test-", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), new String("I am context")); zk.create("/zk-test-", "789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new IStringCallback(), new String("I am context")); Thread.sleep(Integer.MAX_VALUE); } //监听到事件时进行处理 public void process(WatchedEvent event) { System.out.println("Receive watched event:" + event); if (KeeperState.SyncConnected == event.getState()) { cdl.countDown(); } } } 复制代码 输出结果: Receive watched event:WatchedEvent state:SyncConnected type:None path:null create path result: [0, /zk-test-,I am context, real path name: /zk-test- create path result: [-110, /zk-test-,I am context, real path name: null create path result: [0, /zk-test-,I am context, real path name: /zk-test-0000000016 二、ZKClient ZKClient的优点: 1)可以递归创建。在zookeeper命令行和zookeeper的原生API里面得先创建父节点才能创建子节点 2)可以递归删除。在zookeeper命令行和zookeeper的原生API里面得先删除子节点才能删除父节点 3)避免不存在的异常 准备工作: 首先在新建一个maven项目ZK-Demo,然后在pom.xml里面引入ZKClient的依赖 复制代码 com.101tec zkclient 0.10 复制代码 1. ZkClient递归创建顺序节点 复制代码 package com.study.demo.client; import org.I0Itec.zkclient.ZkClient; /** * * @Description: ZkClient递归创建顺序节点 * @author leeSmall * @date 2018年9月2日 * */ public class CreateNodeDemo { public static void main(String[] args) { ZkClient client = new ZkClient("192.168.152.130:2181", 5000); String path = "/zk-client/c1"; // 递归创建顺序节点 true:先创建父节点/zk-client client.createPersistent(path, true); } } 复制代码 创建成功: 2. ZkClient获取数据并监听事件 复制代码 package com.study.demo.client; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; /** * * @Description: ZkClient获取数据 * @author leeSmall * @date 2018年9月2日 * */ public class GetDataDemo { public static void main(String[] args) throws InterruptedException { String path = "/zk-client"; ZkClient client = new ZkClient("192.168.152.130:2181", 5000); //创建临时节点 client.createEphemeral(path, "123"); //注册父节点数据改变的事件 client.subscribeDataChanges(path, new IZkDataListener() { //父节点数据改变事件 public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println(dataPath + " changed: " + data); } //父节点数据删除事件 public void handleDataDeleted(String dataPath) throws Exception { System.out.println(dataPath + " deleted"); } }); System.out.println(client.readData(path).toString()); client.writeData(path, "456"); Thread.sleep(1000); client.delete(path); //sleep的目的是为了更好的观察事件变化 Thread.sleep(Integer.MAX_VALUE); } } 复制代码 输出结果: 123 /zk-client changed: 456 /zk-client deleted 3. ZkClient获取子节点数据并监听事件 复制代码 package com.study.demo.client; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; /
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信