LeaderLatch的使用与分析
Leader LatchZookeeper在分布式系统中,常常被用于选主。在执行某个任务时,让所有的节点都知道有一个特别的,唯一的节点是任务的主节点,由主节点进行任务的执行,其他节点作为备用节点。通过这种热备方式,为分布式系统中任务执行的可控性,以及系统高可用性。而Curator提供了两种选主机制,可以根据实际情况进行选用。关键APIorg.apache.curator.framework.rec
Leader Latch
Zookeeper在分布式系统中,常常被用于选主。在执行某个任务时,让所有的节点都知道有一个特别的,唯一的节点是任务的主节点,由主节点进行任务的执行,其他节点作为备用节点。通过这种热备方式,为分布式系统中任务执行的可控性,以及系统高可用性。
而Curator提供了两种选主机制,可以根据实际情况进行选用。
- 关键API
org.apache.curator.framework.recipes.leader.LeaderLatch
2. 机制说明
LeaderLatch的方式,就是以一种抢占的方式来决定选主。比较简单粗暴,逻辑相对简单。类似非公平锁的抢占,所以,多节点是一个随机产生主节点的过程。基本就是,谁抢到就算谁的。
多个参与者(如:逻辑节点;某个线程等),指定在一个分组之下,每个分组内进行主节点抢占。
3. 用法
3.1 创建
方法1
public LeaderLatch(CuratorFramework client,
String latchPath)
参数说明:
client : zk客户端链接
latchPath : 分组路径(zk中的path)
方法2
public LeaderLatch(CuratorFramework client,
String latchPath,
String id)
参数说明:
client : zk客户端链接
latchPath : 分组路径(zk中的path)
id : 参与者ID
3.2 使用
LeaderLatch创建好之后,必须执行:
leaderLatch.start();
这样,才能让leaderLatch开始参与选主过程。
由于LeaderLatch是一个不断抢占的过程,所以需要调用:
public boolean hasLeadership()
来检测当前参与者是否选主成功。这个方法是非阻塞的(立即返回),其结果只代表调用时的选主结果。所以,可以轮询此方法,或者当执行完本地逻辑后,需要执行分布式任务前检擦此方法。
不过,类似JDK中的CountDownLatch,LeaderLatch也提供了阻塞方法:
方法1
public void await()
throws InterruptedException,
EOFException
这个方法,会阻塞,直到选主成功。
方法2 为了避免方法1的长时间选主失败
public boolean await(long timeout,
TimeUnit unit)
throws InterruptedException
这个方法会根据参数中指定的时间,作为等待的期限。到期后,返回选主结果。
对于LeaderLatch实例,无论是否轩主成功,最后都应该调用:
leaderLatch.close();
这样,才会把当前参与者的信息从选主分组中移除出去。如果,当前参与者是主,还会释放主的资格。避免死锁。
4. 错误处理
在实际使用中,必须考虑链接问题引起的主身份丢失问题。 例如:当hasLeadership()返回true,之后链接出问题。 强烈建议:使用LeaderLatch时为其添加一个ConnectionStateListener
LeaderLatch实例会添加一个ConnectionStateListener来监听当前zk链接。 如果,链接不可用(SUSPENDED)则LeaderLatch会认为自己不在是主,等到链接恢复可用时,才可继续。 如果,链接断开(LOST),则LeaderLatch会认为自己不在是主,等到链接重新建立后,删除之前的参与者信息,然后重新参与选主。
5. 源码分析
5.1 LeaderLatch
5.1.1 类定义
先来看看类定义:
import java.io.Closeable;
public class LeaderLatch implements Closeable
{...}
注意:实现了java.io.Closeable,所以你懂的, try()…catch{}。(3.2中的leaderLatch.close();)
5.1.2 成员变量
public class LeaderLatch implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final WatcherRemoveCuratorFramework client;
private final String latchPath;
private final String id;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
private final AtomicReference<String> ourPath = new AtomicReference<String>();
private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
private final CloseMode closeMode;
private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
private final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
handleStateChange(newState);
}
};
private static final String LOCK_NAME = "latch-";
private static final LockInternalsSorter sorter = new LockInternalsSorter()
{
@Override
public String fixForSorting(String str, String lockName)
{
return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
}
};
public enum State { LATENT, STARTED, CLOSED }
public enum CloseMode { SILENT, NOTIFY_LEADER }
@VisibleForTesting
volatile CountDownLatch debugResetWaitLatch = null;
log : caurtor依赖slf4j
client : zk客户端(curator-framework提供)
latchPath : 分组路径(zk中的path)
id : 参与者ID
state
内部枚举
状态
LATENT 休眠
STARTED 已启动
CLOSED 已关闭
使用AtomicReference原子化包装
hasLeadership
是否为主
使用AtomicBoolean原子化包装
ourPath
使用AtomicReference原子化包装
listeners
一组LeaderLatchListener监听器
closeMode
内部枚举
LeaderLatch关闭方式
SILENT : 静默关闭,不触发相关监听器
NOTIFY_LEADER :关闭时触发监听器
startTask
异步Future
使用AtomicReference原子化包装
listener
链接状态监听器
参见 : 4. 错误处理
LOCK_NAME
私有常量
sorter
私有常量
用于锁处理时,规范path
对参与者进行排序
debugResetWaitLatch
volatile 可见性
reset()使用
在测试时控制启动的时机,防止环境未初始化完成就处理了启动逻辑
注意:
这些成员变量都是final
类型。
并且,对于引用类型都进行原子化包装,避免并发问题
5.1.3 构造器
提供多个构造器模板,最终都是调用:
public LeaderLatch(CuratorFramework client, String latchPath)
{
this(client, latchPath, "", CloseMode.SILENT);
}
public LeaderLatch(CuratorFramework client, String latchPath, String id)
{
this(client, latchPath, id, CloseMode.SILENT);
}
public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
{
this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework();
this.latchPath = PathUtils.validatePath(latchPath);
this.id = Preconditions.checkNotNull(id, "id cannot be null");
this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
可以发现:
默认是采用CloseMode.SILENT方式关闭
默认id是空字符串
client、latchPath、id、closeMode不能为空
5.1.4 启动
第3节,介绍过LeaderLatch是由start()启动选主过程:
public void start() throws Exception {
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
{
@Override
public void run()
{
try
{
internalStart();
}
finally
{
startTask.set(null);
}
}
}));
}
可以发现
调用原子性CAS方法,将状态由休眠更新到已启动
执行了一个异步任务来完成启动过程
使用一个链接可用后回调方式
AfterConnectionEstablished.execute()
内部使用了一个ThreadUtils.newSingleThreadExecutor
单线程的线程池
所以本地多个LeaderLatch实例的启动过程是序列化方式执行的
使用成员变量startTask持有异步Future
启动完成后会制空startTask
说明启动过程可能会有状态变化
启动的过程实际是由internalStart()方法来完成
private synchronized void internalStart() {
if ( state.get() == State.STARTED )
{
client.getConnectionStateListenable().addListener(listener);
try
{
reset();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("An error occurred checking resetting leadership.", e);
}
}
}
internalStart()使用synchronized
同步调用
使用this进行互斥锁对象
同一个LeaderLatch对象的多次启动同样序列化执行
即便绕过第2布,也同样可以保证不会重复启动
进行状态判断
synchronized内部,再次判断
相当于Double check
在当前连接上注册自带的监听器
调用reset()完成启动逻辑
处理了异常
触发线程中断
internalStart()是异步执行,通过中断可以进行更细节的控制
避免粗暴的抛出异常
internalStart()是异步执行
避免当前线程意外中断
同时也避免了第2.1步骤中那个单线程的线程池频繁的进行线程开/关所带来的额外开销
@VisibleForTesting
void reset() throws Exception {
setLeadership(false);
setNode(null);
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( debugResetWaitLatch != null )
{
debugResetWaitLatch.await();
debugResetWaitLatch = null;
}
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
setNode(event.getName());
if ( state.get() == State.CLOSED )
{
setNode(null);
}
else
{
getChildren();
}
}
else
{
log.error("getChildren() failed. rc = " + event.getResultCode());
}
}
};
client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
private synchronized void setLeadership(boolean newValue)
{
boolean oldValue = hasLeadership.getAndSet(newValue);
if ( oldValue && !newValue )
{ // Lost leadership, was true, now false
listeners.forEach(new Function<LeaderLatchListener, Void>()
{
@Override
public Void apply(LeaderLatchListener listener)
{
listener.notLeader();
return null;
}
});
}
else if ( !oldValue && newValue )
{ // Gained leadership, was false, now true
listeners.forEach(new Function<LeaderLatchListener, Void>()
{
@Override
public Void apply(LeaderLatchListener input)
{
input.isLeader();
return null;
}
});
}
notifyAll();
}
private void setNode(String newValue) throws Exception
{
String oldPath = ourPath.getAndSet(newValue);
if ( oldPath != null )
{
client.delete().guaranteed().inBackground().forPath(oldPath);
}
}
reset()的可见范围
利于测试
使用了com.google.common.annotations.VisibleForTesting
初始化选主状态false
getAndSet设置
根据不同的情况触发不同的监听器
得到
失去
notifyAll()
唤醒所有的synchronized等待
制空上次path
如果上一次path有残留,则delete服务器上的信息
在latchPath下创建一个EPHEMERAL_SEQUENTIAL节点
临时顺序节点
并注册了回调
回掉获取latchPath的子节点
并判断自身是否为主
5.1.5 选主
LeaderLatch的选主判断逻辑,是由上一节中第12步中注册的回调方法来触发。 实际由checkLeadership()方法处理:
private void checkLeadership(List<String> children) throws Exception
{
final String localOurPath = ourPath.get();
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
if ( ourIndex < 0 )
{
log.error("Can't find our node. Resetting. Index: " + ourIndex);
reset();
}
else if ( ourIndex == 0 )
{
setLeadership(true);
}
else
{
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
{
try
{
getChildren();
}
catch ( Exception ex )
{
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
}
}
}
};
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
// previous node is gone - reset
reset();
}
}
};
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
当获取到最新的参与者列表后:
对列表进行排序
如果自身处于列表第一位,则当选为主
否则,在latchPath上增加监听/回调
监听列表中上一位参与者
当上一位参与者退出(节点被删除时)
重新getChildren()再次进行选主
当latchPath发生变动(如:删除)
调用reset(),重新进行启动过程
即可导致hasLeadership()失效
- 测试
package com.roc.curator.demo.leader.latch
import org.apache.commons.lang3.RandomStringUtils
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.TimeUnit
/**
* Created by roc on 2017/5/25.
*/
class LatchParticipant() {
val LATCH_PATH: String = "/test/leader/latch"
var client: CuratorFramework = CuratorFrameworkFactory.builder()
.connectString("0.0.0.0:8888")
.connectionTimeoutMs(5000)
.retryPolicy(ExponentialBackoffRetry(1000, 10))
.sessionTimeoutMs(3000)
.build()
@Before fun init() {
client.start()
}
@Test fun runTest() {
var id: String = RandomStringUtils.randomAlphabetic(10)
println("id : $id ")
val time = Date()
var latch: LeaderLatch = LeaderLatch(client, LATCH_PATH, id)
latch.start()
println("$id 开始竞选 $time")
while(!latch.await(3, TimeUnit.SECONDS)){
println("$id 选主失败 : $time")
println("当前主是:${latch.leader.id}")
println("参与者:${latch.participants}")
}
println("$id 选主成功 $time")
while (latch.hasLeadership()) {
println("$id 执行 $time")
TimeUnit.SECONDS.sleep(2)
if (Math.random() > 0.89) {
break;
}
}
println("$id 结束此轮: $time")
latch.close()
}
}
zookeeper节点:
get /test/leader/latch/_c_9b313527-e0ed-410f-9510-30e5fd92b5c6-latch-0000000208
zNillKMfuB
cZxid = 0x1db19
ctime = Thu May 25 20:53:22 CST 2017
mZxid = 0x1db19
mtime = Thu May 25 20:53:22 CST 2017
pZxid = 0x1db19
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07e9
dataLength = 10
numChildren = 0
更多推荐
所有评论(0)