zookeeper客户端框架Curator
Curator客户端Cureator的特点:1.封装Zookeeper client与Zookeeper server之间的连接处理2.提供了一套Fluent风格的API3.提供Zookeeper各种应用场景的抽象封装,比如recipe、共享锁、集群领导选举等。...
·
Curator介绍
Curator是Apache软件基金会下的一个开源框架,目前是Apache下的顶级项目。Curator是ZooKeeper的一个Java客户端库,包括一个高层API框架和一些实用程序,使得开发人员在使用ZooKeeper时更容易、更可靠。另外Curator还包括一些通用用例和扩展,例如如服务发现和Java 8异步DSL等。
官网地址
Curator中包含的组件:
- curator-recipes: Zookeeper典型用例场景实现,这些实现基于Curator Framework。
- curator-framework: Zookeeper API的高层封装,大大简化了Zookeeper客户端编程个,添加了例如Zookeeper连接管理、重试机制等。
- curator-client: Zookeeper Client的封装,用于取代原生的Zookeeper客户端。
- curator-test: 包含TestingServer、TestingCluster和其他一些对测试有用的工具。
- curator-examples: 各种Curator特性的示例用法。
- curator-async: 具有O/R建模、迁移和许多其他特性的异步DSL。
- curator-x-discovery: 基于Curator Framework的服务发现实现
- curator-x-discovery-server: 可用于Curator发现的RESTful服务器。
一般情况下,前三个即可满足正常使用。
Cureator的优点:
- 封装Zookeeper client与Zookeeper server之间的连接处理
- 提供了一套Fluent风格的API
- 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装
Curator框架使用
项目中引入MAVEN依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.1</version>
</dependency>
基础操作:
public class CuratorClientTest {
public static void main(String[] args) throws Exception {
// zk连接
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
client.start();
// 创建节点
client.create().withMode(CreateMode.PERSISTENT).forPath("/curatordata", "1".getBytes());
// 监控节点
NodeCache nodeCache = new NodeCache(client, "/curatordata");
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("数据被修改了,包括修改、删除");
}
});
// 监控状态改变
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
System.out.println("连接状态改变");
}
});
// 监控子节点
PathChildrenCache childCache = new PathChildrenCache(client, "/curatordata", true);
childCache.start();
childCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED: {
System.out.println("Child Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
case CHILD_UPDATED: {
System.out.println("Child Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
case CHILD_REMOVED: {
System.out.println("Child Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
}
}
});
client.setData().forPath("/curatordata", "2".getBytes());
// 获取节点信息
System.out.println(new String(client.getData().forPath("/curatordata")));
// 删除节点
// client.delete().forPath("/curatordata");
System.in.read();
// 使用完一定要关闭
CloseableUtils.closeQuietly(nodeCache);
CloseableUtils.closeQuietly(childCache);
CloseableUtils.closeQuietly(client);
}
}
典型用例场景:领导者选举
LeaderLatch
该领导者选举底层基于临时节点和顺序节点实现
public class LeaderLatchExample {
public static void main(String[] args) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderLatch> leaderLatches = Lists.newArrayList();
try {
for(int i =0; i<10; i++) {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new RetryNTimes(3,1000));
clients.add(client);
client.start();
LeaderLatch leaderLatch = new LeaderLatch(client,"/leaderlatch","client#" + i);
leaderLatches.add(leaderLatch);
leaderLatch.start();
}
// 睡2S用于选举出结果
TimeUnit.SECONDS.sleep(2);
for(LeaderLatch leaderLatch: leaderLatches) {
if(leaderLatch.hasLeadership()) {
System.out.println("当前Leader是" + leaderLatch.getId());
}
}
System.in.read();
} catch (Exception e) {
} finally {
for(CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
for(LeaderLatch leaderLatch : leaderLatches) {
CloseableUtils.closeQuietly(leaderLatch);
}
}
}
}
LeaderSelector
该领导者选举底层基于锁机制,谁获取到了谁就是leader。
public class LeaderSelectorExample {
public static void main(String[] args) {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderSelector> leaderSelectors = Lists.newArrayList();
try {
for (int i = 0; i < 10; i++) {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
clients.add(client);
client.start();
LeaderSelector leaderSelector = new LeaderSelector(client, "/leaderselector", new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("当前Leader是" + client);
// 睡5s代表有5S的leader权,5s后重新选举leader
TimeUnit.SECONDS.sleep(5);
}
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
}
});
leaderSelector.start();
leaderSelectors.add(leaderSelector);
}
System.in.read();
} catch (Exception e) {
} finally {
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
for (LeaderSelector leaderSelector : leaderSelectors) {
CloseableUtils.closeQuietly(leaderSelector);
}
}
}
}
上面两个领导者选举算法是Curator提供的用例场景,与zookeeper本身的领导者选举算法无关。
注意: The client must be started (and closed when no longer needed).
更多推荐
已为社区贡献1条内容
所有评论(0)