session
使用客户端来创建一个和zk服务端连接的句柄,这就是一个会话(session)。Session一旦建立,状态就是连接中(CONNECTING
)状态,然后客户端会尝试去连接zk服务端,连接成功之后状态变成已连接(CONNECTED
)。一般正常情况下只会有这两个状态。不过,还是会发生一些无法恢复的错误/故障,比如:session过期,认证失败,或者客户端关闭连接,这种情况下,session状态会变成关闭(CLOSED)状态。下图给出了zk客户顿可能的状态转换情况:

为了保持client会话的有效性,在ZooKeeper运行过程中,client会在会话超时时间过期范围内向server发送PING请求来保持会话的有效性,俗称“心跳检测”。同时server重新激活client对应的会话,这段逻辑是在SessionTrackerImpl
的touchSession
中实现的。

再看下源码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
//sessionId为发起会话激活的client的sessionId,timeout为会话超时时间
synchronized public boolean touchSession(long sessionId, int timeout) {
/*
* sessionsById的结构为 HashMap<Long, SessionImpl>(),每个sessionid都有一个对应的session实现
* 这里取出对应的session实现
*/
SessionImpl s = sessionsById.get(sessionId);
// Return false, if the session doesn't exists or marked as closing
if (s == null || s.isClosing()) {
return false;
}
//计算当前会话的下一个失效时间,可以理解为ExpirationTime_New
long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
//tickTime是上一次计算的超时时间,可以理解为ExpirationTime_Old
if (s.tickTime >= expireTime) {
// Nothing needs to be done
return true;
}
//将ExpirationTime_Old对应的桶中的会话取出,SessionSet 是SessionImpl的集合
SessionSet set = sessionSets.get(s.tickTime);
if (set != null) {
//将旧桶中的会话移除
set.sessions.remove(s);
}
//更新当前会话的下一次超时时间
s.tickTime = expireTime;
//从新桶中取出该会话,无则创建,有则更新
set = sessionSets.get(s.tickTime);
if (set == null) {
set = new SessionSet();
sessionSets.put(expireTime, set);
}
set.sessions.add(s);
return true;
}
|
好了,我们了解了是会话是如何激活的,那在什么时候会发起激活呢,也就是touchSession
这个方法什么时候被触发呢?分以下两种情况:
- 只要client向server发送请求,包括读或写请求,就会触发一次激活;
- 如果client发现在sessionTimeOut / 3 时间内未尚和server进行任何通信,就会主动发起一次PING请求,进而触发激活;
过期会话如何清理
一言蔽之吧,会话过期后,集群中所有server都删除由该会话创建的临时节点(EPHEMERAL)信息
watch机制
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点被删除、子目录节点增加删除)时,zk会通知客户端。
要点:
- 监听通知是一次性的,只通知一次;
- 假如需要再次监听,需要重新进行一次注册监听;
- 只要是修改了数据,即便是数据一样,依然会有客户端数据改变通知;(修改了节点数据,这个节点版本属性变了,zk以节点版本为准,版本变了,就发送客户端通知)
watch是什么
观察者的功能
zk支持watch(观察)的概念。客户端可以在每个znode节点上设置一个观察(watcher)。如果被观察服务端的znode节点有变更,那么watch就会被触发,这个watch所属的客户端将接受到一个通知包被告知该节点已经发生变化,把相应的事件通知给设置watcher的client端。
zk里的所有读取操作:getData(); getChildren()和exits()都有设置watch的选项。
一句话总结,watch是基于异步回调的事件触发机制。
zk watch
Znode发生变化(Znode本身的增加,删除,修改,以及子Znode的变化)可以通过Watch机制通知到客户端。那么要实现Watch,就必须实现org.apache.zookeeper.Watcher接口,并且将实现类的对象传入到可以Watch的方法中。Zookeeper中所有读操作(getData(),getChildren(),exists())都可以设置Watch选项。Watch事件具有one-time trigger(一次性触发)的特性,如果Watch监视的Znode有变化,那么就会通知设置该Watch的客户端。
在上述说道的所有读操作中,如果需要Watcher,我们可以自定义Watcher,如果是Boolean型变量,当为true时,则使用系统默认的Watcher,系统默认的Watcher是在Zookeeper的构造函数中定义的Watcher。参数中Watcher为空或者false,表示不启用Wather。
一次触发
客户端在Znode设置了Watch时,如果Znode内容发生改变,那么客户端就会获得Watch事件。例如:客户端设置getData("/znode1", true)后,如果/znode1发生改变或者删除,那么客户端就会得到一个/znode1的Watch事件,但是/znode1再次发生变化,那客户端是无法收到Watch事件的,除非客户端设置了新的Watch。
发往客户端
Watch事件是异步发送到Client。Zookeeper可以保证客户端发送过去的更新顺序是有序的。例如:某个Znode没有设置watcher,那么客户端对这个Znode设置Watcher发送到集群之前,该客户端是感知不到该Znode任何的改变情况的。换个角度来解释:由于Watch有一次性触发的特点,所以在服务器端没有Watcher的情况下,Znode的任何变更就不会通知到客户端。
不过,即使某个Znode设置了Watcher,且在Znode有变化的情况下通知到了客户端,但是在客户端接收到这个变化事件,但是还没有再次设置Watcher之前,如果其他客户端对该Znode做了修改,这种情况下,Znode第二次的变化客户端是无法收到通知的。这可能是由于网络延迟或者是其他因素导致,所以我们使用Zookeeper不能期望能够监控到节点每次的变化。Zookeeper只能保证最终的一致性,而无法保证强一致性。
设置watch的数据类容
Znode改变有很多种方式,例如:节点创建,节点删除,节点改变,子节点改变等等。Zookeeper维护了两个Watch列表,一个节点数据Watch列表,另一个是子节点Watch列表。getData()和exists()设置数据Watch,getChildren()设置子节点Watch。两者选其一,可以让我们根据不同的返回结果选择不同的Watch方式,getData()和exists()返回节点的内容,getChildren()返回子节点列表。因此,setData()触发内容Watch,create()触发当前节点的内容Watch或者是其父节点的子节点Watch。delete()同时触发父节点的子节点Watch和内容Watch,以及子节点的内容Watch。
zk watcher的运行机制
流程图

- Watch是轻量级的,其实就是本地JVM的Callback,服务器端只是存了是否有设置了Watcher的布尔类型。(源码见:org.apache.zookeeper.server.FinalRequestProcessor)
- 在服务端,在FinalRequestProcessor处理对应的Znode操作时,会根据客户端传递的watcher变量,添加到对应的ZKDatabase(org.apache.zookeeper.server.ZKDatabase)中进行持久化存储,同时将自己NIOServerCnxn做为一个Watcher callback,监听服务端事件变化
- Leader通过投票通过了某次Znode变化的请求后,然后通知对应的Follower,Follower根据自己内存中的zkDataBase信息,发送notification信息给zookeeper客户端。
- Zookeeper客户端接收到notification信息后,找到对应变化path的watcher列表,挨个进行触发回调。
所谓的回调

由图看出,zk的watcher由客户端,客户端WatchManager,zk服务器组成。整个过程涉及了消息通信及数据存储。
- zk客户端向zk服务器注册watcher的同时,会将watcher对象存储在客户端的watchManager。
- Zk服务器触发watcher事件后,会向客户端发送通知,客户端线程从watchManager中回调watcher执行相应的功能。
ZK事件
Zookeeper状态/事件对应关系图:

ZooKeeper中Watch事件(见:org.apache.zookeeper.Watcher.EventType):
- None 在客户端与Zookeeper集群中的服务器断开连接的时候,客户端会收到这个事件。
- NodeCreated Znode创建事件
- NodeDeleted Znode删除事件
- NodeDataChanged Znode数据内容更新事件。其实本质上该事件只关注dataVersion版本号,但是只要调用了更新接口dataVersion就会有变更。
- NodeChildrenChanged Znode子节点改变事件,只关注子节点的个数变更,子节点内容有变更是不会通知的。
在事件发生时,ZooKeeper的状态(见:org.apache.zookeeper.Watcher.KeeperState):
- Disconnected(0) 客户端处于断开连接的状态,并且没有和Zookeeper集群中任何服务器连接。
- SyncConnected(3) 客户端处于连接的状态,也就是说客户端连接到了一台server
- AuthFailed(4) 验证失败的状态
- ConnectedReadOnly(5) 客户端连接到一个只读Server的状态。
- SaslAuthenticated(6) 用于通知客户端他们是SASL认证,以至于他们能够SASL认证的权限通过操作Zookeeper。
- Expired(-112) 会话超时状态
code
watchone 一次性触发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
package com.eh.zkclient;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Demo2 {
private static final String CONNECT_STRING = "localhost:2181";
private static final int SESSION_TIMEOUT = 50 * 1000;
private static final String PATH = "/eh";
@Getter
@Setter
private ZooKeeper zk;
// 获取连接
@SneakyThrows
public void startZK() {
this.setZk(new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
}));
}
// 获取节点信息
@SneakyThrows
public String getZnode(String nodePath) {
byte[] data = zk.getData(nodePath, false, new Stat());
return new String(data);
}
// 事件回调
private void triggleEvent(WatchedEvent event) {
String retVal = getZnode(PATH);
log.info("=======>triggleEvent:{}", retVal);
}
/**
* 一次性watch
* 监控/eh节点,获得初次值后设置watch,只要发生新的变化就打印出最新的值
*/
@SneakyThrows
public void test() {
startZK();
if (zk.exists(PATH, false) != null) {
// getData 注册watcher
byte[] data = zk.getData(
PATH,
this::triggleEvent,
new Stat()
);
log.info("获取初始值:{}", new String(data));
}
}
@SneakyThrows
public static void main(String[] args) {
Demo2 demo = new Demo2();
demo.test();
TimeUnit.DAYS.sleep(1);
}
}
|
-
获取初始值后
-
使用zkCli模拟其他客户端更新节点值
1
|
[zk: localhost:2181(CONNECTED) 14] set /eh helloZK_v3
|
-
可以看到triggleEvent获取到新值
-
再更新值则获取不到新值了
watchmore 多次触发,长效机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
package com.eh.zkclient;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Demo3 {
private static final String CONNECT_STRING = "localhost:2181";
private static final int SESSION_TIMEOUT = 50 * 1000;
private static final String PATH = "/eh";
@Getter
@Setter
private ZooKeeper zk;
// 获取连接
@SneakyThrows
public void startZK() {
this.setZk(new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
}));
}
private final Watcher watcher = event -> triggleEvent(event);
// 事件回调, 重新注册相同的watcher
@SneakyThrows
private void triggleEvent(WatchedEvent event) {
byte[] data = zk.getData(PATH, watcher, new Stat());
String retVal = new String(data);
log.info("=======>triggleEvent:{}", retVal);
}
@SneakyThrows
public void test() {
startZK();
if (zk.exists(PATH, false) != null) {
// getData 注册watcher
byte[] data = zk.getData(
PATH,
this::triggleEvent,
new Stat()
);
log.info("获取初始值:{}", new String(data));
}
}
@SneakyThrows
public static void main(String[] args) {
Demo3 demo = new Demo3();
demo.test();
TimeUnit.DAYS.sleep(1);
}
}
|
触发时重新设置相同的watcher
1
2
3
4
|
2020-11-08 01:19:57 INFO [main] com.eh.zkclient.Demo3 - 获取初始值:helloZK_v5
2020-11-08 01:19:57 INFO [main-EventThread] com.eh.zkclient.Demo3 - =======>triggleEvent:helloZK_v6
2020-11-08 01:19:57 INFO [main-EventThread] com.eh.zkclient.Demo3 - =======>triggleEvent:helloZK_v7
2020-11-08 01:19:57 INFO [main-EventThread] com.eh.zkclient.Demo3 - =======>triggleEvent:helloZK_v8
|
watchchild
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package com.eh.zkclient;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Demo3 {
private static final String CONNECT_STRING = "localhost:2181";
private static final int SESSION_TIMEOUT = 50 * 1000;
private static final String PATH = "/eh";
@Getter
@Setter
private ZooKeeper zk;
// 获取连接
@SneakyThrows
public void startZK() {
this.setZk(new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
}));
}
private final Watcher watcher = event -> triggleEvent(event);
// 事件回调, 重新注册相同的watcher
@SneakyThrows
private void triggleEvent(WatchedEvent event) {
List<String> children = zk.getChildren(PATH, watcher);
log.info("获取所有的子节点:{}", children);
}
/**
* 监控子节点变化(增删)情况
*/
@SneakyThrows
public void test() {
startZK();
if (zk.exists(PATH, false) != null) {
// getData 注册watcher
List<String> children = zk.getChildren(
PATH,
this::triggleEvent
);
log.info("获取所有的子节点:{}", children);
}
}
@SneakyThrows
public static void main(String[] args) {
Demo3 demo = new Demo3();
demo.test();
TimeUnit.DAYS.sleep(1);
}
}
|
zkClient
1
2
3
4
5
6
7
8
|
[zk: localhost:2181(CONNECTED) 25] ls /eh
[]
[zk: localhost:2181(CONNECTED) 26] create /eh/eh-1 helloZK_child_v1
Created /eh/eh-1
[zk: localhost:2181(CONNECTED) 27] create /eh/eh-2 helloZK_child_v2
Created /eh/eh-2
[zk: localhost:2181(CONNECTED) 28] create /eh/eh-3 helloZK_child_v3
Created /eh/eh-3
|
可以看到控制台日志打印:
1
2
3
4
|
2020-11-08 01:32:27 INFO [main] com.eh.zkclient.Demo3 - 获取所有的子节点:[]
2020-11-08 01:32:27 INFO [main-EventThread] com.eh.zkclient.Demo3 - 获取所有的子节点:[eh-1]
2020-11-08 01:32:27 INFO [main-EventThread] com.eh.zkclient.Demo3 - 获取所有的子节点:[eh-1, eh-2]
2020-11-08 01:32:27 INFO [main-EventThread] com.eh.zkclient.Demo3 - 获取所有的子节点:[eh-1, eh-3, eh-2]
|