Curator介绍

Curator是Apache软件基金会下的一个开源框架,目前是Apache下的顶级项目。Curator是ZooKeeper的一个Java客户端库,包括一个高层API框架和一些实用程序,使得开发人员在使用ZooKeeper时更容易、更可靠。另外Curator还包括一些通用用例和扩展,例如如服务发现和Java 8异步DSL等。
官网地址

Curator中包含的组件:

  • curator-recipes: Zookeeper典型用例场景实现,这些实现基于Curator Framework。
  • curator-framework: Zookeeper API的高层封装,大大简化了Zookeeper客户端编程个,添加了例如Zookeeper连接管理、重试机制等。
  • curator-client: Zookeeper Client的封装,用于取代原生的Zookeeper客户端。
  • curator-test: 包含TestingServer、TestingCluster和其他一些对测试有用的工具。
  • curator-examples: 各种Curator特性的示例用法。
  • curator-async: 具有O/R建模、迁移和许多其他特性的异步DSL。
  • curator-x-discovery: 基于Curator Framework的服务发现实现
  • curator-x-discovery-server: 可用于Curator发现的RESTful服务器。

一般情况下,前三个即可满足正常使用。

Cureator的优点:

  • 封装Zookeeper client与Zookeeper server之间的连接处理
  • 提供了一套Fluent风格的API
  • 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装

Curator框架使用

项目中引入MAVEN依赖:

<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-framework</artifactId>
	<version>4.0.1</version>
</dependency>
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
	<version>4.0.1</version>
</dependency>
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-client</artifactId>
	<version>4.0.1</version>
</dependency>

基础操作:

public class CuratorClientTest {
    public static void main(String[] args) throws Exception {
    	// zk连接
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
        client.start();
        
        // 创建节点
        client.create().withMode(CreateMode.PERSISTENT).forPath("/curatordata", "1".getBytes());

        // 监控节点
        NodeCache nodeCache = new NodeCache(client, "/curatordata");
        nodeCache.start();
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("数据被修改了,包括修改、删除");
            }
        });

        // 监控状态改变
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                System.out.println("连接状态改变");
            }
        });

        // 监控子节点
        PathChildrenCache childCache = new PathChildrenCache(client, "/curatordata", true);
        childCache.start();
        childCache.getListenable().addListener(new PathChildrenCacheListener() {

            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED: {
                        System.out.println("Child Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                        break;
                    }

                    case CHILD_UPDATED: {
                        System.out.println("Child Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                        break;
                    }

                    case CHILD_REMOVED: {
                        System.out.println("Child Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                        break;
                    }
                }
            }
        });

        client.setData().forPath("/curatordata", "2".getBytes());

        // 获取节点信息
        System.out.println(new String(client.getData().forPath("/curatordata")));

        // 删除节点
        //  client.delete().forPath("/curatordata");

        System.in.read();
        
		// 使用完一定要关闭
        CloseableUtils.closeQuietly(nodeCache);
        CloseableUtils.closeQuietly(childCache);
        CloseableUtils.closeQuietly(client);
    }
}

典型用例场景:领导者选举
LeaderLatch
该领导者选举底层基于临时节点和顺序节点实现

public class LeaderLatchExample {
    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> leaderLatches = Lists.newArrayList();

        try {
            for(int i =0; i<10; i++) {
                CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new RetryNTimes(3,1000));
                clients.add(client);
                client.start();

                LeaderLatch leaderLatch = new LeaderLatch(client,"/leaderlatch","client#" + i);
                leaderLatches.add(leaderLatch);
                leaderLatch.start();
            }
			// 睡2S用于选举出结果
            TimeUnit.SECONDS.sleep(2);
            for(LeaderLatch leaderLatch: leaderLatches) {
                if(leaderLatch.hasLeadership()) {
                    System.out.println("当前Leader是" + leaderLatch.getId());
                }
            }

            System.in.read();
        } catch (Exception e) {

        } finally {
            for(CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            for(LeaderLatch leaderLatch : leaderLatches) {
                CloseableUtils.closeQuietly(leaderLatch);
            }
        }
    }
}

LeaderSelector
该领导者选举底层基于锁机制,谁获取到了谁就是leader。

public class LeaderSelectorExample {

    public static void main(String[] args) {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelector> leaderSelectors = Lists.newArrayList();

        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
                clients.add(client);
                client.start();

                LeaderSelector leaderSelector = new LeaderSelector(client, "/leaderselector", new LeaderSelectorListener() {
                    @Override
                    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                        System.out.println("当前Leader是" + client);
                        // 睡5s代表有5S的leader权,5s后重新选举leader
                        TimeUnit.SECONDS.sleep(5);
                    }

                    @Override
                    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    }
                });
                leaderSelector.start();
                leaderSelectors.add(leaderSelector);
            }
            System.in.read();
        } catch (Exception e) {

        } finally {
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            for (LeaderSelector leaderSelector : leaderSelectors) {
                CloseableUtils.closeQuietly(leaderSelector);
            }
        }
    }
}

上面两个领导者选举算法是Curator提供的用例场景,与zookeeper本身的领导者选举算法无关。

注意: The client must be started (and closed when no longer needed).

进一步学习请参考官方文档官方示例


------------本文结束感谢您的阅读------------
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐