Curator 是一个提供了高级API框架的java语音Zookeeper客户端库,使用其开发不需要在关系网络连接管理等细节,使开发变得更容易、可靠。Curator还提供了锁,leader选举等高级特性。Curator分为以下几个部分:
- curator-client ZooKeeper类替代品
- curator-framework 提供了高级API
- curator-recipes 提供了利用Zookeeper开发的高级特性。
添加依赖
1 | <dependency> |
主要API
构建连接类
Curator的连接类为CuratorFramework
,通过工厂类CuratorFrameworkFactory
来实例化。对于一个Zookeeper集群,只需要一个CuratorFramework
对象。开始使用CuratorFramework
前,需要调用start()
方法。程序结束前,需要调用close()
方法。
有两种构建方式:1
2
3RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
还可以使用流式风格构建:1
2
3
4
5
6
7
8
9RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
构建时还可以设置命名空间1
2
3
4CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
…
client.create().forPath("/test", data);
// node was actually written to: "/MyApp/test"
节点操作
创建
1 | String createdPath = client.create().withMode(CreateMode.PERSISTENT) |
注意:client.create().storingStatIn(stat)
stat并未设置值。
还调用creatingParentsIfNeeded()
直接创建不存在的父节点1
2createdPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath("/a/b/c", "TEST".getBytes(Charsets.UTF_8));
是否存在
1 | Stat stat = client.checkExists().usingWatcher(curatorTest).forPath(PATH); |
stat == null
说明节点不存在。
获取数据
1 | byte[] dataBytes = client.getData().storingStatIn(stat).usingWatcher(watcher).forPath(PATH); |
更新数据
1 | Stat newStat = client.setData().withVersion(stat.getVersion()) |
获取子节点
1 | List<String> children = client.getChildren().usingWatcher(watcher).forPath("/"); |
删除节点
1 | client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(100).forPath(multiPath); |
guaranteed()
:保证删除。当由于网络问题导致删除失败,只有CuratorFramework
是开启的,则会在后台持续尝试删除,直至成功。当要注意,仍然会收到删除失败的异常。
事物
transaction()
方法可以开启Zookeeper事物,可以组合create、setData、check、delete与commit操作,将其当做一个单元。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16CuratorOp createOp = client.transactionOp().create()
.forPath("/transaction", "transaction".getBytes(Charsets.UTF_8));
CuratorOp setDataOp = client.transactionOp().setData().withVersion(0)
.forPath("/transaction", "NEW DATA".getBytes(Charsets.UTF_8));
CuratorOp checkOp = client.transactionOp().check().withVersion(1).forPath("/transaction");
CuratorOp deleteOp = client.transactionOp().delete().forPath("/transaction");
try {
List<CuratorTransactionResult> curatorTransactionResults = client.transaction()
.forOperations(createOp, setDataOp, checkOp, deleteOp);
for (CuratorTransactionResult result : curatorTransactionResults) {
System.out.println(result.getForPath()+" "+result.getType()+" "+ result.getError());
}
} catch (Exception e) {
e.printStackTrace();
}
例子
1 | package com.fei.curator; |
cache
Zookeeper原生客户端注册的watch只能生效一次,想要监听节点变化需要反复注册watch。Curator中的cache可以自动注册监听,方便开发者获取节点相关信息。
NodeCache
NodeCache
用来缓存一个节点的视图。当节点被创建、修改、删除时,NodeCache
会自动的修改其内容。调用getCurrentData()
可以获取节点当前的数据、状态,返回值为null则说明节点不存在。还可以调用nodeCache.getListenable().addListener(new NodeCacheListener())
添加监听,当节点有变化时,触发监听。
要开启需要调用start()
或者start(buildInitial)
,buildInitial
参数表示开启时否是获取数据。使用完毕后需要调用close()
。
需要注意的是,数据完全同步是不可能的,所以修改数据时要使用版本号来修改,以免覆盖了其他用户的修改。
IMPORTANT - it’s not possible to stay transactionally in sync. Users of this class must be prepared for false-positives and false-negatives. Additionally, always use the version number when updating data to avoid overwriting another process’ change.
1 | package com.fei.curator; |
PathChildrenCache
PathChildrenCache
用来监听一个节点的子节点的变化,包括update/create/delete,并会在本地缓存所有子节点的数据、状态。可以注册一个监听器,当改变发生的时候会收到通知。
相关类:
- PathChildrenCache
- PathChildrenCacheMode
- PathChildrenCacheListener
- ChildData
启动 关闭
调用start()
开启cache,使用完毕后调用close()
.start()
可以传入启动模式StartMode
参数,StartMode
有三种:
NORMAL
默认模式。cahe在后台初始化数据,每一个已经存在的节点都会收到CHILD_ADDED
事件.调用start()
后立即调用getCurrentData()
返回为空。BUILD_INITIAL_CACHE
调用start(StartMode.BUILD_INITIAL_CACHE)
前会自动初始化数据,之后立即调用getCurrentData()
会返回所有子节点数据。POST_INITIALIZED_EVENT
与NORMAL
相同,但在初始化数据完成后会抛出INITIALIZED
事件。
实例
1 | package com.fei.curator; |
需要注意的是,数据完全同步是不可能的,所以修改数据时要使用版本号来修改,以免覆盖了其他用户的修改。
IMPORTANT - it’s not possible to stay transactionally in sync. Users of this class must be prepared for false-positives and false-negatives. Additionally, always use the version number when updating data to avoid overwriting another process’ change.
实际测试中发现,如果PathChildrenCache
处于start()
状态,视图删除cache观察的目录无效。调用close()
后再删除,则生效。
TreeCache
TreeCache
用来监听一个节点为起始的整个节点树的变化,包括update/create/delete,并会在本地缓存树的所有子节点的数据、状态。可以注册一个监听器,当改变发生的时候会收到通知。
相关类:
- TreeCache
- TreeCacheListener
启动 关闭
调用start()
开启cache,使用完毕后调用close()
。
注意start()
后,调用start()
后立即调用getCurrentChildren(xxx)
或getCurrentData(xxx)
可能会返回空数据,因为还未初始化完成。
实例
1 | package com.fei.curator; |
lock
Shared Reentrant Lock
分布式可重入互斥锁,该锁时公平锁,按照请求的顺序获得锁。
- 相关类:
InterProcessMutex
- 创建:
public InterProcessMutex(CuratorFramework client,String path)
- 获取锁:
public void acquire()
,public boolean acquire(long time,TimeUnit unit)
释放锁:
public void release()
实例
1 | public class FakeLimitedResource |
1 | public class ExampleClientThatLocks |
1 | public class LockingExample { |
- 错误处理
强烈建议增加一个ConnectionStateListener
监听类,并关注SUSPENDED
、LOST
状态。SUSPENDED
状态下不能确定客户端是否还持有锁,除非随后收到RECONNECTED
状态。LOST
状态下不在持有锁。
Shared Lock
全局同步不可重入互斥锁。
- 相关类:
InterProcessSemaphoreMutex
- 创建:
public InterProcessSemaphoreMutex(CuratorFramework client,String path)
- 获取锁:
public void acquire()
,public boolean acquire(long time,TimeUnit unit)
- 释放锁:
public void release()
- 错误处理
同Shared Reentrant Lock
Shared Reentrant Read Write Lock
分布式读写锁。读锁是共享的,可以同时被多个线程持有,写锁则只能被一个线程持有。一个写锁可以同时请求读锁,但反之不行。一个读锁视图去获取写锁,则永远不会成功。
- 相关类:
InterProcessReadWriteLock
、InterProcessLock
创建:
1
2
3public InterProcessReadWriteLock(CuratorFramework client,String basePath)
public InterProcessLock readLock()
public InterProcessLock writeLock()获取锁:
public void acquire()
,public boolean acquire(long time,TimeUnit unit)
- 释放锁:
public void release()
- 错误处理
同Shared Reentrant Lock
Multi Shared Lock
管理多个锁的容器。当acquire()
被调用后,会请求获取所有锁,如果失败,会双方所有已经请求的锁。同样的,release()
被调用后,所有的锁都会被释放。
- 创建:
public InterProcessMultiLock(List<InterProcessLock> locks)
或public InterProcessMultiLock(CuratorFramework client, List<String> paths)
- 获取、释放、错误处理:同其他锁一致。
Leader选举
分布式计算中,leader选举是指从多个计算机节点中选举出一个作为组织者的过程。Curator中有两个leader选举recipes。
LeaderSelector
公平选举,内部采用InterProcessMutex
实现,按照原始的请求顺序,在前一个leader放弃后依次成为leader。
相关类:
- LeaderSelector
- LeaderSelectorListener
- LeaderSelectorListenerAdapter
API
创建:
1
2
3
4
5
6/**
* @param client the client
* @param leaderPath the path for this leadership group
* @param listener listener
*/
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)leaderSelector.start();
开启选举takeLeadership()
start 后,当成为leader后,listener的takeLeadership()
方法会被调用。takeLeadership()
应该只有在需要放弃leader时才退出。leaderSelector.close();
关闭leaderSelector.autoRequeue()
放弃leader后继续参与选举
错误处理
使用LeaderSelector
时需要关注连接状态变化。如果已经成为leader,需要对SUSPENDED
、LOST
做出响应。
如果收到SUSPENDED
,应该假定自己已经不再是leader,知道收到RECONNECTED
。如果收到LOST
,则自己已经不再是leader,takeLeadership
方法应该退出。
实例
1 | /** |
1 | public class LeaderSelectorExample |
LeaderLatch
相关类
- LeaderLatch
- LeaderLatchListener
API
创建:
1
2
3
4
5
6
7
8
9
10
11/**
* @param client the client
* @param latchPath the path for this leadership group
*/
public LeaderLatch(CuratorFramework client, String latchPath)
/**
* @param client the client
* @param latchPath the path for this leadership group
* @param id participant ID
*/
public LeaderLatch(CuratorFramework client, String latchPath, String id)开启
leaderLatch.start();
- 是否成为leader
leaderLatch.hasLeadership();
- 阻塞直至成为leader
leaderLatch.await();
- 关闭
leaderLatch.close();
退出选举,如果本身时领导,则会放弃领导,是唯一放弃leader的方法。 - leader状态改变通知
当成为leader后LeaderLatchListener.isLeader()
会被调用,放弃leader后,LeaderLatchListener.notLeader()
会被调用。
barrier
DistributedBarrier
分布式系统中用来阻止一系列节点的运行,直接某一刻满足条件后,所有节点继续运行。DistributedBarrier
原理新简单,调用waitOnBarrier()
方法后,线程会检查特定节点是否存在,如果不存在则代表条件满足,否则继续等待(Object.wait())。DistributedBarrier
提供了setBarrier()
、removeBarrier()
两个工具方法,代表开始等待、条件满足,继续执行。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65/**
* BarrierTest
*
* @author fei
*/
public class BarrierTest {
private static final int CLIENT_QTY = 5;
private static final String CONNECT_STRING = "127.0.0.1:2187,127.0.0.1:2188,127.0.0.1:2189";
private static final String PATH = "/examples/barrier";
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(CLIENT_QTY);
List<CuratorFramework> clients = new ArrayList<>(CLIENT_QTY);
List<DistributedBarrier> barriers = new ArrayList<>(CLIENT_QTY);
try {
CuratorFramework c = CuratorFrameworkFactory
.newClient(CONNECT_STRING, new ExponentialBackoffRetry(1000, 3));
c.start();
DistributedBarrier b = new DistributedBarrier(c, PATH);
try {
//设置栅栏
b.setBarrier();
} catch (Exception e) {
e.printStackTrace();
}
clients.add(c);
barriers.add(b);
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client = CuratorFrameworkFactory
.newClient(CONNECT_STRING, new ExponentialBackoffRetry(1000, 3));
clients.add(client);
client.start();
DistributedBarrier barrier = new DistributedBarrier(client, PATH);
barriers.add(barrier);
int index = i;
long sleepTime = index * 1000;
executorService.submit(() -> {
try {
Thread.sleep(sleepTime);
System.out.println(index + "start wait");
barrier.waitOnBarrier();
System.out.println(index + "start continue");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(6000);
try {
b.removeBarrier();
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(2000);
} finally {
clients.forEach(CloseableUtils::closeQuietly);
executorService.shutdownNow();
}
}
}
DistributedDoubleBarrier
DistributedDoubleBarrier
可以使多个客户端同步计算的开始与结束。当足够数量的处理加入栏栅时,开始记性计算,并在全部结束后离开计算。
构造
1
2
3
4
5
6
7
8
9
10
11/**
* Creates the barrier abstraction. <code>memberQty</code> is the number of members in the
* barrier. When {@link #enter()} is called, it blocks until all members have entered. When
* {@link #leave()} is called, it blocks until all members have left.
*
* @param client the client
* @param barrierPath path to use
* @param memberQty the number of members in the barrier. NOTE: more than <code>memberQty</code>
* can enter the barrier. <code>memberQty</code> is a threshold, not a limit
*/
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)进入
enter();
- 退出
leave();
调用enter()
、leave()
方法后会阻塞,知道调用相应方法的客户端数量超过memberQty
1 | /** |
注意事项
ZooKeeper watches 使用单线程
所有的ZooKeeper watches处理时串行的,当一个watcher在执行时,其他的watcher都不能执行。所以,watcher处理应该尽可能快的返回。1
2
3
4
5
6
7
8...
InterProcessMutex lock = ...
public void process(WatchedEvent event)
{
lock.acquire();
...
}
上述代码不能工作。InterProcessMutex
依赖watcher获得通知。可以使用另外的线程请求锁:1
2
3
4
5
6
7
8
9
10
11
12
13...
InterProcessMutex lock = ...
ExecutorService service = ...
public void process(WatchedEvent event)
{
service.submit(new Callable<Void>(){
Void call() {
lock.acquire();
...
}
});
}
如何在InterProcessMutex
获取锁失败后立即返回
1 | InterProcessMutex lock = ... |
处理session failure
Curator处理session failure的默认策略月处理网络连接失败一样:检查当前重试策略,如果运行重试,则重试。
但是,如果一系列操作都与session相关,例如,临时节点创建后作为一种标记,然后执行其他操作。如果在任何地方session过期,则整个操作过期。如果需要这种行为,应该使用SessionFailRetryLoop
。