Zkclient操作zookeeper案例
Zkclient操作zookeeper每天多学一点点~话不多说,这就开始吧…文章目录Zkclient操作zookeeper1.Zookeeper的Java客户端API2.代码2.1. pom依赖2.2. zkCilent的序列化2.3 zkCilent的curd2.4 zkCilent的监听3.结语1.Zookeeper的Java客户端API有三种方式原生(1)Zookeeper...
Zkclient操作zookeeper
每天多学一点点~
话不多说,这就开始吧…
文章目录
1.Zookeeper的Java客户端API
有三种方式
-
原生
(1)Zookeeper的Watcher是一次性的,每次触发之后都需要重新进行注册;
(2)Session超时之后没有实现重连机制;
(3)异常处理繁琐,Zookeeper提供了很多异常,对于开发人员来说可能根本不知道该如何处理这些异常信息;
(4)只提供了简单的byte[]数组的接口,没有提供针对对象级别的序列化;
(5)创建节点时如果节点存在抛出异常,需要自行检查节点是否存在;
(6)删除节点无法实现级联删除; -
zkclient
ZkClient是一个开源客户端,在Zookeeper原生API接口的基础上进行了包装,更便于开发人员使用。内部实现了Session超时重连,Watcher反复注册等功能。像dubbo等框架对其也进行了集成使用。
虽然ZkClient对原生API进行了封装,但也有它自身的不足之处:
(1)几乎没有参考文档;
(2)异常处理简化(抛出RuntimeException);
(3)重试机制比较难用;
(4)没有提供各种使用场景的实现; -
Curator
Curator是Netflix公司开源的一套Zookeeper客户端框架,和ZkClient一样,解决了非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等。目前已经成为Apache的顶级项目。另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
本文选用zkclient 进行操作
2.代码
2.1. pom依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
2.2. zkCilent的序列化
防止数据在客户端呈现乱码,且 能 启动监听机制
package com.jiagouedu.zkclient.watcher;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.io.UnsupportedEncodingException;
/**
* @Classname zk序列化 和 反 序列化 方式
* @Description TODO
* @Date 2019/3/24 3:54
* @Created by 爆裂无球
*/
public class MyZkSerializer implements ZkSerializer {
/**
* zk自带的序列化
*/
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
public byte[] serialize(Object obj) throws ZkMarshallingError {
try {
return String.valueOf(obj).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}
2.3 zkCilent的Util
package com.jiagouedu.zkclient.znode;
import com.jiagouedu.zkclient.watcher.MyZkSerializer;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import java.util.List;
/**
* @Classname ZkClientCrud
* @Description TODO
* @Date 2019/3/24 2:54
* @Created by 爆裂无球
*/
public class ZkClientCrud<T> {
ZkClient zkClient;
// private String connectString="192.168.0.31:2181,192.168.0.32:2181,192.168.0.33:2181";
private String connectString = "127.0.0.1:2181";
// public ZkClientUtil() {
// this.zkClient = new ZkClient(connectString,5000,5000,new SerializableSerializer()); // zk默认的序列化方式
// }
public ZkClientUtil() {
//定义序列化方式,别忘了
this.zkClient = new ZkClient(connectString, 5000, 5000, new MyZkSerializer());
}
public void createPersistent(String path) {
zkClient.createPersistent(path, true); //创建持久化节点,true表示如果父节点不存在则创建父节点
}
//创建 永久 节点,并设置数据
public void createPersistent(String path, Object data) {
zkClient.createPersistent(path, data);
}
// 创建永久 有序节点
public void createPersistentSequential(String path, Object data) {
zkClient.createEphemeralSequential(path, data);
}
//创建临时节点 会话失效后删除
public void createEphemeral(String path, Object data) {
zkClient.createEphemeral(path, data);
}
//创建 临时节点 有序 节点 会话失效后删除
public void createEphemeralSequential(String path, Object data) {
zkClient.createEphemeralSequential(path, data);
}
//创建alc节点
public void createAcl(String path, Object data, final List<ACL> acl, final CreateMode mode) {
zkClient.create(path, data, acl, mode);
}
//设置acl 属性
public void setAcl(String path, List<ACL> acl) {
zkClient.setAcl(path, acl);
}
//获得acl属性
public Map.Entry<List<ACL>, Stat> getAcl(String path) {
return zkClient.getAcl(path);
}
//读取数据
public T readData(String path) {
// return zkClient.readData(path);
//没有不会抛异常,而是返回null
return zkClient.readData(path, true);
}
/**
* 读取 子节点 只能找 其 子一级 下 所有的
*/
public List<String> getChildren(String path) {
return zkClient.getChildren(path);
}
/**
* 递归查找 所有 子节点
*/
public void getChilderRecursive(String path) {
System.out.println(path);
if (zkClient.exists(path)) {
List<String> list = zkClient.getChildren(path);
if (list.size() > 0) {
list.stream().forEach(n -> {
getChilderRecursive(path + "/" + n);
});
}
}
}
// 更新内容
public void writeData(String path, Object object) {
zkClient.writeData(path, object);
}
//删除单个节点 即这个节点下不能有子节点
public void delete(String path) {
zkClient.delete(path);
}
//递归删除节点 即删除其节点下 所有子节点 对应rmr 命令
public void deleteRecursive(String path) {
zkClient.deleteRecursive(path);
}
/***
* 支持创建递归方式 但是不能写入数据
* @param path
* @param createParents true,表明需要递归创建父节点
*/
public void createPersistentRecursive(String path, boolean createParents) {
zkClient.createPersistent(path, createParents);
}
/**
* 关闭zk
*/
public void close() {
zkClient.close();
}
/**
* 监听
*/
public void lister(String path) {
//对节点添加监听变化。 当前节点内容修改、节点删除 能监听数据
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.printf(" 变更的节点为:%s,%s", dataPath, data); // 节点变更 变更的节点为:/w,123
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.printf(" 删除的节点为:%s", dataPath);
}
});
//对父节点添加监听子节点变化。监听 下面子节点的新增、删除 和 当前节点 不监听数据发生修改和变化。 parentPath: /w,currentChilds:[ww1]
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(" parentPath: " + parentPath + ",currentChilds:" + currentChilds);
}
});
//zeng gai shan
//对父节点添加监听子节点变化。
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
if (state == Watcher.Event.KeeperState.SyncConnected) {
//当我重新启动后start,监听触发
System.out.println("连接成功");
} else if (state == Watcher.Event.KeeperState.Disconnected) {
System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
} else
System.out.println("其他状态" + state);
}
@Override
public void handleNewSession() throws Exception {
System.out.println("重建session");
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
}
});
}
}
2.4 zkCilent的测试
package com.jiagouedu.zkclient.ZkSerializer;
import com.jiagouedu.zkclient.watcher.MyZkSerializer;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.Watcher;
import java.util.List;
/**
* @Classname 监听
* @Description TODO
* @Date 2019/3/24 3:38
* @Created by 爆裂无球
*/
public class ZkClientWatcher<T> {
private ZkClientUtil zkClientUtil = new ZkClientUtil();
ZkClient zkClient;
/**
* 新增 节点(持节+临时) 并读取数据 (不能直接创建 子节点)
*/
@Test
public void createPersistent() {
User user = new User();
user.setAge(18);
user.setName("zjq");
zkClientUtil.createPersistent("/ry", user); // 创建永久节点
System.out.println(zkClientUtil.readData("/ry")); // 读取 节点 数据
User user2 = new User();
user2.setAge(23);
user2.setName("lj");
zkClientUtil.createEphemeral("/rh", user2); // 创建临时节点
System.out.println(zkClientUtil.readData("/rh"));
}
/**
* 新增 节点(永久有序,临时有序) 并读取数据 (不能直接创建 子节点)
*/
@Test
public void create() {
User user = new User();
user.setAge(1);
user.setName("我擦");
zkClientUtil.createPersistentSequential("/ry", user); // 创建 永久有序节点
User user2 = new User("我擦2", 2);
zkClientUtil.createPersistentSequential("/ry", user2); // 创建 临时有序节点
}
/**
* 创建acl节点
*/
@Test
public void createAcl() {
User user = new User("acldata", 3);
User user2 = new User("acldata2", 4);
List<ACL> aclList = new ArrayList<>();
int perm = ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE; // 或 运算 admin 可以设置节点访问控制列表权限
aclList.add(new ACL(perm, new Id("world", "anyone"))); //设置有所人的权限
// aclList.add(new ACL(ZooDefs.Perms.ALL, new Id("ip", "127.0.0.1"))); //设置所有权限,本机ip
zkClientUtil.createPersistent("/acl", user); // 创建节点
zkClientUtil.setAcl("/acl", aclList); // 给节点设置acl权限
//创建并设置acl节点 ZooDefs.Ids.OPEN_ACL_UNSAFE 表anyone
zkClientUtil.createAcl("/alc2", aclList, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
* 获得 acl 属性
*/
@Test
public void getAndSetAcl() {
Map.Entry acl = zkClientUtil.getAcl("/acl");
System.out.println(acl.getKey());
System.out.println(acl.getValue());
}
/**
* 递归创建节点(但是不能写入数据) 写入数据,读数据,,再 删除子节点
*/
@Test
public void test() {
String path = "/zjq/t1";
zkClientUtil.createPersistentRecursive(path, true);
zkClientUtil.writeData(path, new User("xx", 3));
Object o = zkClientUtil.readData(path);
System.out.println(o);
zkClientUtil.deleteRecursive(path);
}
/**
* 创建子节点,并读取
*/
@Test
public void getChildren() {
String path = "/zjq/t1";
String path2 = "/zjq/t2";
String path3 = "/zjq/t3";
zkClientUtil.createPersistentRecursive(path, true);
zkClientUtil.createPersistentRecursive(path2, true);
zkClientUtil.createPersistentRecursive(path3, true);
List list = zkClientUtil.getChildren("/zjq");
list.stream().forEach(n -> {
System.out.println(n);
});
}
/**
* 递归查找 所有 子节点
*/
@Test
public void getChilderRecursive() {
String path = "/zjq";
zkClientUtil.getChilderRecursive(path);
}
/**
* 测试监听 并且开启 下面的main方法
*/
@Test
public void testListen() throws InterruptedException {
ZkClientUtil zkClientUtil = new ZkClientUtil();
String path = "/wukong/w1";
zkClientUtil.deleteRecursive(path); //先删除
zkClientUtil.lister(path); //添加监听
zkClientUtil.createPersistent(path, "123"); //再创建节点
Thread.sleep(2000);
zkClientUtil.writeData(path, "abc"); //修改数据
Thread.sleep(Integer.MAX_VALUE);
}
public static void main(String[] args) throws InterruptedException {
ZkClientUtil zkClientUtil=new ZkClientUtil();
String path="/wukong/w1";
zkClientUtil.writeData(path,"abc"); //能触发 或者在sh zkCli.sh delete /wukong 也行
}
}
大概内容就这么多,不全的可以查查详细文档
3.结语
Curator好比hibernate,而zkclient就好比mybatis,一般情况下,zkclient可以满足需求~
世上无难事,只怕有心人,每天积累一点点,fighting!!!
更多推荐
所有评论(0)