Zookeeper使用–Java API

package com.beicai.api1.zkJavaApi;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * @author cc zk功能都是事件触发(子线程中运行的)
 */
public class ZkApi_01 implements Watcher {
	// CountDownLatch通知主线程等待子线程所有内容执行完成退出
	// 参数是几个子线程
	public static CountDownLatch countDownLatch = new CountDownLatch(2);
	static ZooKeeper zooKeeper = null;
	static boolean flag = true;

	@Override
	public void process(WatchedEvent event) {
		// zk 需要实现的监听类
		if (event.getState() == Event.KeeperState.SyncConnected) {
			try {
				// createSyscNode();
				// createAnSyscNode();
				// SyncDeleteNode();
				// AnSyncDeleteNode();
				// syncSetData();
				if (flag) {
					// AnsyncSetData();
					// syncGetData();
					ansyncGetData();
					flag = false;
				}
				if (EventType.None == event.getType() && null == event.getPath()) {
					System.out.println("None  " + event.getPath());
//					countDownLatch.countDown();
				} else if (EventType.NodeCreated == event.getType()) {
					System.out.println("success create znode: " + event.getPath());
					zooKeeper.exists(event.getPath(), true, new IIStatCallback1(), null);
				} else if (EventType.NodeDeleted == event.getType()) {
					System.out.println("success delete znode: " + event.getPath());
					zooKeeper.exists(event.getPath(), true, new IIStatCallback1(), null);
				} else if (EventType.NodeDataChanged == event.getType()) {
					System.out.println("data changed of znode: " + event.getPath());
					zooKeeper.exists(event.getPath(), true, new IIStatCallback1(), null);
				}
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				countDownLatch.countDown();
			}
		}
	}

	public static void main(String[] args) throws Exception {
		/*
		 * 创建ZooKeeper对象 1.连接地址:三台机器逗号分隔,也可以只有一台 2.超时事件 5000毫秒 3.监听
		 */
		zooKeeper = new ZooKeeper("hadoop-1:2181,hadoop-2:2181,hadoop-3:2181", 5000, new ZkApi_01());
		System.out.println(zooKeeper.getState());
		countDownLatch.await(); // 等待子线程 直到看到 countDown()
		zooKeeper.close();
	}

	// ================= 同步和异步创建znode
	/**
	 * 同步创建znode创建节点有异步和同步两种方式。无论是异步或者同步,Zookeeper都不支持递归调用,
	 * 即无法在父节点不存在的情况下创建一个子节点,如在/zk-ephemeral节点不存在的情况下创建/zk-ephemeral/ch1节点;
	 * 并且如果一个节点已经存在, 那么创建同名节点时,会抛出NodeExistsException异常。
	 */
	private static String createSyscNode() throws Exception {
		/*
		 * Ids.OPEN_ACL_UNSAFE : 所有权限 CreateMode.EPHEMERAL : 临时文件;The znode will
		 * be deleted upon the client's disconnect.
		 */
		String path1 = zooKeeper.create("/zkTest0112", "hello world".getBytes(), Ids.OPEN_ACL_UNSAFE,
				CreateMode.EPHEMERAL);
		System.out.println("Success create znode: " + path1);
		String path2 = zooKeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
				CreateMode.EPHEMERAL_SEQUENTIAL);
		System.out.println("Success create znode: " + path2);
		return path1;
	}

	/*
	 * 异步创建 znode 使用异步方式于同步方式的区别在于节点的创建过程(包括网络通信和服务端的节点创建过程)是异步的, 在同步接口调用过程中,
	 * 开发者需要关注接口抛出异常的可能,但是在异步接口中,接口本身不会抛出异常, 所有异常都会在回调函数中通过Result Code来体现。
	 */
	private static void createAnSyscNode() throws Exception {
		/*
		 * Ids.OPEN_ACL_UNSAFE : 所有权限 CreateMode.EPHEMERAL : 临时文件;The znode will
		 * be deleted upon the client's disconnect. IStringCallback() //默认触发这个功能
		 * 回调
		 */
		zooKeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
				new IStringCallback(), "this is context");
	}

	static class IStringCallback implements StringCallback {// 创建异步回调
		public void processResult(int rc, String path, Object ctx, String name) {
			System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name);
		}
	}

	// ================= 同步和异步删除znode
	private static String SyncDeleteNode() throws Exception {// 同步删除节点
		// -1表示忽略版本号
		zooKeeper.delete("/zk-test0000000003", -1);
		System.out.println("delete ok");
		return "delete ok";
	}

	private static String AnSyncDeleteNode() throws Exception {// 异步删除
		// -1表示忽略版本号
		zooKeeper.delete("/zk-test-ephemeral-0000000013", -1, new MyDeleCallBack(), "delte context");
		System.out.println("AnSyncDeleteNode delete ok");
		return "delete ok";
	}

	static class MyDeleCallBack implements VoidCallback {
		@Override
		public void processResult(int rc, String path, Object ctx) {
			System.out.println("delete path result: [" + rc + ", " + path + ", " + ctx);
			countDownLatch.countDown();
		}
	}

	// ========== 同步和异步修改数据
	private String syncSetData() throws Exception {// 同步修改znode数据
		zooKeeper.setData("/zk-test0000000004", "this is syncSetData ".getBytes(), -1);
		System.out.println("syncSetData ok ");
		return "set ok";
	}

	private String AnsyncSetData() throws Exception {// 异步修改
		/*
		 * exists () param 2 : 是否注册监听; 调用一次,表示使用一次监听事件; 默认值是false
		 */
		if (zooKeeper.exists("/zk-test0000000004", true) != null) {
			String n = (System.currentTimeMillis() + "");
			System.out.println(n);
			zooKeeper.setData("/zk-test0000000004", n.getBytes(), -1, new mySetCallBack(), "this is context");
			System.out.println("AnsyncSetData ok ");
		}
		return "set ok";
	}

	static class mySetCallBack implements StatCallback {
		@Override
		public void processResult(int rc, String path, Object ctx, Stat stat) {
			System.out.println("delete path result: [" + rc + ", " + path + ", " + ctx + ", " + stat + "]");
			countDownLatch.countDown();
		}
	}

	// ========查询节点数据(同步和异步)
	private static void syncGetData() throws Exception { // 同步查询数据
		Stat stat = new Stat();
		byte[] b = zooKeeper.getData("/zk-test0000000004", false, stat);
		System.out.println(new String(b));
		System.out.println("getVersion:" + stat.getVersion());
		System.out.println("getNumChildren:" + stat.getNumChildren());
		System.out.println("getDataLength:" + stat.getDataLength());
	}

	private static void ansyncGetData() throws Exception { // 异步查询数据
		zooKeeper.getData("/zk-test0000000004", false, new myGetCallBack(), null);
	}

	static class myGetCallBack implements DataCallback {
		@Override
		public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
			System.out.println(new String(data));
			System.out.println("getVersion:" + stat.getVersion());
			System.out.println("getNumChildren:" + stat.getNumChildren());
			System.out.println("getDataLength:" + stat.getDataLength());
			System.out.println("path:" + path);
		}
	}
}

//exist回调监听
class IIStatCallback1 implements AsyncCallback.StatCallback {
	public void processResult(int rc, String path, Object ctx, Stat stat) {
		System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
	}
}

源码下载地址

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐