Curator 是一个提供了高级API框架的java语音Zookeeper客户端库,使用其开发不需要在关系网络连接管理等细节,使开发变得更容易、可靠。Curator还提供了锁,leader选举等高级特性。Curator分为以下几个部分:
- curator-client ZooKeeper类替代品
 - curator-framework 提供了高级API
 - curator-recipes 提供了利用Zookeeper开发的高级特性。
 
添加依赖
1  | <dependency>  | 
主要API
构建连接类
Curator的连接类为CuratorFramework,通过工厂类CuratorFrameworkFactory来实例化。对于一个Zookeeper集群,只需要一个CuratorFramework对象。开始使用CuratorFramework前,需要调用start()方法。程序结束前,需要调用close()方法。
有两种构建方式:1
2
3RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
还可以使用流式风格构建:1
2
3
4
5
6
7
8
9RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
        .connectString(connectionInfo)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .build();
client.start();
构建时还可以设置命名空间1
2
3
4CuratorFramework    client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
…
client.create().forPath("/test", data);
// node was actually written to: "/MyApp/test"
节点操作
创建
1  | String createdPath = client.create().withMode(CreateMode.PERSISTENT)  | 
注意:client.create().storingStatIn(stat) stat并未设置值。
还调用creatingParentsIfNeeded()直接创建不存在的父节点1
2createdPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                .forPath("/a/b/c", "TEST".getBytes(Charsets.UTF_8));
是否存在
1  | Stat stat = client.checkExists().usingWatcher(curatorTest).forPath(PATH);  | 
stat == null说明节点不存在。
获取数据
1  | byte[] dataBytes = client.getData().storingStatIn(stat).usingWatcher(watcher).forPath(PATH);  | 
更新数据
1  | Stat newStat = client.setData().withVersion(stat.getVersion())  | 
获取子节点
1  | List<String> children = client.getChildren().usingWatcher(watcher).forPath("/");  | 
删除节点
1  | client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(100).forPath(multiPath);  | 
guaranteed():保证删除。当由于网络问题导致删除失败,只有CuratorFramework是开启的,则会在后台持续尝试删除,直至成功。当要注意,仍然会收到删除失败的异常。
事物
transaction()方法可以开启Zookeeper事物,可以组合create、setData、check、delete与commit操作,将其当做一个单元。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16CuratorOp createOp = client.transactionOp().create()
    .forPath("/transaction", "transaction".getBytes(Charsets.UTF_8));
CuratorOp setDataOp = client.transactionOp().setData().withVersion(0)
    .forPath("/transaction", "NEW DATA".getBytes(Charsets.UTF_8));
CuratorOp checkOp = client.transactionOp().check().withVersion(1).forPath("/transaction");
CuratorOp deleteOp = client.transactionOp().delete().forPath("/transaction");
try {
    List<CuratorTransactionResult> curatorTransactionResults = client.transaction()
        .forOperations(createOp, setDataOp, checkOp, deleteOp);
    for (CuratorTransactionResult result : curatorTransactionResults) {
        System.out.println(result.getForPath()+"  "+result.getType()+"  "+ result.getError());
    }
} catch (Exception e) {
    e.printStackTrace();
}
例子
1  | package com.fei.curator;  | 
cache
Zookeeper原生客户端注册的watch只能生效一次,想要监听节点变化需要反复注册watch。Curator中的cache可以自动注册监听,方便开发者获取节点相关信息。
NodeCache
NodeCache 用来缓存一个节点的视图。当节点被创建、修改、删除时,NodeCache会自动的修改其内容。调用getCurrentData()可以获取节点当前的数据、状态,返回值为null则说明节点不存在。还可以调用nodeCache.getListenable().addListener(new NodeCacheListener())添加监听,当节点有变化时,触发监听。
要开启需要调用start()或者start(buildInitial),buildInitial参数表示开启时否是获取数据。使用完毕后需要调用close()。
需要注意的是,数据完全同步是不可能的,所以修改数据时要使用版本号来修改,以免覆盖了其他用户的修改。
IMPORTANT - it’s not possible to stay transactionally in sync. Users of this class must be prepared for false-positives and false-negatives. Additionally, always use the version number when updating data to avoid overwriting another process’ change.
1  | package com.fei.curator;  | 
PathChildrenCache
PathChildrenCache用来监听一个节点的子节点的变化,包括update/create/delete,并会在本地缓存所有子节点的数据、状态。可以注册一个监听器,当改变发生的时候会收到通知。
相关类:
- PathChildrenCache
 - PathChildrenCacheMode
 - PathChildrenCacheListener
 - ChildData
 
启动 关闭
调用start()开启cache,使用完毕后调用close().start()可以传入启动模式StartMode参数,StartMode有三种:
NORMAL默认模式。cahe在后台初始化数据,每一个已经存在的节点都会收到CHILD_ADDED事件.调用start()后立即调用getCurrentData()返回为空。BUILD_INITIAL_CACHE调用start(StartMode.BUILD_INITIAL_CACHE)前会自动初始化数据,之后立即调用getCurrentData()会返回所有子节点数据。POST_INITIALIZED_EVENT与NORMAL相同,但在初始化数据完成后会抛出INITIALIZED事件。
实例
1  | package com.fei.curator;  | 
需要注意的是,数据完全同步是不可能的,所以修改数据时要使用版本号来修改,以免覆盖了其他用户的修改。
IMPORTANT - it’s not possible to stay transactionally in sync. Users of this class must be prepared for false-positives and false-negatives. Additionally, always use the version number when updating data to avoid overwriting another process’ change.
实际测试中发现,如果PathChildrenCache处于start()状态,视图删除cache观察的目录无效。调用close()后再删除,则生效。
TreeCache
TreeCache用来监听一个节点为起始的整个节点树的变化,包括update/create/delete,并会在本地缓存树的所有子节点的数据、状态。可以注册一个监听器,当改变发生的时候会收到通知。
相关类:
- TreeCache
 - TreeCacheListener
 
启动 关闭
调用start()开启cache,使用完毕后调用close()。
注意start()后,调用start()后立即调用getCurrentChildren(xxx)或getCurrentData(xxx)可能会返回空数据,因为还未初始化完成。
实例
1  | package com.fei.curator;  | 
lock
Shared Reentrant Lock
分布式可重入互斥锁,该锁时公平锁,按照请求的顺序获得锁。
- 相关类:
InterProcessMutex - 创建: 
public InterProcessMutex(CuratorFramework client,String path) - 获取锁:
public void acquire(),public boolean acquire(long time,TimeUnit unit) 释放锁:
public void release()实例
1  | public class FakeLimitedResource  | 
1  | public class ExampleClientThatLocks  | 
1  | public class LockingExample {  | 
- 错误处理
强烈建议增加一个ConnectionStateListener监听类,并关注SUSPENDED、LOST状态。SUSPENDED状态下不能确定客户端是否还持有锁,除非随后收到RECONNECTED状态。LOST状态下不在持有锁。 
Shared Lock
全局同步不可重入互斥锁。
- 相关类:
InterProcessSemaphoreMutex - 创建:
public InterProcessSemaphoreMutex(CuratorFramework client,String path) - 获取锁:
public void acquire(),public boolean acquire(long time,TimeUnit unit) - 释放锁:
public void release() - 错误处理
同Shared Reentrant Lock 
Shared Reentrant Read Write Lock
分布式读写锁。读锁是共享的,可以同时被多个线程持有,写锁则只能被一个线程持有。一个写锁可以同时请求读锁,但反之不行。一个读锁视图去获取写锁,则永远不会成功。
- 相关类:
InterProcessReadWriteLock、InterProcessLock 创建:
1
2
3public InterProcessReadWriteLock(CuratorFramework client,String basePath)
public InterProcessLock readLock()
public InterProcessLock writeLock()获取锁:
public void acquire(),public boolean acquire(long time,TimeUnit unit)- 释放锁:
public void release() - 错误处理
同Shared Reentrant Lock 
Multi Shared Lock
管理多个锁的容器。当acquire()被调用后,会请求获取所有锁,如果失败,会双方所有已经请求的锁。同样的,release()被调用后,所有的锁都会被释放。
- 创建:
public InterProcessMultiLock(List<InterProcessLock> locks)或public InterProcessMultiLock(CuratorFramework client, List<String> paths) - 获取、释放、错误处理:同其他锁一致。
 
Leader选举
分布式计算中,leader选举是指从多个计算机节点中选举出一个作为组织者的过程。Curator中有两个leader选举recipes。
LeaderSelector
公平选举,内部采用InterProcessMutex实现,按照原始的请求顺序,在前一个leader放弃后依次成为leader。
相关类:
- LeaderSelector
 - LeaderSelectorListener
 - LeaderSelectorListenerAdapter
API
 创建:
1
2
3
4
5
6/**
* @param client the client
* @param leaderPath the path for this leadership group
* @param listener listener
*/
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)leaderSelector.start();开启选举takeLeadership()start 后,当成为leader后,listener的takeLeadership()方法会被调用。takeLeadership()应该只有在需要放弃leader时才退出。leaderSelector.close();关闭leaderSelector.autoRequeue()放弃leader后继续参与选举
错误处理
使用LeaderSelector时需要关注连接状态变化。如果已经成为leader,需要对SUSPENDED、LOST做出响应。
如果收到SUSPENDED,应该假定自己已经不再是leader,知道收到RECONNECTED。如果收到LOST,则自己已经不再是leader,takeLeadership 方法应该退出。
实例
1  | /**  | 
1  | public class LeaderSelectorExample  | 
LeaderLatch
相关类
- LeaderLatch
 - LeaderLatchListener
 
API
创建:
1
2
3
4
5
6
7
8
9
10
11/**
* @param client the client
* @param latchPath the path for this leadership group
*/
public LeaderLatch(CuratorFramework client, String latchPath)
/**
* @param client the client
* @param latchPath the path for this leadership group
* @param id participant ID
*/
public LeaderLatch(CuratorFramework client, String latchPath, String id)开启
leaderLatch.start();- 是否成为leader 
leaderLatch.hasLeadership(); - 阻塞直至成为leader 
leaderLatch.await(); - 关闭 
leaderLatch.close();
退出选举,如果本身时领导,则会放弃领导,是唯一放弃leader的方法。 - leader状态改变通知
当成为leader后LeaderLatchListener.isLeader()会被调用,放弃leader后,LeaderLatchListener.notLeader()会被调用。 
barrier
DistributedBarrier
分布式系统中用来阻止一系列节点的运行,直接某一刻满足条件后,所有节点继续运行。DistributedBarrier原理新简单,调用waitOnBarrier()方法后,线程会检查特定节点是否存在,如果不存在则代表条件满足,否则继续等待(Object.wait())。DistributedBarrier提供了setBarrier()、removeBarrier()两个工具方法,代表开始等待、条件满足,继续执行。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65/**
 * BarrierTest
 *
 * @author fei
 */
public class BarrierTest {
    private static final int CLIENT_QTY = 5;
    private static final String CONNECT_STRING = "127.0.0.1:2187,127.0.0.1:2188,127.0.0.1:2189";
    private static final String PATH = "/examples/barrier";
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(CLIENT_QTY);
        List<CuratorFramework> clients = new ArrayList<>(CLIENT_QTY);
        List<DistributedBarrier> barriers = new ArrayList<>(CLIENT_QTY);
        try {
            CuratorFramework c = CuratorFrameworkFactory
                .newClient(CONNECT_STRING, new ExponentialBackoffRetry(1000, 3));
            c.start();
            DistributedBarrier b = new DistributedBarrier(c, PATH);
            try {
                //设置栅栏
                b.setBarrier();
            } catch (Exception e) {
                e.printStackTrace();
            }
            clients.add(c);
            barriers.add(b);
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client = CuratorFrameworkFactory
                    .newClient(CONNECT_STRING, new ExponentialBackoffRetry(1000, 3));
                clients.add(client);
                client.start();
                DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                barriers.add(barrier);
                int index = i;
                long sleepTime = index * 1000;
                executorService.submit(() -> {
                    try {
                        Thread.sleep(sleepTime);
                        System.out.println(index + "start wait");
                        barrier.waitOnBarrier();
                        System.out.println(index + "start continue");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            Thread.sleep(6000);
            try {
                b.removeBarrier();
            } catch (Exception e) {
                e.printStackTrace();
            }
            Thread.sleep(2000);
        } finally {
            clients.forEach(CloseableUtils::closeQuietly);
            executorService.shutdownNow();
        }
    }
}
DistributedDoubleBarrier
DistributedDoubleBarrier可以使多个客户端同步计算的开始与结束。当足够数量的处理加入栏栅时,开始记性计算,并在全部结束后离开计算。
构造
1
2
3
4
5
6
7
8
9
10
11/**
* Creates the barrier abstraction. <code>memberQty</code> is the number of members in the
* barrier. When {@link #enter()} is called, it blocks until all members have entered. When
* {@link #leave()} is called, it blocks until all members have left.
*
* @param client the client
* @param barrierPath path to use
* @param memberQty the number of members in the barrier. NOTE: more than <code>memberQty</code>
* can enter the barrier. <code>memberQty</code> is a threshold, not a limit
*/
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)进入
enter();- 退出 
leave(); 
调用enter()、leave()方法后会阻塞,知道调用相应方法的客户端数量超过memberQty
1  | /**  | 
注意事项
ZooKeeper watches 使用单线程
所有的ZooKeeper watches处理时串行的,当一个watcher在执行时,其他的watcher都不能执行。所以,watcher处理应该尽可能快的返回。1
2
3
4
5
6
7
8...
InterProcessMutex   lock = ...
public void process(WatchedEvent event)
{
    lock.acquire();
       ...
}
上述代码不能工作。InterProcessMutex依赖watcher获得通知。可以使用另外的线程请求锁:1
2
3
4
5
6
7
8
9
10
11
12
13...
InterProcessMutex   lock = ...
ExecutorService    service = ...
public void process(WatchedEvent event)
{
    service.submit(new Callable<Void>(){
        Void call() {
            lock.acquire();
              ...
        }
    });
}
如何在InterProcessMutex获取锁失败后立即返回
1  | InterProcessMutex lock = ...  | 
处理session failure
Curator处理session failure的默认策略月处理网络连接失败一样:检查当前重试策略,如果运行重试,则重试。
但是,如果一系列操作都与session相关,例如,临时节点创建后作为一种标记,然后执行其他操作。如果在任何地方session过期,则整个操作过期。如果需要这种行为,应该使用SessionFailRetryLoop。