Zkclient操作zookeeper

每天多学一点点~
话不多说,这就开始吧…

1.Zookeeper的Java客户端API

有三种方式

  1. 原生
    (1)Zookeeper的Watcher是一次性的,每次触发之后都需要重新进行注册;
    (2)Session超时之后没有实现重连机制;
    (3)异常处理繁琐,Zookeeper提供了很多异常,对于开发人员来说可能根本不知道该如何处理这些异常信息;
    (4)只提供了简单的byte[]数组的接口,没有提供针对对象级别的序列化;
    (5)创建节点时如果节点存在抛出异常,需要自行检查节点是否存在;
    (6)删除节点无法实现级联删除;

  2. zkclient
    ZkClient是一个开源客户端,在Zookeeper原生API接口的基础上进行了包装,更便于开发人员使用。内部实现了Session超时重连,Watcher反复注册等功能。像dubbo等框架对其也进行了集成使用。
    虽然ZkClient对原生API进行了封装,但也有它自身的不足之处:
    (1)几乎没有参考文档;
    (2)异常处理简化(抛出RuntimeException);
    (3)重试机制比较难用;
    (4)没有提供各种使用场景的实现;

  3. Curator
    Curator是Netflix公司开源的一套Zookeeper客户端框架,和ZkClient一样,解决了非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等。目前已经成为Apache的顶级项目。另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

本文选用zkclient 进行操作

2.代码

在这里插入图片描述

2.1. pom依赖

    <dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.10</version>
    </dependency>

2.2. zkCilent的序列化

防止数据在客户端呈现乱码,且 能 启动监听机制

package com.jiagouedu.zkclient.watcher;


import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.io.UnsupportedEncodingException;

/**
 * @Classname zk序列化 和 反 序列化 方式
 * @Description TODO
 * @Date 2019/3/24 3:54
 * @Created by 爆裂无球
 */
public class MyZkSerializer implements ZkSerializer {
    /**
     *  zk自带的序列化
     */
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public byte[] serialize(Object obj) throws ZkMarshallingError {
        try {
            return String.valueOf(obj).getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}

2.3 zkCilent的Util

package com.jiagouedu.zkclient.znode;
import com.jiagouedu.zkclient.watcher.MyZkSerializer;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

import java.util.List;


/**
 * @Classname ZkClientCrud
 * @Description TODO
 * @Date 2019/3/24 2:54
 * @Created by 爆裂无球
 */
public class ZkClientCrud<T> {
 ZkClient zkClient;
    //   private String connectString="192.168.0.31:2181,192.168.0.32:2181,192.168.0.33:2181";
    private String connectString = "127.0.0.1:2181";

    //   public ZkClientUtil() {
//      this.zkClient = new ZkClient(connectString,5000,5000,new SerializableSerializer());  // zk默认的序列化方式
//   }

    public ZkClientUtil() {
        //定义序列化方式,别忘了
        this.zkClient = new ZkClient(connectString, 5000, 5000, new MyZkSerializer());
    }

    public void createPersistent(String path) {
        zkClient.createPersistent(path, true); //创建持久化节点,true表示如果父节点不存在则创建父节点
    }

    //创建 永久 节点,并设置数据
    public void createPersistent(String path, Object data) {
        zkClient.createPersistent(path, data);
    }

    // 创建永久 有序节点
    public void createPersistentSequential(String path, Object data) {
        zkClient.createEphemeralSequential(path, data);
    }

    //创建临时节点  会话失效后删除
    public void createEphemeral(String path, Object data) {
        zkClient.createEphemeral(path, data);
    }

    //创建 临时节点 有序 节点   会话失效后删除
    public void createEphemeralSequential(String path, Object data) {
        zkClient.createEphemeralSequential(path, data);
    }

    //创建alc节点
    public void createAcl(String path, Object data, final List<ACL> acl, final CreateMode mode) {
        zkClient.create(path, data, acl, mode);
    }

    //设置acl 属性
    public void setAcl(String path, List<ACL> acl) {
        zkClient.setAcl(path, acl);
    }

    //获得acl属性
    public Map.Entry<List<ACL>, Stat> getAcl(String path) {
        return zkClient.getAcl(path);
    }


    //读取数据
    public T readData(String path) {
//      return zkClient.readData(path);
        //没有不会抛异常,而是返回null
        return zkClient.readData(path, true);
    }

    /**
     * 读取 子节点 只能找 其 子一级 下 所有的
     */
    public List<String> getChildren(String path) {
        return zkClient.getChildren(path);
    }

    /**
     * 递归查找 所有 子节点
     */
    public void getChilderRecursive(String path) {
        System.out.println(path);
        if (zkClient.exists(path)) {
            List<String> list = zkClient.getChildren(path);
            if (list.size() > 0) {
                list.stream().forEach(n -> {
                    getChilderRecursive(path + "/" + n);
                });
            }
        }

    }

    // 更新内容
    public void writeData(String path, Object object) {
        zkClient.writeData(path, object);
    }

    //删除单个节点 即这个节点下不能有子节点
    public void delete(String path) {
        zkClient.delete(path);
    }


    //递归删除节点 即删除其节点下 所有子节点  对应rmr 命令
    public void deleteRecursive(String path) {
        zkClient.deleteRecursive(path);
    }


    /***
     * 支持创建递归方式 但是不能写入数据
     * @param path
     * @param createParents     true,表明需要递归创建父节点
     */
    public void createPersistentRecursive(String path, boolean createParents) {
        zkClient.createPersistent(path, createParents);
    }

    /**
     * 关闭zk
     */
    public void close() {
        zkClient.close();
    }
  /**
     * 监听
     */
    public void lister(String path) {
        //对节点添加监听变化。  当前节点内容修改、节点删除 能监听数据
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.printf("   变更的节点为:%s,%s", dataPath, data);       // 节点变更  变更的节点为:/w,123
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.printf("    删除的节点为:%s", dataPath);
            }
        });
        //对父节点添加监听子节点变化。监听 下面子节点的新增、删除 和 当前节点   不监听数据发生修改和变化。  parentPath: /w,currentChilds:[ww1]
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("   parentPath: " + parentPath + ",currentChilds:" + currentChilds);
            }
        });
        //zeng  gai  shan
        //对父节点添加监听子节点变化。
        zkClient.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                if (state == Watcher.Event.KeeperState.SyncConnected) {
                    //当我重新启动后start,监听触发
                    System.out.println("连接成功");
                } else if (state == Watcher.Event.KeeperState.Disconnected) {
                    System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
                } else
                    System.out.println("其他状态" + state);
            }

            @Override
            public void handleNewSession() throws Exception {
                System.out.println("重建session");
            }

            @Override
            public void handleSessionEstablishmentError(Throwable error) throws Exception {
            }
        });

    }
   
}

2.4 zkCilent的测试

package com.jiagouedu.zkclient.ZkSerializer;
import com.jiagouedu.zkclient.watcher.MyZkSerializer;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.Watcher;

import java.util.List;

/**
 * @Classname 监听
 * @Description TODO
 * @Date 2019/3/24 3:38
 * @Created by 爆裂无球
 */
public class ZkClientWatcher<T> {
     private ZkClientUtil zkClientUtil = new ZkClientUtil();
    ZkClient zkClient;

    /**
     * 新增 节点(持节+临时) 并读取数据   (不能直接创建 子节点)
     */
    @Test
    public void createPersistent() {
        User user = new User();
        user.setAge(18);
        user.setName("zjq");
        zkClientUtil.createPersistent("/ry", user);       // 创建永久节点
        System.out.println(zkClientUtil.readData("/ry"));  // 读取 节点 数据
        User user2 = new User();
        user2.setAge(23);
        user2.setName("lj");
        zkClientUtil.createEphemeral("/rh", user2);        // 创建临时节点
        System.out.println(zkClientUtil.readData("/rh"));
    }

    /**
     * 新增 节点(永久有序,临时有序)   并读取数据   (不能直接创建 子节点)
     */
    @Test
    public void create() {
        User user = new User();
        user.setAge(1);
        user.setName("我擦");
        zkClientUtil.createPersistentSequential("/ry", user);    // 创建 永久有序节点

        User user2 = new User("我擦2", 2);
        zkClientUtil.createPersistentSequential("/ry", user2);    // 创建 临时有序节点

    }

    /**
     * 创建acl节点
     */
    @Test
    public void createAcl() {
        User user = new User("acldata", 3);
        User user2 = new User("acldata2", 4);
        List<ACL> aclList = new ArrayList<>();
        int perm = ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE;          // 或 运算 admin 可以设置节点访问控制列表权限
        aclList.add(new ACL(perm, new Id("world", "anyone")));                //设置有所人的权限
//        aclList.add(new ACL(ZooDefs.Perms.ALL, new Id("ip", "127.0.0.1")));               //设置所有权限,本机ip
        zkClientUtil.createPersistent("/acl", user);                                   // 创建节点
        zkClientUtil.setAcl("/acl", aclList);                                        // 给节点设置acl权限
        //创建并设置acl节点 ZooDefs.Ids.OPEN_ACL_UNSAFE 表anyone
        zkClientUtil.createAcl("/alc2", aclList, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * 获得 acl 属性
     */
    @Test
    public void getAndSetAcl() {
        Map.Entry acl = zkClientUtil.getAcl("/acl");
        System.out.println(acl.getKey());
        System.out.println(acl.getValue());
    }

    /**
     * 递归创建节点(但是不能写入数据) 写入数据,读数据,,再 删除子节点
     */
    @Test
    public void test() {
        String path = "/zjq/t1";
        zkClientUtil.createPersistentRecursive(path, true);
        zkClientUtil.writeData(path, new User("xx", 3));
        Object o = zkClientUtil.readData(path);
        System.out.println(o);
        zkClientUtil.deleteRecursive(path);
    }

    /**
     * 创建子节点,并读取
     */
    @Test
    public void getChildren() {
        String path = "/zjq/t1";
        String path2 = "/zjq/t2";
        String path3 = "/zjq/t3";
        zkClientUtil.createPersistentRecursive(path, true);
        zkClientUtil.createPersistentRecursive(path2, true);
        zkClientUtil.createPersistentRecursive(path3, true);
        List list = zkClientUtil.getChildren("/zjq");
        list.stream().forEach(n -> {
            System.out.println(n);
        });
    }

    /**
     * 递归查找 所有 子节点
     */
    @Test
    public void getChilderRecursive() {
        String path = "/zjq";
        zkClientUtil.getChilderRecursive(path);
    }


    /**
     *  测试监听     并且开启 下面的main方法
     */
    @Test
    public void testListen() throws InterruptedException {
        ZkClientUtil zkClientUtil = new ZkClientUtil();
        String path = "/wukong/w1";
        zkClientUtil.deleteRecursive(path);                     //先删除
        zkClientUtil.lister(path);                              //添加监听
        zkClientUtil.createPersistent(path, "123");      //再创建节点
        Thread.sleep(2000);
        zkClientUtil.writeData(path, "abc");            //修改数据
        Thread.sleep(Integer.MAX_VALUE);
    }

    public static void main(String[] args) throws InterruptedException {
        ZkClientUtil zkClientUtil=new ZkClientUtil();
        String path="/wukong/w1";
        zkClientUtil.writeData(path,"abc"); //能触发 或者在sh zkCli.sh  delete /wukong 也行
    }
}

大概内容就这么多,不全的可以查查详细文档

3.结语

Curator好比hibernate,而zkclient就好比mybatis,一般情况下,zkclient可以满足需求~
世上无难事,只怕有心人,每天积累一点点,fighting!!!

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐