Zookeeper使用--Java API
Zookeeper使用–Java API: package com.beicai.api1.zkJavaApi; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.AsyncCallback....
·
Zookeeper使用–Java API
package com.beicai.api1.zkJavaApi;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
 * Demonstrates the ZooKeeper Java client API: synchronous and asynchronous
 * create, delete, setData and getData calls.
 *
 * <p>Original author: cc. All ZooKeeper work here is event driven: it is started
 * from {@link #process(WatchedEvent)}, which the client invokes as this class is
 * the registered {@link Watcher} (the original author notes this runs on a
 * child thread, not on the main thread).
 */
public class ZkApi_01 implements Watcher {
    /**
     * Lets the main thread wait for the event-driven work before closing the
     * client. The count is 2: one count-down comes from the {@code finally}
     * block of {@link #process(WatchedEvent)}, the other from the asynchronous
     * result callback of the operation that was started.
     */
    public static CountDownLatch countDownLatch = new CountDownLatch(2);

    /** Shared client handle; assigned once in {@link #main(String[])}. */
    static ZooKeeper zooKeeper = null;

    /** True until the demo operation has been issued once. */
    static boolean flag = true;

    /**
     * Watcher entry point. On a {@code SyncConnected} event it issues the demo
     * operation once, reports node events, re-registers an {@code exists} watch
     * for the affected path, and finally counts the latch down.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        if (event.getState() != Event.KeeperState.SyncConnected) {
            return;
        }
        try {
            // Alternative demos; enable exactly one of them instead of
            // ansyncGetData() below to try it out:
            // createSyscNode();
            // createAnSyscNode();
            // SyncDeleteNode();
            // AnSyncDeleteNode();
            // syncSetData();
            if (flag) {
                // AnsyncSetData();
                // syncGetData();
                ansyncGetData();
                flag = false;
            }

            EventType type = event.getType();
            String path = event.getPath();
            if (EventType.None == type && null == path) {
                System.out.println("None " + path);
            } else if (EventType.NodeCreated == type) {
                System.out.println("success create znode: " + path);
                rewatch(path);
            } else if (EventType.NodeDeleted == type) {
                System.out.println("success delete znode: " + path);
                rewatch(path);
            } else if (EventType.NodeDataChanged == type) {
                System.out.println("data changed of znode: " + path);
                rewatch(path);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }

    /** Re-registers an {@code exists} watch on {@code path} via the asynchronous API. */
    private static void rewatch(String path) {
        zooKeeper.exists(path, true, new IIStatCallback1(), null);
    }

    /**
     * Connects to the ensemble, waits until the latch reaches zero and then
     * closes the client. The client is closed even when the wait is interrupted.
     *
     * @param args unused
     * @throws Exception if the client cannot be created or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Arguments: 1. connect string (comma-separated hosts; a single host
        // also works), 2. session timeout of 5000 ms, 3. the watcher.
        zooKeeper = new ZooKeeper("hadoop-1:2181,hadoop-2:2181,hadoop-3:2181", 5000, new ZkApi_01());
        try {
            System.out.println(zooKeeper.getState());
            countDownLatch.await(); // block until every countDown() has happened
        } finally {
            zooKeeper.close();
        }
    }

    // ================= create znode (synchronous and asynchronous)

    /**
     * Creates two znodes synchronously: an ephemeral node and an
     * ephemeral-sequential node.
     *
     * <p>Per the original author's notes: neither the synchronous nor the
     * asynchronous API creates parents recursively (e.g. /zk-ephemeral/ch1
     * cannot be created while /zk-ephemeral is missing), and creating a node
     * that already exists raises NodeExistsException.
     *
     * @return the path of the first (ephemeral) znode
     * @throws Exception if either create call fails
     */
    private static String createSyscNode() throws Exception {
        // Ids.OPEN_ACL_UNSAFE: all permissions. CreateMode.EPHEMERAL: the znode
        // will be deleted upon the client's disconnect.
        String path1 = zooKeeper.create("/zkTest0112", "hello world".getBytes(), Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        System.out.println("Success create znode: " + path1);
        String path2 = zooKeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Success create znode: " + path2);
        return path1;
    }

    /**
     * Creates an ephemeral-sequential znode asynchronously.
     *
     * <p>Per the original author's notes: with the asynchronous API the call
     * itself does not report failures by throwing; they are delivered to the
     * callback as a result code.
     *
     * @throws Exception declared for symmetry with the synchronous variant
     */
    private static void createAnSyscNode() throws Exception {
        // The result is delivered to IStringCallback together with the context object.
        zooKeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                new IStringCallback(), "this is context");
    }

    /** Result callback for the asynchronous create call; prints the outcome. */
    static class IStringCallback implements StringCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name);
        }
    }

    // ================= delete znode (synchronous and asynchronous)

    /**
     * Deletes a znode synchronously.
     *
     * @return the fixed string {@code "delete ok"}
     * @throws Exception if the delete call fails
     */
    private static String SyncDeleteNode() throws Exception {
        // version -1: ignore the version number
        zooKeeper.delete("/zk-test0000000003", -1);
        System.out.println("delete ok");
        return "delete ok";
    }

    /**
     * Requests an asynchronous delete; the outcome is reported to
     * {@link MyDeleCallBack}.
     *
     * @return the fixed string {@code "delete ok"} (returned once the request
     *         has been issued, not when it has completed)
     * @throws Exception declared for symmetry with the synchronous variant
     */
    private static String AnSyncDeleteNode() throws Exception {
        // version -1: ignore the version number
        zooKeeper.delete("/zk-test-ephemeral-0000000013", -1, new MyDeleCallBack(), "delte context");
        System.out.println("AnSyncDeleteNode delete ok");
        return "delete ok";
    }

    /** Result callback for the asynchronous delete; prints the outcome and counts the latch down. */
    static class MyDeleCallBack implements VoidCallback {
        @Override
        public void processResult(int rc, String path, Object ctx) {
            System.out.println("delete path result: [" + rc + ", " + path + ", " + ctx);
            countDownLatch.countDown();
        }
    }

    // ================= set data (synchronous and asynchronous)

    /**
     * Overwrites the data of a znode synchronously.
     *
     * @return the fixed string {@code "set ok"}
     * @throws Exception if the setData call fails
     */
    private String syncSetData() throws Exception {
        zooKeeper.setData("/zk-test0000000004", "this is syncSetData ".getBytes(), -1);
        System.out.println("syncSetData ok ");
        return "set ok";
    }

    /**
     * If the znode exists, asynchronously overwrites its data with the current
     * time in milliseconds; the outcome is reported to {@link mySetCallBack}.
     *
     * @return the fixed string {@code "set ok"}
     * @throws Exception if the exists check fails
     */
    private String AnsyncSetData() throws Exception {
        // Second argument of exists(): whether to register a watch. Per the
        // original author's note each call registers it for a single event.
        if (zooKeeper.exists("/zk-test0000000004", true) != null) {
            String n = (System.currentTimeMillis() + "");
            System.out.println(n);
            zooKeeper.setData("/zk-test0000000004", n.getBytes(), -1, new mySetCallBack(), "this is context");
            System.out.println("AnsyncSetData ok ");
        }
        return "set ok";
    }

    /** Result callback for the asynchronous setData; prints the outcome and counts the latch down. */
    static class mySetCallBack implements StatCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            // Label fixed: this reports a setData result, not a delete.
            System.out.println("set data result: [" + rc + ", " + path + ", " + ctx + ", " + stat + "]");
            countDownLatch.countDown();
        }
    }

    // ================= get data (synchronous and asynchronous)

    /**
     * Reads a znode synchronously and prints its data and selected stat fields.
     *
     * @throws Exception if the getData call fails
     */
    private static void syncGetData() throws Exception {
        Stat stat = new Stat();
        byte[] b = zooKeeper.getData("/zk-test0000000004", false, stat);
        printData(b, stat);
    }

    /**
     * Requests the data of a znode asynchronously; the outcome is reported to
     * {@link myGetCallBack}.
     *
     * @throws Exception declared for symmetry with the synchronous variant
     */
    private static void ansyncGetData() throws Exception {
        zooKeeper.getData("/zk-test0000000004", false, new myGetCallBack(), null);
    }

    /** Prints the node data (or a placeholder when there is none) followed by selected stat fields. */
    private static void printData(byte[] data, Stat stat) {
        // A znode may carry no data at all; guard against a null array.
        System.out.println(data == null ? "(no data)" : new String(data));
        System.out.println("getVersion:" + stat.getVersion());
        System.out.println("getNumChildren:" + stat.getNumChildren());
        System.out.println("getDataLength:" + stat.getDataLength());
    }

    /**
     * Result callback for the asynchronous getData. Prints the data on success,
     * a failure line otherwise, and always counts the latch down so that the
     * main thread is released.
     */
    static class myGetCallBack implements DataCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            try {
                // rc 0 is the OK result code; on failure no stat is available,
                // so the values must not be dereferenced.
                if (rc != 0 || stat == null) {
                    System.out.println("getData failed: [" + rc + ", " + path + ", " + ctx + "]");
                    return;
                }
                printData(data, stat);
                System.out.println("path:" + path);
            } finally {
                countDownLatch.countDown();
            }
        }
    }
}
//exist回调监听
class IIStatCallback1 implements AsyncCallback.StatCallback {
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
}
}
更多推荐



所有评论(0)