前言

这一篇主要来说一下Zookeeper客户端Curator的使用方式

涉及知识点

  • 增删改查 API
  • 分布式锁

概念

首先增删该查 肯定没啥说了把 那么主要说的就是简单概括一下分布式锁的问题

之所以叫做分布式锁 那么肯定是在分布式场景下才会遇到的问题

实现分布式锁的方式

  • mysql 实现简单 无法面对大并发场景 容易死锁
  • redis 实现相对复杂 性能好 但是可能造成死锁
  • zookeeper 实现复杂 (有了curator就简单多了 都是封装好的) 稳定 性能好 天生的就是用来做分布式锁的

要实现分布式锁必备的三个前提条件

  • 在并发场景下
  • 多进程对统一资源访问 不同的JVM
  • 进程和进程之间相互互斥

在我们Java中的synchronized是进程内加锁 面对分布式领域 无能为力

zookeeper实现分布式锁的核心就是他的临时顺序节点 通过临时顺序节点就可以实现分布式锁

通过获取最小的节点数 来获取锁 那么没有获取到的锁 会通过 zookeeper 的 监听器 机制来监听通知 如果锁释放了 那么就继续尝试获取锁 一直这样 这里 简单概括一下 看这篇文章建议你已经学习了Zookeeper服务端的使用以及常用脚本命令 和 集群搭建方式 最好

依赖

  <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>

使用 增删改查

获取curator对象客户端

// ---------------------------------成员变量---------------------------------
/**
*  这是需要链接的地址 集群模式  如果不是集群模式 填写一个地址就好
*/
private static final java.lang.String connectionString = "192.168.190.132:2181,192.168.190.133:2181,192.168.190.134:2181";
// 设置超时时间
private static final Integer sessionTimeOut = 5000;


// -----------------------------main---------------------------------
/**
 * 重试策略
 * 1000 是睡眠时间
 * 10 是重试次数
 */
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,10);
// 通过这个对象设置链接地址  超时时间   和 重试策略 并构建curator对象
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
        .connectString(connectionString)
        .connectionTimeoutMs(sessionTimeOut)
        .retryPolicy(retryPolicy).build();
curatorFramework.start();

增删改查

有异常需要捕获

// 创建一个节点
 String parentName = curatorFramework.create().forPath("/master", "init".getBytes());
 
 // 创建master节点的子节点
 String parentChildName = curatorFramework.create().forPath("/master/lock","child".getBytes());
 
 //递归创建节点  永久节点
 curatorFramework.create().creatingParentsIfNeeded().forPath("/parent/child","内容".getBytes());

// 创建永久节点 带序号的 
 cf.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/123","内容".getBytes());
 
 //创建临时节点
 curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/temp");

// 创建临时节点 带序号的
cf.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/linshi","内容".getBytes());
 
 //递归删除节点
 Void aVoid = curatorFramework.delete().deletingChildrenIfNeeded().forPath("/parent");
 System.out.println(aVoid);
 
 //删除节点
 curatorFramework.delete().forPath("/master");
 
 //获取子节点
 List<String> strings = curatorFramework.getChildren().forPath("/");
 for (String string : strings) {
     System.out.println(string);
 }
 
 //设置节点数据
 curatorFramework.setData().forPath("/master","set 之后的值".getBytes());
 
 //获取节点数据
 byte[] bytes = curatorFramework.getData().forPath("/master");
 System.out.println(new java.lang.String(bytes));

监听器

  • 只监听一次
public class curator_test3 {
    private static final String connectionString = "192.168.190.132:2181,192.168.190.133:2181,192.168.190.134:2181";
    private static final Integer connectionTimeOut = 5000;
    public static void main(String[] args) throws Exception {
        RetryPolicy rp = new ExponentialBackoffRetry(1000,10);
        CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectionString)
                        .connectionTimeoutMs(connectionTimeOut).retryPolicy(rp).build();
        cf.start();

        /**
         * 增删改操作监听  第一种
         */
        /**
         * 观察者
                */
        byte[] data = cf.getData().usingWatcher(new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("监听到了 只不过只监听一次");
                System.out.println(watchedEvent.getType());
            }
        }).forPath("/bhz");

        Thread.sleep(Long.MAX_VALUE);
    }
}

  • 当监听的节点的数据发生变化的时候就会回调对应的函数
  • 如果NodeCache监听的节点为空(也就是说传入的路径不存在)。那么如果我们后面创建了对应的 节点,也是会触发事件从而回调nodeChanged方法。但是删除了该节点并不会触发
public class curator_test4 {
    /**
     * 监听器    usingListener   NodeCache   TreeCache  PathCache
     * @param args
     */

    private static final String connectionString = "192.168.190.132:2181,192.168.190.133:2181,192.168.190.134:2181";
    private static final Integer connectionTiemOut = 5000;
    public static void main(String[] args) throws Exception {
        RetryPolicy rp = new ExponentialBackoffRetry(1000,10);
        CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectionString).connectionTimeoutMs(connectionTiemOut)
                                    .retryPolicy(rp).build();
        cf.start();
        final NodeCache cache = new NodeCache(cf,"/bhz",false);
        cache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                System.out.println("the test node is change and result is :");
                System.out.println("path : "+cache.getCurrentData().getPath());
                System.out.println("data : "+new String(cache.getCurrentData().getData()));
                System.out.println("stat : "+cache.getCurrentData().getStat());
            }
        });
        cache.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}
  • 能够监听子节点的创建删除修改 PathChildrenCache
public class curator_test5 {
    private static final String connectionString = "192.168.190.132:2181,192.168.190.133:2181,192.168.190.134:2181";
    private static final Integer connectionTimeOut = 5000;
    private static final Integer baseSleepTimeMs = 1000;
    private static final Integer maxRetries = 10;

    public static void main(String[] args) throws Exception {
        RetryPolicy rp = new ExponentialBackoffRetry(baseSleepTimeMs,maxRetries);
        CuratorFramework cf = CuratorFrameworkFactory
                              .builder()
                              .connectString(connectionString)
                              .connectionTimeoutMs(connectionTimeOut)
                              .retryPolicy(rp)
                              .build();
        cf.start();
        final PathChildrenCache pathChildrenCache = new PathChildrenCache(cf,"/bhz",true);
        pathChildrenCache.start();
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                String path = pathChildrenCacheEvent.getData().getPath();
                switch (pathChildrenCacheEvent.getType()){
                        case CHILD_ADDED:
                            if (path.equalsIgnoreCase("/bhz/bj"))
                                System.out.println("导入北京数据库");
                            else if (path.equalsIgnoreCase("/bhz/sh"))
                                System.out.println("导入上海数据库");
                            else if (path.equalsIgnoreCase("/bhz/nj"))
                                System.out.println("导入南京数据库");
                            break;
                        case CHILD_UPDATED:
                            break;
                        case CHILD_REMOVED:
                            break;
                    }
            }
        });
        cf.create().withMode(CreateMode.EPHEMERAL).forPath("/bhz/bj");
        cf.create().withMode(CreateMode.EPHEMERAL).forPath("/bhz/sh");
        cf.create().withMode(CreateMode.EPHEMERAL).forPath("/bhz/nj");
        Thread.sleep(Long.MAX_VALUE);
    }
}

在最后我们创建了子节点进行测试 能够看到打印的信息就正常

  • TreeCache 类似上面两种的结合体 既能够监听自身 也能够监听子节点
 private static final String zkAddress = "centos3";
    private static final int sessionTimeout = 2000;
    private static String parentPath = "/Curator-Recipes1";//父节点

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(zkAddress)
            .sessionTimeoutMs(sessionTimeout)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .build();

    public static void main(String[] args) throws InterruptedException {
        client.start();
        final TreeCache treeCache = new TreeCache(client, parentPath);
        try {
            treeCache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        //添加错误监听器
        treeCache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
            public void unhandledError(String s, Throwable throwable) {
                System.out.println(".错误原因:" + throwable.getMessage() + "\n==============\n");
            }
        });

        //节点变化的监听器
        treeCache.getListenable().addListener(new TreeCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("treeCache ------ Type:" + treeCacheEvent.getType() + ",");
                System.out.println(treeCacheEvent.getData().getPath());

            }
        });

        Thread.sleep(Integer.MAX_VALUE);
    }

分布式锁的使用

  • 关键API
  • org.apache.curator.framework.recipes.locks.InterProcessMutex

类名意思是:进程间互斥。体现着锁的本质。

创建

public InterProcessMutex(CuratorFramework client,
                         String path)

参数说明:

client:zookeeperk客户端的链接对象
path:加锁的zookeeper节点的path

使用

InterProcessMutex实例是一个可重用的对象。不需要每次使用时创建一个新的实例。可以安全的使用单例

  • 加锁
public void acquire()
  • 解锁
public void release()

持有锁的线程如果调用此方法,就会释放持有的锁 如果线程多次调用了acquire方法,也应该调用相同次数的release方法,否则线程会继续持有锁

path :加锁的zk节点path,通常可以通过InterProcessMutex.getParticipantNodes()获得

简单测试加锁效果 模拟四个线程

package curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class curatorDemo {


    public static void main(String[] args) {
        Resources  r= new Resources();

        Thread thread1 = new Thread(r);
        Thread thread2 = new Thread(r);
        Thread thread3 = new Thread(r);
        Thread thread4 = new Thread(r);

        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
    }

}

class Resources implements Runnable{

    private int count = 100;
    private static final String connectionString = "127.0.0.1:2181";
    private static final Integer connectionTimeOut = 5000;
    private static final Integer baseSleepTimeMs = 1000;
    private static final Integer maxRetries = 10;
    private CuratorFramework cf = null;

    {
        RetryPolicy rp = new ExponentialBackoffRetry(baseSleepTimeMs,maxRetries);
         cf = CuratorFrameworkFactory
                .builder()
                .connectString(connectionString)
                .connectionTimeoutMs(connectionTimeOut)
                .retryPolicy(rp)
                .build();
        cf.start();
    }

    InterProcessMutex lock = new InterProcessMutex(cf,"/lock");


    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
            while(count > 0){
                try {
                    lock.acquire();
                    if (count > 0){
                        System.out.println(Thread.currentThread().getName() + "售卖了第" + count-- + "张票");
                    }
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

            }
    }
}

在这里插入图片描述

结语

那么本篇就到这里~

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐