目录

zk基础命令和客户端操作

zkclient常用命令操作

和redis的KV键值对类似,只不过key变成了一个路径节点的值,v就是data

zk表为一个分层的文件系统目录树结构,不同于文件系统之处在于,zk节点可以有自己的数据,而unix文件系统中的目录节点只有子节点

http://img.cana.space/picStore/20201107210025.png

一个节点对应一个应用,节点存储的数据就是应用需要的配置信息。

  • ls

  • ls2

    查看节点数据并且能看到stat结构体信息

  • stat

  • set

  • get

    get -s /path, 表示带stat信息

  • create

  • delete

    删除节点且节点不能有子节点

  • rmr,新版本叫deleteall

    递归删除一锅端

四字命令

zookeeper支持某些特定的四字命令,它们大多数是用来查询zk服务当前状态及相关信息的,通过telnet或nc向zookeeper提交相应命令,如:echo ruok | nc 127.0.0.1 2181

运行公式: echo 四字命令 | nc 主机IP zookeeper端口

常用四字命令

http://img.cana.space/picStore/20201107211234.png

Java客户端操作

Maven工程和配置pom

参考文章:Zookeeper客户端Curator使用详解

ZooKeeper Releases

由于zkclient最新版0.11依赖的zk版本是3.4.13,为了防止兼容性问题,本篇测试也使用3.4.13版本

http://img.cana.space/picStore/20201107213820.png

在实际使用中建议切换成更高版本的zk以及使用curator框架。

maven依赖:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>

日志

http://img.cana.space/picStore/20201107215015.png

默认使用log4j,这里我们使用logback,将原有的日志jar过滤,引入logback和log4j-over-slf4j,完整的pom如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.eh</groupId>
    <artifactId>zkclient-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
        <!--引入zkclient, exclude原有的log4j包-->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.11</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--引入logback-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.7</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.7</version>
        </dependency>
        <!--过滤掉原始log包后需要增加桥接包-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
    </dependencies>

</project>

logback.xml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
<?xml version="1.0" encoding="UTF-8"?>
<configuration scanPeriod="60 seconds" debug="false">

    <timestamp key="dt" datePattern="yyyy-MM-dd HH:mm:ss"/>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!--高亮-->
            <pattern>${dt} %highlight(%-5level) %boldYellow([%thread]) %cyan(%logger{50}) - %msg%n</pattern>
        </encoder>
        <withJansi>true</withJansi>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.eh.zkclient;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

@Slf4j
public class Demo {

    private static final String CONNECT_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 50 * 1000;
    private static final String PATH = "/eh";

    @Getter
    @Setter
    private ZooKeeper zk;

    // 获取连接
    @SneakyThrows
    public void startZK() {
        this.setZk(new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
        }));
    }

    // 释放连接
    @SneakyThrows
    public void stopZK() {
        if (zk != null) {
            zk.close();
        }
    }

    // 创建节点
    @SneakyThrows
    public void createZnode(String nodePath, String nodeValue) {
        zk.create(nodePath, nodeValue.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    // 获取节点信息
    @SneakyThrows
    public String getZnode(String nodePath) {
        byte[] data = zk.getData(nodePath, false, new Stat());
        return new String(data);
    }

    @SneakyThrows
    private void test1() {
        // 获取连接
        startZK();
        // 判断是否存在节点
        if (zk.exists(PATH, false) == null) {
            // 创建节点
            createZnode(PATH, "helloZK_v1");
            // 获取节点
            String retValue = getZnode(PATH);
            log.info("=========>retValue:{}", retValue);
        } else {
            log.info("=========>路径:{}已存在节点", PATH);
        }

        // 关闭节点
        stopZK();
    }

    @SneakyThrows
    public static void main(String[] args) {
        Demo demo = new Demo();
        demo.test1();
    }
}

main方法输出:

1
2
3
4
5
6
7
8
...
2020-11-07 23:32:56 INFO  [main] org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2181 sessionTimeout=50000 watcher=com.eh.zkclient.Demo$$Lambda$1/2114889273@6fadae5d
2020-11-07 23:32:56 INFO  [main-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
2020-11-07 23:32:56 INFO  [main-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
2020-11-07 23:32:56 INFO  [main-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x100063d1f390007, negotiated timeout = 40000
2020-11-07 23:32:56 INFO  [main] com.eh.zkclient.Demo - =========>retValue:helloZK_v1
2020-11-07 23:32:56 INFO  [main] org.apache.zookeeper.ZooKeeper - Session: 0x100063d1f390007 closed
2020-11-07 23:32:56 INFO  [main-EventThread] org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x100063d1f390007

代码整理

创建session时就传入watcher,之后getData,getChildren,exists时传入true就可以重新注册之前的watcher了。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package com.eh.zkclient;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
public class Demo3 {

    private static final String CONNECT_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 50 * 1000;
    private static final String PATH = "/eh";

    @Getter
    @Setter
    private ZooKeeper zk;

    // 获取连接
    @SneakyThrows
    public void startZK() {
        this.setZk(new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
            // 只对PATH节点感兴趣
            if (!PATH.equals(event.getPath())) {
                return;
            }
            //
            switch (event.getType()) {
                // Znode创建事件
                case NodeCreated:
                    triggleNodeCreated(event);
                    break;
                // Znode删除事件
                case NodeDeleted:
                    triggleNodeDeleted(event);
                    break;
                //  Znode数据内容更新事件。其实本质上该事件只关注dataVersion版本号,
                //  只要调用了更新接口dataVersion就会有变更。
                case NodeDataChanged:
                    triggleNodeDataChanged(event);
                    break;
                //  Znode子节点改变事件,只关注子节点的个数变更,子节点内容有变更是不会通知的。
                case NodeChildrenChanged:
                    triggleNodeChildrenChanged(event);
                    break;
                // 在客户端与Zookeeper集群中的服务器断开连接的时候,客户端会收到这个事件。
                case None:
                    triggleNone();

            }
        }));
    }

    @SneakyThrows
    public void triggleNodeCreated(WatchedEvent event) {
        log.info("========>triggleNodeCreated,节点{}已创建", event.getPath());
    }

    @SneakyThrows
    public void triggleNodeDeleted(WatchedEvent event) {
        log.info("========>triggleNodeCreated,节点{}已删除", event.getPath());
    }

    @SneakyThrows
    public void triggleNodeDataChanged(WatchedEvent event) {
        byte[] data = zk.getData(event.getPath(), true, new Stat());
        log.info("========>triggleNodeDataChanged,更新后的值:{}", new String(data));
    }

    @SneakyThrows
    public void triggleNodeChildrenChanged(WatchedEvent event) {
        List<String> children = zk.getChildren(event.getPath(), true);
        log.info("========>triggleNodeChildrenChanged,子节点列表:{}", children);
    }

    @SneakyThrows
    public void triggleNone() {
        log.info("========>triggleNone, 会话已关闭");
    }


    /**
     * 监控子节点变化(增删)情况
     */
    @SneakyThrows
    public void test() {
        log.info("=======> 测试开始");

        startZK();
        if (zk.exists(PATH, true) == null) {
            log.info("==========>test:节点{}尚未创建", PATH);
        }
        // 测试步骤
        // 1. zkClient创建/eh节点,,查看控制台输出
        // 2. 获取节点值,注册watcher
        while (zk.exists(PATH, false) == null) {
            TimeUnit.SECONDS.sleep(1);
        }

        // 创建一个线程监听数据变化情况
        new Thread(
                () -> {
                    try {
                        log.info("=====>获取节点值:{},并重新注册watcher", new String(zk.getData(PATH, true, new Stat())));
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        ).start();

        // 创建一个线程监听子节点变化情况
        new Thread(
                () -> {
                    try {
                        log.info("=====>获取子节点列表:{},并重新注册watcher", zk.getChildren(PATH, true));
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        ).start();

        TimeUnit.DAYS.sleep(1);
        log.info("=======> 测试结束");

        // 3. zkClient更新节点值,查看控制台输出
        // 4. zkClient创建子节点,查看控制台输出
        // 5. zkClient删除子节点,查看控制台输出
    }

    @SneakyThrows
    public static void main(String[] args) {
        Demo3 demo = new Demo3();
        demo.test();
    }
}

zkClient操作步骤:

1
2
3
4
create /eh helloZK_v1
set /eh helloZK_v2
create /eh/eh-1 helloZK_child_v1
delete /eh/eh-1

控制台依次打印:

http://img.cana.space/picStore/20201108024504.png