目录

zk通知机制

session

使用客户端来创建一个和zk服务端连接的句柄,这就是一个会话(session)。Session一旦建立,状态就是连接中(CONNECTING)状态,然后客户端会尝试去连接zk服务端,连接成功之后状态变成已连接(CONNECTED)。一般正常情况下只会有这两个状态。不过,还是会发生一些无法恢复的错误/故障,比如:session过期,认证失败,或者客户端关闭连接,这种情况下,session状态会变成关闭(CLOSED)状态。下图给出了zk客户顿可能的状态转换情况:

http://img.cana.space/picStore/20201107234057.png

为了保持client会话的有效性,在ZooKeeper运行过程中,client会在会话超时时间过期范围内向server发送PING请求来保持会话的有效性,俗称“心跳检测”。同时server重新激活client对应的会话,这段逻辑是在SessionTrackerImpltouchSession中实现的。

http://img.cana.space/picStore/20201108112611.png

再看下源码实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//sessionId为发起会话激活的client的sessionId,timeout为会话超时时间
synchronized public boolean touchSession(long sessionId, int timeout) {
        /*
         * sessionsById的结构为 HashMap<Long, SessionImpl>(),每个sessionid都有一个对应的session实现
         * 这里取出对应的session实现
         */
        SessionImpl s = sessionsById.get(sessionId);
        // Return false, if the session doesn't exists or marked as closing
        if (s == null || s.isClosing()) {
            return false;
        }
        //计算当前会话的下一个失效时间,可以理解为ExpirationTime_New
        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
        //tickTime是上一次计算的超时时间,可以理解为ExpirationTime_Old
        if (s.tickTime >= expireTime) {
            // Nothing needs to be done
            return true;
        }
        //将ExpirationTime_Old对应的桶中的会话取出,SessionSet 是SessionImpl的集合
        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
        	//将旧桶中的会话移除
            set.sessions.remove(s);
        }
        //更新当前会话的下一次超时时间
        s.tickTime = expireTime;
        //从新桶中取出该会话,无则创建,有则更新
        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);
        return true;
    }

好了,我们了解了是会话是如何激活的,那在什么时候会发起激活呢,也就是touchSession这个方法什么时候被触发呢?分以下两种情况:

  1. 只要client向server发送请求,包括读或写请求,就会触发一次激活;
  2. 如果client发现在sessionTimeOut / 3 时间内未尚和server进行任何通信,就会主动发起一次PING请求,进而触发激活;

过期会话如何清理

一言蔽之吧,会话过期后,集群中所有server都删除由该会话创建的临时节点(EPHEMERAL)信息

watch机制

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点被删除、子目录节点增加删除)时,zk会通知客户端。

要点:

  • 监听通知是一次性的,只通知一次;
  • 假如需要再次监听,需要重新进行一次注册监听;
  • 只要是修改了数据,即便是数据一样,依然会有客户端数据改变通知;(修改了节点数据,这个节点版本属性变了,zk以节点版本为准,版本变了,就发送客户端通知

watch是什么

观察者的功能

zk支持watch(观察)的概念。客户端可以在每个znode节点上设置一个观察(watcher)。如果被观察服务端的znode节点有变更,那么watch就会被触发,这个watch所属的客户端将接受到一个通知包被告知该节点已经发生变化,把相应的事件通知给设置watcher的client端。

zk里的所有读取操作:getData(); getChildren()和exits()都有设置watch的选项。

一句话总结,watch是基于异步回调的事件触发机制。

zk watch

Znode发生变化(Znode本身的增加,删除,修改,以及子Znode的变化)可以通过Watch机制通知到客户端。那么要实现Watch,就必须实现org.apache.zookeeper.Watcher接口,并且将实现类的对象传入到可以Watch的方法中。Zookeeper中所有读操作(getData(),getChildren(),exists())都可以设置Watch选项。Watch事件具有one-time trigger(一次性触发)的特性,如果Watch监视的Znode有变化,那么就会通知设置该Watch的客户端。

在上述说道的所有读操作中,如果需要Watcher,我们可以自定义Watcher,如果是Boolean型变量,当为true时,则使用系统默认的Watcher,系统默认的Watcher是在Zookeeper的构造函数中定义的Watcher。参数中Watcher为空或者false,表示不启用Wather。

一次触发

客户端在Znode设置了Watch时,如果Znode内容发生改变,那么客户端就会获得Watch事件。例如:客户端设置getData("/znode1", true)后,如果/znode1发生改变或者删除,那么客户端就会得到一个/znode1的Watch事件,但是/znode1再次发生变化,那客户端是无法收到Watch事件的,除非客户端设置了新的Watch。

发往客户端

Watch事件是异步发送到Client。Zookeeper可以保证客户端发送过去的更新顺序是有序的。例如:某个Znode没有设置watcher,那么客户端对这个Znode设置Watcher发送到集群之前,该客户端是感知不到该Znode任何的改变情况的。换个角度来解释:由于Watch有一次性触发的特点,所以在服务器端没有Watcher的情况下,Znode的任何变更就不会通知到客户端。

不过,即使某个Znode设置了Watcher,且在Znode有变化的情况下通知到了客户端,但是在客户端接收到这个变化事件,但是还没有再次设置Watcher之前,如果其他客户端对该Znode做了修改,这种情况下,Znode第二次的变化客户端是无法收到通知的。这可能是由于网络延迟或者是其他因素导致,所以我们使用Zookeeper不能期望能够监控到节点每次的变化。Zookeeper只能保证最终的一致性,而无法保证强一致性。

设置watch的数据类容

Znode改变有很多种方式,例如:节点创建,节点删除,节点改变,子节点改变等等。Zookeeper维护了两个Watch列表,一个节点数据Watch列表,另一个是子节点Watch列表getData()和exists()设置数据Watch,getChildren()设置子节点Watch。两者选其一,可以让我们根据不同的返回结果选择不同的Watch方式,getData()和exists()返回节点的内容,getChildren()返回子节点列表。因此,setData()触发内容Watch,create()触发当前节点的内容Watch或者是其父节点的子节点Watch。delete()同时触发父节点的子节点Watch和内容Watch,以及子节点的内容Watch。

zk watcher的运行机制

流程图

http://img.cana.space/picStore/20201108001724.png

  1. Watch是轻量级的,其实就是本地JVM的Callback,服务器端只是存了是否有设置了Watcher的布尔类型。(源码见:org.apache.zookeeper.server.FinalRequestProcessor)
  2. 在服务端,在FinalRequestProcessor处理对应的Znode操作时,会根据客户端传递的watcher变量,添加到对应的ZKDatabase(org.apache.zookeeper.server.ZKDatabase)中进行持久化存储,同时将自己NIOServerCnxn做为一个Watcher callback,监听服务端事件变化
  3. Leader通过投票通过了某次Znode变化的请求后,然后通知对应的Follower,Follower根据自己内存中的zkDataBase信息,发送notification信息给zookeeper客户端。
  4. Zookeeper客户端接收到notification信息后,找到对应变化path的watcher列表,挨个进行触发回调。

所谓的回调

http://img.cana.space/picStore/20201108002100.png

由图看出,zk的watcher由客户端,客户端WatchManager,zk服务器组成。整个过程涉及了消息通信及数据存储。

  • zk客户端向zk服务器注册watcher的同时,会将watcher对象存储在客户端的watchManager。
  • Zk服务器触发watcher事件后,会向客户端发送通知,客户端线程从watchManager中回调watcher执行相应的功能。

ZK事件

Zookeeper状态/事件对应关系图:

http://img.cana.space/picStore/20201108001817.png

ZooKeeper中Watch事件(见:org.apache.zookeeper.Watcher.EventType):

  • None 在客户端与Zookeeper集群中的服务器断开连接的时候,客户端会收到这个事件。
  • NodeCreated Znode创建事件
  • NodeDeleted Znode删除事件
  • NodeDataChanged Znode数据内容更新事件。其实本质上该事件只关注dataVersion版本号,但是只要调用了更新接口dataVersion就会有变更。
  • NodeChildrenChanged Znode子节点改变事件,只关注子节点的个数变更,子节点内容有变更是不会通知的。

在事件发生时,ZooKeeper的状态(见:org.apache.zookeeper.Watcher.KeeperState):

  • Disconnected(0) 客户端处于断开连接的状态,并且没有和Zookeeper集群中任何服务器连接。
  • SyncConnected(3) 客户端处于连接的状态,也就是说客户端连接到了一台server
  • AuthFailed(4) 验证失败的状态
  • ConnectedReadOnly(5) 客户端连接到一个只读Server的状态。
  • SaslAuthenticated(6) 用于通知客户端他们是SASL认证,以至于他们能够SASL认证的权限通过操作Zookeeper。
  • Expired(-112) 会话超时状态

code

watchone 一次性触发

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.eh.zkclient;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.TimeUnit;

@Slf4j
public class Demo2 {

    private static final String CONNECT_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 50 * 1000;
    private static final String PATH = "/eh";

    @Getter
    @Setter
    private ZooKeeper zk;

    // 获取连接
    @SneakyThrows
    public void startZK() {
        this.setZk(new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
        }));
    }

    // 获取节点信息
    @SneakyThrows
    public String getZnode(String nodePath) {
        byte[] data = zk.getData(nodePath, false, new Stat());
        return new String(data);
    }

    // 事件回调
    private void triggleEvent(WatchedEvent event) {
        String retVal = getZnode(PATH);
        log.info("=======>triggleEvent:{}", retVal);
    }


    /**
     * 一次性watch
     * 监控/eh节点,获得初次值后设置watch,只要发生新的变化就打印出最新的值
     */
    @SneakyThrows
    public void test() {
        startZK();
        if (zk.exists(PATH, false) != null) {
            // getData 注册watcher
            byte[] data = zk.getData(
                    PATH,
                    this::triggleEvent,
                    new Stat()
            );
            log.info("获取初始值:{}", new String(data));
        }
    }

    @SneakyThrows
    public static void main(String[] args) {
        Demo2 demo = new Demo2();
        demo.test();
        TimeUnit.DAYS.sleep(1);
    }
}
  1. 获取初始值后

  2. 使用zkCli模拟其他客户端更新节点值

    1
    
    [zk: localhost:2181(CONNECTED) 14] set /eh helloZK_v3
    
  3. 可以看到triggleEvent获取到新值

  4. 再更新值则获取不到新值了

watchmore 多次触发,长效机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.eh.zkclient;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.TimeUnit;

@Slf4j
public class Demo3 {

    private static final String CONNECT_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 50 * 1000;
    private static final String PATH = "/eh";

    @Getter
    @Setter
    private ZooKeeper zk;

    // 获取连接
    @SneakyThrows
    public void startZK() {
        this.setZk(new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
        }));
    }

    private final Watcher watcher = event -> triggleEvent(event);


    // 事件回调, 重新注册相同的watcher
    @SneakyThrows
    private void triggleEvent(WatchedEvent event) {
        byte[] data = zk.getData(PATH, watcher, new Stat());
        String retVal = new String(data);
        log.info("=======>triggleEvent:{}", retVal);
    }

    @SneakyThrows
    public void test() {
        startZK();
        if (zk.exists(PATH, false) != null) {
            // getData 注册watcher
            byte[] data = zk.getData(
                    PATH,
                    this::triggleEvent,
                    new Stat()
            );
            log.info("获取初始值:{}", new String(data));
        }
    }

    @SneakyThrows
    public static void main(String[] args) {
        Demo3 demo = new Demo3();
        demo.test();
        TimeUnit.DAYS.sleep(1);
    }
}

触发时重新设置相同的watcher

1
2
3
4
2020-11-08 01:19:57 INFO  [main] com.eh.zkclient.Demo3 - 获取初始值helloZK_v5
2020-11-08 01:19:57 INFO  [main-EventThread] com.eh.zkclient.Demo3 - =======>triggleEvent:helloZK_v6
2020-11-08 01:19:57 INFO  [main-EventThread] com.eh.zkclient.Demo3 - =======>triggleEvent:helloZK_v7
2020-11-08 01:19:57 INFO  [main-EventThread] com.eh.zkclient.Demo3 - =======>triggleEvent:helloZK_v8

watchchild

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.eh.zkclient;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
public class Demo3 {

    private static final String CONNECT_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 50 * 1000;
    private static final String PATH = "/eh";

    @Getter
    @Setter
    private ZooKeeper zk;

    // 获取连接
    @SneakyThrows
    public void startZK() {
        this.setZk(new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
        }));
    }

    private final Watcher watcher = event -> triggleEvent(event);


    // 事件回调, 重新注册相同的watcher
    @SneakyThrows
    private void triggleEvent(WatchedEvent event) {
        List<String> children = zk.getChildren(PATH, watcher);
        log.info("获取所有的子节点:{}", children);
    }


    /**
     * 监控子节点变化(增删)情况
     */
    @SneakyThrows
    public void test() {
        startZK();
        if (zk.exists(PATH, false) != null) {
            // getData 注册watcher
            List<String> children = zk.getChildren(
                    PATH,
                    this::triggleEvent
            );
            log.info("获取所有的子节点:{}", children);
        }
    }

    @SneakyThrows
    public static void main(String[] args) {
        Demo3 demo = new Demo3();
        demo.test();
        TimeUnit.DAYS.sleep(1);
    }
}

zkClient

1
2
3
4
5
6
7
8
[zk: localhost:2181(CONNECTED) 25] ls /eh
[]
[zk: localhost:2181(CONNECTED) 26] create /eh/eh-1 helloZK_child_v1
Created /eh/eh-1
[zk: localhost:2181(CONNECTED) 27] create /eh/eh-2 helloZK_child_v2
Created /eh/eh-2
[zk: localhost:2181(CONNECTED) 28] create /eh/eh-3 helloZK_child_v3
Created /eh/eh-3

可以看到控制台日志打印:

1
2
3
4
2020-11-08 01:32:27 INFO  [main] com.eh.zkclient.Demo3 - 获取所有的子节点[]
2020-11-08 01:32:27 INFO  [main-EventThread] com.eh.zkclient.Demo3 - 获取所有的子节点[eh-1]
2020-11-08 01:32:27 INFO  [main-EventThread] com.eh.zkclient.Demo3 - 获取所有的子节点[eh-1, eh-2]
2020-11-08 01:32:27 INFO  [main-EventThread] com.eh.zkclient.Demo3 - 获取所有的子节点[eh-1, eh-3, eh-2]