分布式协调工具 - Zookeeper 客户端(Curator) 使用
前言这一篇主要来说一下Zookeeper客户端Curator的使用方式涉及知识点增删改查 API分布式锁概念首先增删该查 肯定没啥说了把那么主要说的就是简单概括一下分布式锁的问题之所以叫做分布式锁 那么肯定是在分布式场景下才会遇到的问题实现分布式锁的方式mysql实现简单无法面对大并发场景容易死锁redis实现相对复杂性能好但...
前言
这一篇主要来说一下Zookeeper客户端Curator的使用方式
涉及知识点
- 增删改查 API
- 分布式锁
概念
首先增删该查 肯定没啥说了把 那么主要说的就是简单概括一下分布式锁的问题
之所以叫做分布式锁 那么肯定是在分布式场景下才会遇到的问题
实现分布式锁的方式
- mysql 实现简单 无法面对大并发场景 容易死锁
- redis 实现相对复杂 性能好 但是可能造成死锁
- zookeeper 实现复杂 (有了curator就简单多了 都是封装好的) 稳定 性能好 天生的就是用来做分布式锁的
要实现分布式锁必备的三个前提条件
- 在并发场景下
- 多进程对统一资源访问 不同的JVM
- 进程和进程之间相互互斥
在我们Java中的synchronized是进程内加锁 面对分布式领域 无能为力
zookeeper实现分布式锁的核心就是他的临时顺序节点 通过临时顺序节点就可以实现分布式锁
通过获取最小的节点数 来获取锁 那么没有获取到的锁 会通过 zookeeper 的 监听器 机制来监听通知 如果锁释放了 那么就继续尝试获取锁 一直这样 这里 简单概括一下 看这篇文章建议你已经学习了Zookeeper服务端的使用以及常用脚本命令 和 集群搭建方式 最好
依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
使用 增删改查
获取curator对象客户端
// ---------------------------------成员变量---------------------------------
/**
* 这是需要链接的地址 集群模式 如果不是集群模式 填写一个地址就好
*/
private static final java.lang.String connectionString = "192.168.190.132:2181,192.168.190.133:2181,192.168.190.134:2181";
// 设置超时时间
private static final Integer sessionTimeOut = 5000;
// -----------------------------main---------------------------------
/**
* 重试策略
* 1000 是睡眠时间
* 10 是重试次数
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,10);
// 通过这个对象设置链接地址 超时时间 和 重试策略 并构建curator对象
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(connectionString)
.connectionTimeoutMs(sessionTimeOut)
.retryPolicy(retryPolicy).build();
curatorFramework.start();
增删改查
有异常需要捕获
// 创建一个节点
String parentName = curatorFramework.create().forPath("/master", "init".getBytes());
// 创建master节点的子节点
String parentChildName = curatorFramework.create().forPath("/master/lock","child".getBytes());
//递归创建节点 永久节点
curatorFramework.create().creatingParentsIfNeeded().forPath("/parent/child","内容".getBytes());
// 创建永久节点 带序号的
cf.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/123","内容".getBytes());
//创建临时节点
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/temp");
// 创建临时节点 带序号的
cf.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/linshi","内容".getBytes());
//递归删除节点
Void aVoid = curatorFramework.delete().deletingChildrenIfNeeded().forPath("/parent");
System.out.println(aVoid);
//删除节点
curatorFramework.delete().forPath("/master");
//获取子节点
List<String> strings = curatorFramework.getChildren().forPath("/");
for (String string : strings) {
System.out.println(string);
}
//设置节点数据
curatorFramework.setData().forPath("/master","set 之后的值".getBytes());
//获取节点数据
byte[] bytes = curatorFramework.getData().forPath("/master");
System.out.println(new java.lang.String(bytes));
监听器
- 只监听一次
public class curator_test3 {
private static final String connectionString = "192.168.190.132:2181,192.168.190.133:2181,192.168.190.134:2181";
private static final Integer connectionTimeOut = 5000;
public static void main(String[] args) throws Exception {
RetryPolicy rp = new ExponentialBackoffRetry(1000,10);
CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectionString)
.connectionTimeoutMs(connectionTimeOut).retryPolicy(rp).build();
cf.start();
/**
* 增删改操作监听 第一种
*/
/**
* 观察者
*/
byte[] data = cf.getData().usingWatcher(new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("监听到了 只不过只监听一次");
System.out.println(watchedEvent.getType());
}
}).forPath("/bhz");
Thread.sleep(Long.MAX_VALUE);
}
}
- 当监听的节点的数据发生变化的时候就会回调对应的函数
- 如果NodeCache监听的节点为空(也就是说传入的路径不存在)。那么如果我们后面创建了对应的 节点,也是会触发事件从而回调nodeChanged方法。但是删除了该节点并不会触发
public class curator_test4 {
/**
* 监听器 usingListener NodeCache TreeCache PathCache
* @param args
*/
private static final String connectionString = "192.168.190.132:2181,192.168.190.133:2181,192.168.190.134:2181";
private static final Integer connectionTiemOut = 5000;
public static void main(String[] args) throws Exception {
RetryPolicy rp = new ExponentialBackoffRetry(1000,10);
CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectionString).connectionTimeoutMs(connectionTiemOut)
.retryPolicy(rp).build();
cf.start();
final NodeCache cache = new NodeCache(cf,"/bhz",false);
cache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("the test node is change and result is :");
System.out.println("path : "+cache.getCurrentData().getPath());
System.out.println("data : "+new String(cache.getCurrentData().getData()));
System.out.println("stat : "+cache.getCurrentData().getStat());
}
});
cache.start();
Thread.sleep(Integer.MAX_VALUE);
}
}
- 能够监听子节点的创建删除修改 PathChildrenCache
public class curator_test5 {
private static final String connectionString = "192.168.190.132:2181,192.168.190.133:2181,192.168.190.134:2181";
private static final Integer connectionTimeOut = 5000;
private static final Integer baseSleepTimeMs = 1000;
private static final Integer maxRetries = 10;
public static void main(String[] args) throws Exception {
RetryPolicy rp = new ExponentialBackoffRetry(baseSleepTimeMs,maxRetries);
CuratorFramework cf = CuratorFrameworkFactory
.builder()
.connectString(connectionString)
.connectionTimeoutMs(connectionTimeOut)
.retryPolicy(rp)
.build();
cf.start();
final PathChildrenCache pathChildrenCache = new PathChildrenCache(cf,"/bhz",true);
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
String path = pathChildrenCacheEvent.getData().getPath();
switch (pathChildrenCacheEvent.getType()){
case CHILD_ADDED:
if (path.equalsIgnoreCase("/bhz/bj"))
System.out.println("导入北京数据库");
else if (path.equalsIgnoreCase("/bhz/sh"))
System.out.println("导入上海数据库");
else if (path.equalsIgnoreCase("/bhz/nj"))
System.out.println("导入南京数据库");
break;
case CHILD_UPDATED:
break;
case CHILD_REMOVED:
break;
}
}
});
cf.create().withMode(CreateMode.EPHEMERAL).forPath("/bhz/bj");
cf.create().withMode(CreateMode.EPHEMERAL).forPath("/bhz/sh");
cf.create().withMode(CreateMode.EPHEMERAL).forPath("/bhz/nj");
Thread.sleep(Long.MAX_VALUE);
}
}
在最后我们创建了子节点进行测试 能够看到打印的信息就正常
- TreeCache 类似上面两种的结合体 既能够监听自身 也能够监听子节点
private static final String zkAddress = "centos3";
private static final int sessionTimeout = 2000;
private static String parentPath = "/Curator-Recipes1";//父节点
private static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.build();
public static void main(String[] args) throws InterruptedException {
client.start();
final TreeCache treeCache = new TreeCache(client, parentPath);
try {
treeCache.start();
} catch (Exception e) {
e.printStackTrace();
}
//添加错误监听器
treeCache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
public void unhandledError(String s, Throwable throwable) {
System.out.println(".错误原因:" + throwable.getMessage() + "\n==============\n");
}
});
//节点变化的监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("treeCache ------ Type:" + treeCacheEvent.getType() + ",");
System.out.println(treeCacheEvent.getData().getPath());
}
});
Thread.sleep(Integer.MAX_VALUE);
}
分布式锁的使用
- 关键API
- org.apache.curator.framework.recipes.locks.InterProcessMutex
类名意思是:进程间互斥。体现着锁的本质。
创建
public InterProcessMutex(CuratorFramework client,
String path)
参数说明:
client:zookeeperk客户端的链接对象
path:加锁的zookeeper节点的path
使用
InterProcessMutex实例是一个可重用的对象。不需要每次使用时创建一个新的实例。可以安全的使用单例
- 加锁
public void acquire()
- 解锁
public void release()
持有锁的线程如果调用此方法,就会释放持有的锁 如果线程多次调用了acquire方法,也应该调用相同次数的release方法,否则线程会继续持有锁
path :加锁的zk节点path,通常可以通过InterProcessMutex.getParticipantNodes()获得
简单测试加锁效果 模拟四个线程
package curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class curatorDemo {
public static void main(String[] args) {
Resources r= new Resources();
Thread thread1 = new Thread(r);
Thread thread2 = new Thread(r);
Thread thread3 = new Thread(r);
Thread thread4 = new Thread(r);
thread1.start();
thread2.start();
thread3.start();
thread4.start();
}
}
class Resources implements Runnable{
private int count = 100;
private static final String connectionString = "127.0.0.1:2181";
private static final Integer connectionTimeOut = 5000;
private static final Integer baseSleepTimeMs = 1000;
private static final Integer maxRetries = 10;
private CuratorFramework cf = null;
{
RetryPolicy rp = new ExponentialBackoffRetry(baseSleepTimeMs,maxRetries);
cf = CuratorFrameworkFactory
.builder()
.connectString(connectionString)
.connectionTimeoutMs(connectionTimeOut)
.retryPolicy(rp)
.build();
cf.start();
}
InterProcessMutex lock = new InterProcessMutex(cf,"/lock");
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
while(count > 0){
try {
lock.acquire();
if (count > 0){
System.out.println(Thread.currentThread().getName() + "售卖了第" + count-- + "张票");
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
结语
那么本篇就到这里~
更多推荐
所有评论(0)