目录

第一款Netty应用程序

本章主要内容

  • 设置开发环境
  • 编写 Echo 服务器和客户端
  • 构建并测试应用程序

在本章中, 我们将展示如何构建一个基于 Netty 的客户端和服务器。 应用程序很简单: 客户端将消息发送给服务器, 而服务器再将消息回送给客户端。 但是这个练习很重要, 原因 有两个。

首先,它会提供一个测试台,用于设置和验证你的开发工具和环境,如果你打算通过对本书 的示例代码的练习来为自己将来的开发工作做准备,那么它将是必不可少的。

其次, 你将获得关于 Netty 的一个关键方面的实践经验, 即在前一章中提到过的:通过 ChannelHandler 来构建应用程序的逻辑。这能让你对在第 3 章中开始的对 Netty API 的深入 学习做好准备。

设置开发环境

要编译和运行本书的示例,只需要 JDK 和 Apache Maven 这两样工具,它们都是可以免费下 载的。

我们将假设,你想要捣鼓示例代码,并且想很快就开始编写自己的代码。虽然你可以使用纯 文本编辑器,但是我们仍然强烈地建议你使用用于 Java 的集成开发环境(IDE)。

  1. 获取并安装 Java 开发工具包

  2. 下载并安装 IDE

  3. 下载和安装 Apache Maven

  4. 工程结构

    https://gitee.com/lienhui68/picStore/raw/master/null/20200903101556.png

  5. 依赖以及插件配置

    在netty/pom.xml中配置

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.9.Final</version>
            </dependency>
        </dependencies>
       
        <build>
            <!--可选插件,让子类自行决定是否使用-->
            <pluginManagement>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>exec-maven-plugin</artifactId>
                        <version>1.6.0</version>
                    </plugin>
                </plugins>
            </pluginManagement>
        </build>
    

    netty/chapter2/pom.xml

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    <name>一个简单的Netty小程序</name>
       
     <!--Server和Client都会用到-->
        <properties>
            <echo-server.host>127.0.0.1</echo-server.host>
            <echo-server.port>9999</echo-server.port>
        </properties>
       
        <build>
            <plugins>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>exec-maven-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>run-server</id> <!--随便写-->
                            <goals>
                                <goal>java</goal>
                            </goals>
                        </execution>
                    </executions>
                  <!--父类自定义绑定,由子类定义具体配置-->
                </plugin>
            </plugins>
        </build>
    

    netty/chapter2/server/pom.xml

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    <name>Echo Server</name>
       
        <build>
            <plugins>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>exec-maven-plugin</artifactId>
                    <configuration>
                        <mainClass>com.eh.eden.netty.chapter2.EchoServer</mainClass>
                        <arguments>
                            <argument>${echo-server.port}</argument>
                        </arguments>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

    netty/chapter2/client/pom.xml

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    
    <name>Echo Client</name>
       
        <build>
            <plugins>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>exec-maven-plugin</artifactId>
                    <configuration>
                        <mainClass>com.eh.eden.netty.chapter2.EchoClient</mainClass>
                        <arguments>
                            <argument>${echo-server.host}</argument>
                            <argument>${echo-server.port}</argument>
                        </arguments>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

Netty客户端/服务器概览

图 2-1 从高层次上展示了一个你将要编写的 Echo 客户端和服务器应用程序。虽然你的主要 关注点可能是编写基于 Web 的用于被浏览器访问的应用程序,但是通过同时实现客户端和服务 器,你一定能更加全面地理解 Netty 的 API。

https://gitee.com/lienhui68/picStore/raw/master/null/20200903124112.png

虽然我们已经谈及到了客户端,但是该图展示的是多个客户端同时连接到一台服务器。所能 够支持的客户端数量,在理论上,仅受限于系统的可用资源(以及所使用的 JDK 版本可能会施 加的限制)。

Echo 客户端和服务器之间的交互是非常简单的;在客户端建立一个连接之后,它会向服务 器发送一个或多个消息,反过来,服务器又会将每个消息回送给客户端。虽然它本身看起来好像 用处不大,但它充分地体现了客户端/服务器系统中典型的请求-响应交互模式。

我们将从考察服务器端代码开始这个项目。

编写Echo服务器

所有的 Netty 服务器都需要以下两部分。

  • 引导—这是配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的 端口上。
  • 至少一个 ChannelHandler —该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑

在本小节的剩下部分,我们将描述 Echo 服务器的业务逻辑以及引导代码。

Channel和业务逻辑

在第 1 章中,我们介绍了 Future 和回调,并且阐述了它们在事件驱动设计中的应用。我们 还讨论了 ChannelHandler ,它是一个接口族的父接口,它的实现负责接收并响应事件通知。 在 Netty 应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。

因为你的 Echo 服务器会响应传入的消息,所以它需要实现 ChannelInboundHandler 接口,用 来定义响应入站事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以继承 ChannelInboundHandlerAdapter 类也就足够了,它提供了 ChannelInboundHandler 的默认实现。

我们感兴趣的方法是:

  • channelRead() ——对于每个传入的消息都要调用;

  • channelReadComplete() —— 通知 ChannelInboundHandler 最后一次对 channelRead() 的调用是当前批量读取中的最后一条消息;

    channel上面有数据到来时会触发channelRead事件,当数据到来时,eventLoop被唤醒继而调用channelRead方法处理数据。

    channelReadComplete事件参考 How does netty determine when a read is complete? 大致意思是eventLoop被到来的数据唤醒后read数据并包装成msg,然后将msg作为参数调用channelRead方法,期间做个判断,read到0个字节或者是read到的字节数小于buffer的容量,满足以上条件就会调用channelReadComplete方法。

    如果read到的字节数等于buffer(编码器里面设置的一条消息的大小)的容量则先执行channelRead再执行channelReadComplete

    如果实现了解码器,则有下述过程

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    
    class XDecoder extends ByteToMessageDecoder {
    
        static final int PACKET_SIZE = 10;
    
        // 用来临时保留没有处理过的请求报文
        ByteBuf tmpMsg = Unpooled.buffer();
    
        /**
         * @param ctx
         * @param in  请求的数据
         * @param out 将粘在一起的报文拆分后的结果放入out列表,交由后面的业务逻辑处理
         * @throws Exception
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println(Thread.currentThread() + "收到了一次数据包,长度是:" + in.readableBytes());
    
            // 合并报文
            ByteBuf message;
            int tmpMsgSize = tmpMsg.readableBytes();
            if (tmpMsgSize > 0) {
                message = Unpooled.buffer();
                message.writeBytes(tmpMsg);
                message.writeBytes(in);
                System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
    
            } else {
                message = in;
            }
    
            int size = message.readableBytes();
            int counter = size / PACKET_SIZE;
            for (int i = 0; i < counter; i++) {
                byte[] request = new byte[PACKET_SIZE];
                // 每次从总的消息中读取220个字节的数据
                message.readBytes(request);
    
                // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
                out.add(Unpooled.copiedBuffer(request));
            }
    
            // 多余的报文存起来
            // 第一个报文: i+  暂存
            // 第二个报文: 1 与第一次
            size = message.readableBytes();
            if (size != 0) {
                System.out.println("多余的数据长度:" + size);
                // 剩下来的数据放到tempMsg暂存
                tmpMsg.clear();
                tmpMsg.writeBytes(message.readBytes(size));
            }
        }
    }
    
    class XHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            byte[] content = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(content);
            System.out.println(Thread.currentThread() + ": 最终打印\n" + new String(content));
            ((ByteBuf) msg).release();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    解码器将接受到的数据拆分成一个Object列表,Object相当于一条消息,有几条消息就会调几次channelRead,最后才执行channelReadComplete

    如果想要完整的数据被读出后被回调应该如何解决?

    解码器 + channelRead解码器用于把数据整合,同时在整合完成后调用 channelRead

    如果你使用的 HTTP 协议,Netty 已经帮你写好了一个解码器 HttpObjectAggregator,你只需要使用它然后就可以愉快的在 channelRead 写业务代码了。

    如果你使用的其它协议或者自定义协议,那么你可能需要自己写解码器。

  • exceptionCaught() —— 在读取操作期间,有异常抛出时会调用。

该 Echo 服务器的ChannelHandler 实现是 EchoServerHandler,如代码清单 2-1 所示。

代码清单 2-1 EchoServerHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.eh.eden.netty.demo1;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * todo
 *
 * @author David Li
 * @create 2020/09/03
 */
@ChannelHandler.Sharable // 标示一个ChannelHandler 可以被多 个 Channel 安全地 共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 将消息记录到控制台
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        // 将接收到的消息 写给发送者, 而不冲刷出站消息
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Unpooled.EMPTY_BUFFER capacity=0,所以仅相当于flush
        // 将未决消息冲刷到 远程节点, 并且关 闭该 Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace(); // 打印异常 栈跟踪
        ctx.close(); // 关闭该Channel
    }
}

ChannelInboundHandlerAdapter 有一个直观的 API,并且它的每个方法都可以被重写以 挂钩到事件生命周期的恰当点上。因为需要处理所有接收到的数据,所以你重写了 channelRead() 方法。在这个服务器应用程序中,你将数据简单地回送给了远程节点。

重写 exceptionCaught() 方法允许你对 Throwable 的任何子类型做出反应,在这里你 记录了异常并关闭了连接。虽然一个更加完善的应用程序也许会尝试从异常中恢复,但在这个场 景下,只是通过简单地关闭连接来通知远程节点发生了错误。

如果不捕获异常,会发生什么呢

每个 Channel 都拥有一个与之相关联的 ChannelPipeline ,其持有一个 ChannelHandler 的 实例链。在默认的情况下, ChannelHandler 会把对它的方法的调用转发给链中的下一个 ChannelsHandler 。因此,如果 exceptionCaught() 方法没有被该链中的某处实现,那么所接收的异常将会被 传递到 ChannelPipeline 的尾端并被记录。 为此, 你的应用程序应该提供至少有一个实现了 exceptionCaught() 方法的 ChannelHandler 。(6.4 节详细地讨论了异常处理)。

除了 ChannelInboundHandlerAdapter 之外,还有很多需要学习的 ChannelHandler 的 子类型和实现,我们将在第 6 章和第 7 章中对它们进行详细的阐述。目前,请记住下面这些关键点:

  • 针对不同类型的事件来调用 ChannelHandler
  • 应用程序通过实现或者扩展 ChannelHandler 来挂钩到事件的生命周期,并且提供自定义的应用程序逻辑;
  • 在架构上, ChannelHandler 有助于保持业务逻辑与网络处理代码的分离。这简化了开发过程,因为代码必须不断地演化以响应不断变化的需求。

引导服务器

在讨论过由 EchoServerHandler 实现的核心业务逻辑之后,我们现在可以探讨引导服务器本身的过程了,具体涉及以下内容:

  • 绑定到服务器将在其上监听并接受传入连接请求的端口;
  • 配置Channel,以将有关的入站消息通知给 EchoServerHandler 实例。

传输

在这一节中,你将遇到术语传输。在网络协议的标准多层视图中,传输层提供了端到端的或者主机 到主机的通信服务。 因特网通信是建立在 TCP 传输之上的。除了一些由 Java NIO 实现提供的服务器端性能增强之外, NIO 传输大多数时候指的就是 TCP 传输。 我们将在第 4 章对传输进行详细的讨论。

代码清单 2-2 展示了 EchoServer 类的完整代码。

代码清单 2-2 EchoServer 类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.eh.eden.netty.demo1;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * todo
 *
 * @author David Li
 * @create 2020/09/03
 */
public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        // 参数校验
        if (args.length != 1) {
            System.err.println("Usage: " + EchoServer.class.getSimpleName() + "<port>");
        }

        // 设置端口值(如果端口参数 的格式不正确,则抛出一个 NumberFormatException)
        int port = Integer.parseInt(args[0]);
        // 调用服务器 的 start()方法
        new EchoServer(port).start();
    }

    private void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        // 1. 创建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 2. 创建ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    // 3. 指定所使用的 NIO 传输 Channel
                    .channel(NioServerSocketChannel.class)
                    // 4. 使用指定的 端口设置套 接字地址
                    .localAddress(new InetSocketAddress(port))
                    // 5. 添加一个 EchoServerHandler 到子 Channel 的 ChannelPipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // EchoServerHandler 被标注为@Shareable,所 以我们可以总是使用 同样的实例
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            // 异步地绑定服务器; 调用 sync()方法阻塞 等待直到绑定完成
            ChannelFuture f = b.bind().sync();
            // 获取 Channel 的 CloseFuture, 并且阻塞当前线程直到它完成
            f.channel().closeFuture().sync();
        } finally {
            // 关闭 EventLoopGroup, 释放所有的资源
            group.shutdownGracefully().sync();
        }
    }
}

这里对于所有的客户端连接来说,都会使用同一个 EchoServerHandler,因为其被标注为@Sharable, 这将在后面的章节中讲到。——译者注

//2处,你创建了一个 ServerBootstrap 实例。因为你正在使用的是 NIO 传输,所以你指定 了 NioEventLoopGroup 来接受和处理新的连接,并且将 Channel 的类型指定为 NioServerSocketChannel 。 在此之后, 你将本地地址设置为一个具有选定端口的 InetSocketAddress 。服务器将绑定到这个地址以监听新的连接请求。

//5处,你使用了一个特殊的类—— ChannelInitializer 。这是关键。当一个新的连接 被接受时,一个新的子 Channel 将会被创建, 而 ChannelInitializer 将会把一个你的 EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中。正如我们之前所 解释的,这个 ChannelHandler 将会收到有关入站消息的通知。

ChannelPipeline

https://gitee.com/lienhui68/picStore/raw/master/null/20200903132914.png

ChannelPipeline类是ChannelHandler实例对象的链表,用于处理或截获通道的接收和发送数据。它提供了一种高级的截取过滤模式(类似serverlet中的filter功能),让用户可以在ChannelPipeline中完全控制一个事件以及如何处理ChannelHandler与ChannelPipeline的交互。

对于每个新的通道Channel,都会创建一个新的ChannelPipeline,并将器pipeline附加到channel中。

下图描述ChannelHandler与pipeline中的关系,一个io操作可以由一个ChannelInboundHandler或ChannelOutboundHandle进行处理,并通过调用ChannelInboundHandler 处理入站io或通过ChannelOutboundHandler处理出站IO。

https://gitee.com/lienhui68/picStore/raw/master/null/20200903133117.png

常用方法

1
2
3
4
5
6
addFirst(...)   //添加ChannelHandler在ChannelPipeline的第一个位置
addBefore(...)   //在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler
addAfter(...)   //在ChannelPipeline中指定的ChannelHandler名称之后添加ChannelHandler
addLast(...)   //在ChannelPipeline的末尾添加ChannelHandler
remove(...)   //删除ChannelPipeline中指定的ChannelHandler
replace(...)   //替换ChannelPipeline中指定的ChannelHandler

虽然 NIO 是可伸缩的,但是其适当的尤其是关于多线程处理的配置并不简单。Netty 的设计封装了大部分的复杂性,而且我们将在第 3 章中对相关的抽象( EventLoopGroup 、 SocketChannel 和 ChannelInitializer )进行详细的讨论。

接下来你绑定了服务器 ,并等待绑定完成。(对 sync() 方法的调用将导致当前 Thread 阻塞,一直到绑定操作完成为止)。在 处,该应用程序将会阻塞等待直到服务器的 Channel 关闭(因为你在 Channel 的 CloseFuture 上调用了 sync() 方法)。然后,你将可以关闭 EventLoopGroup ,并释放所有的资源,包括所有被创建的线程 。

这个示例使用了 NIO,因为得益于它的可扩展性和彻底的异步性,它是目前使用最广泛的传 输。但是也可以使用一个不同的传输实现。如果你想要在自己的服务器中使用 OIO 传输,将需 要指定 OioServerSocketChannel 和 OioEventLoopGroup 。我们将在第 4 章中对传输进行 更加详细的探讨。

BIO和OIO:

相同点:

  • 都是阻塞传输

区别:

  • BIO 读流,OIO读缓冲区

  • BIO 一个是连接一个线程,OIO多个连接一个线程

  • 阻塞传输的实现方式不一样,BIO线程挂起的方式,OIO是使用SOCKET的SO_TIMEOUT标志响应超时中断然后不断循环的方式实现阻塞。

    OIO这种实现方式是为了和NIO共用一套异步传输的API。

与此同时,让我们回顾一下你刚完成的服务器实现中的重要步骤。下面这些是服务器的主要 代码组件:

  • EchoServerHandler 实现了业务逻辑;
  • main() 方法引导了服务器;

引导过程中所需要的步骤如下:

  • 创建一个 ServerBootstrap 的实例以引导和绑定服务器
  • 创建并分配一个NioEventLoopGroup 实例以进行事件的处理,如接受新连接以及读/写数据;
  • 指定服务器绑定的本地的 InetSocketAddress;
  • 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel;
  • 调用 ServerBootstrap.bind() 方法以绑定服务器。

在这个时候,服务器已经初始化,并且已经就绪能被使用了。在下一节中,我们将探讨对应 的客户端应用程序的代码。

编写Echo客户端

Echo 客户端将会:

  1. 连接到服务器;
  2. 发送一个或者多个消息;
  3. 对于每个消息,等待并接收从服务器发回的相同的消息;
  4. 关闭连接。

编写客户端所涉及的两个主要代码部分也是业务逻辑和引导,和你在服务器中看到的一样。

通过 ChannelHandler 实现客户端逻辑

如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler 。在这个场景 下,你将扩展 SimpleChannelInboundHandler 类以处理所有必须的任务,如代码清单 2-3 所示。这要求重写下面的方法:

  • channelActive() 在到服务器的连接已经建立之后将被调用;
  • channelRead0() 当从服务器接收到一条消息时被调用; // 未被实现的抽象方法,继承后必须得实现
  • exceptionCaught() 在处理过程中引发异常时被调用

代码清单 2-3 客户端的 ChannelHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.eh.eden.netty.demo1;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * todo
 *
 * @author David Li
 * @create 2020/09/03
 */
@ChannelHandler.Sharable // 标记该类的实例可以被 多个 Channel 共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 当被通知 Channel 是活跃的时候,发送一条消息
        // copiedBuffer 缓冲区的长度就是字符串的长度
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        // 记录已接收 消息的转储
        System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 在发生异常时, 记录错误并关闭 Channel
        cause.printStackTrace();
        ctx.close();
    }
}

首先,你重写了 channelActive() 方法,其将在一个连接建立时被调用。这确保了数据 将会被尽可能快地写入服务器,其在这个场景下是一个编码了字符串 “Netty rocks!” 的字节缓冲区。

接下来,你重写了 channelRead0() 方法。每当接收数据时,都会调用这个方法。需要注意的是,由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了 5 字节,那么不能保证这 5 字节会被一次性接收。即使是对于这么少量的数据, channelRead0() 方法也可能会被调用两次,第一次使用一个持有 3 字节的 ByteBuf (Netty 的字节容器),第二次使用一个 持有 2 字节的 ByteBuf 。作为一个面向流的协议,TCP 保证了字节数组将会按照服务器发送它们的顺序被接收。

重写的第三个方法是 exceptionCaught() 。如同在 EchoServerHandler (见代码清 单 2-2)中所示,记录 Throwable ,关闭 Channel ,在这个场景下,终止到服务器的连接。

SimpleChannelInboundHandler 与 ChannelInboundHandler的区别

一般用netty来发送和接收数据都会继承SimpleChannelInboundHandler和ChannelInboundHandlerAdapter这两个抽象类,那么这两个到底有什么区别呢?

其实用这两个抽象类是有讲究的,在客户端的业务Handler继承的是SimpleChannelInboundHandler,而在服务器端继承的是ChannelInboundHandlerAdapter。

这和两个因素的相互作用有 关:业务逻辑如何处理消息以及 Netty 如何管理资源。

在客户端,当 channelRead0() 方法完成时,你已经有了传入消息,并且已经处理完它了。当该方 法返回时, SimpleChannelInboundHandler 负责释放指向保存该消息的 ByteBuf 的内存引用。

在 EchoServerHandler 中,你仍然需要将传入消息回送给发送者,而 write() 操作是异步的,直到 EchoServerHandler channelRead() 方法返回后可能仍然没有完成(如代码清单 2-1 所示)。为此, 扩展了 ChannelInboundHandlerAdapter ,其在这个时间点上不会释放消息。

消息在 EchoServerHandler 的 channelReadComplete() 方法中,当 writeAndFlush() 方 法被调用时被释放(见代码清单 2-1)。

从源码上面看,当方法返回时,SimpleChannelInboundHandler会负责释放指向保存该消息的ByteBuf的内存引用。而ChannelInboundHandlerAdapter在其时间节点上不会释放消息,而是将消息传递给下一个ChannelHandler处理。

第 5 章和第 6 章将对消息的资源管理进行详细的介绍。

引导客户端

如同将在代码清单 2-4 中所看到的,引导客户端类似于引导服务器,不同的是,客户端是使 用主机和端口参数来连接远程地址,也就是这里的 Echo 服务器的地址,而不是绑定到一个一直 被监听的端口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.eh.eden.netty.demo1;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * todo
 *
 * @author David Li
 * @create 2020/09/03
 */
public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    private void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: " + EchoClient.class.getSimpleName() + "n<host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        new EchoClient(host, port).start();
    }

}

和之前一样,使用了 NIO 传输。注意,你可以在客户端和服务器上分别使用不同的传输。 例如,在服务器端使用 NIO 传输,而在客户端使用 OIO 传输。在第 4 章,我们将探讨影响你选 择适用于特定用例的特定传输的各种因素和场景。

让我们回顾一下这一节中所介绍的要点:

  • 为初始化客户端,创建了一个 Bootstrap 实例;
  • 为进行事件处理分配了一个 NioEventLoopGroup 实例,其中事件处理包括创建新的 连接以及处理入站和出站数据;
  • 为服务器连接创建了一个 InetSocketAddress 实例;
  • 当连接被建立时, 一个 EchoClientHandler 实例会被安装到 该 Channel 的 ChannelPipeline 中;
  • 在一切都设置完成后,调用 Bootstrap.connect() 方法连接到远程节点;

完成了客户端,你便可以着手构建并测试该系统了。

构建和运行Echo服务器和客户端

在这一节中,我们将介绍编译和运行 Echo 服务器和客户端所需的所有步骤。

Echo 客户端/服务器的 Maven 工程

这本书的附录使用 Echo 客户端/服务器工程的配置,详细地解释了多模块 Maven 工程是如何组织的。 这部分内容对于构建和运行该应用程序来说并不是必读的,之所以推荐阅读这部分内容,是因为它能帮 助你更好地理解本书的示例以及 Netty 项目本身。

运行构建

要构建 Echo 客户端和服务器,请进入到代码示例根目录下的 chapter2 目录执行以下命令:

1
$ mvn clean package

这将产生非常类似于代码清单 2-5 所示的输出(我们已经编辑忽略了几个构建过程中的非必要 步骤)。

代码清单 2-5 构建 Echo 客户端和服务器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO] 
[INFO] 一个简单的Netty小程序
[INFO] Echo Client
[INFO] Echo Server
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] Building 一个简单的Netty小程序 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ chapter2 ---
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] Building Echo Client 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ client ---
[INFO] Deleting /Users/david/my/study/project/eden/netty/chapter2/client/target
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ client ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ client ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 2 source files to /Users/david/my/study/project/eden/netty/chapter2/client/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ client ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/david/my/study/project/eden/netty/chapter2/client/src/test/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ client ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ client ---
[INFO] No tests to run.
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ client ---
[INFO] Building jar: /Users/david/my/study/project/eden/netty/chapter2/client/target/client-1.0-SNAPSHOT.jar
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] Building Echo Server 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ server ---
[INFO] Deleting /Users/david/my/study/project/eden/netty/chapter2/server/target
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ server ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ server ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 2 source files to /Users/david/my/study/project/eden/netty/chapter2/server/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ server ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/david/my/study/project/eden/netty/chapter2/server/src/test/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ server ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ server ---
[INFO] No tests to run.
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ server ---
[INFO] Building jar: /Users/david/my/study/project/eden/netty/chapter2/server/target/server-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] 
[INFO] 一个简单的Netty小程序 ...................................... SUCCESS [  0.397 s]
[INFO] Echo Client ........................................ SUCCESS [  3.825 s]
[INFO] Echo Server ........................................ SUCCESS [  0.802 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.249 s
[INFO] Finished at: 2020-09-03T15:04:18+08:00
[INFO] Final Memory: 21M/163M
[INFO] ------------------------------------------------------------------------

下面是前面的构建日志中记录的主要步骤:

  • Maven 确定了构建顺序:首先是父 pom.xml,然后是各个模块(子工程);
  • 如果在用户的本地存储库中没有找到 Netty 构件,Maven 将从公共的 Maven 存储库中下 载它们(此处未显示);
  • 运行了构建生命周期中的 clean 和 compile 阶段;
  • 最后执行了 maven-jar-plugin。

Maven Reactor 的摘要显示所有的项目都已经被成功地构建。两个子工程的目标目录的文件列表现在应该类似于代码清单 2-6。

代码清单 2-6 构建的构件列表

Directory of nia\chapter2\Client\target

1
2
3
4
5
6
total 16
drwxr-xr-x  3 david  staff    96B Sep  3 14:29 classes
-rw-r--r--  1 david  staff   5.4K Sep  3 14:29 client-1.0-SNAPSHOT.jar
drwxr-xr-x  3 david  staff    96B Sep  3 14:29 generated-sources
drwxr-xr-x  3 david  staff    96B Sep  3 14:29 maven-archiver
drwxr-xr-x  3 david  staff    96B Sep  3 14:29 maven-status

Directory of nia\chapter2\Server\target

1
2
3
4
5
6
total 16
drwxr-xr-x  3 david  staff    96B Sep  3 14:29 classes
drwxr-xr-x  3 david  staff    96B Sep  3 14:29 generated-sources
drwxr-xr-x  3 david  staff    96B Sep  3 14:29 maven-archiver
drwxr-xr-x  3 david  staff    96B Sep  3 14:29 maven-status
-rw-r--r--  1 david  staff   5.4K Sep  3 14:29 server-1.0-SNAPSHOT.jar

运行Echo服务器和客户端

要运行这些应用程序组件,可以直接使用 Java 命令。但是在 POM 文件中,已经为你配置好了exec-maven-plugin 来做这个。

并排打开两个控制台窗口,一个进到 chapter2\Server 目录中,另外一个进到 chapter2\Client 目录中。

在服务器的控制台中执行这个命令:

1
$ mvn exec:java

应该会看到类似于下面的内容:

服务器现在已经启动并准备好接受连接。现在在客户端的控制台中执行同样的命令:

1
$ mvn exec:java

应该会看到下面的内容:

同时在服务器的控制台中,应该会看到这个:

1
Server received: Netty rocks!

每次运行客户端时,在服务器的控制台中你都能看到这条日志语句。

下面是发生的事:

  1. 一旦客户端建立连接,它就发送它的消息—— Netty rocks!
  2. 服务器报告接收到的消息,并将其回送给客户端;
  3. 客户端报告返回的消息并退出。

你所看到的都是预期的行为,现在让我们看看故障是如何被处理的。服务器应该还在运行, 所以在服务器的控制台中按下 Ctrl+C 来停止该进程。一旦它停止,就再次使用mvn命令启动客户端。

代码清单 2-7 展示了你应该会从客户端的控制台中看到的当它不能连接到服务器时的输出。

代码清单 2-7 Echo 客户端的异常处理

https://gitee.com/lienhui68/picStore/raw/master/null/20200903151417.png

发生了什么?客户端试图连接服务器,其预期运行在 localhost:9999 上。但是连接失败 了(和预期的一样),因为服务器在这之前就已经停止了,所以在客户端导致了一个 java.net. ConnectException 。这个异常触发了 EchoClientHandler 的 exceptionCaught() 方法, 打印出了栈跟踪并关闭了 Channel 。

小结

在本章中,你设置好了开发环境,并且构建和运行了你的第一款 Netty 客户端和服务器。虽 然这只是一个简单的应用程序,但是它可以伸缩到支持数千个并发连接——每秒可以比普通的基 于套接字的 Java 应用程序处理多得多的消息。

在接下来的几章中,你将看到更多关于 Netty 如何简化可伸缩性和并发性的例子。我们也将 更加深入地了解 Netty 对于关注点分离的架构原则的支持。通过提供正确的抽象来解耦业务逻辑 和网络编程逻辑,Netty 使得可以很容易地跟上快速演化的需求,而又不危及系统的稳定性。

在下一章中,我们将提供对 Netty 体系架构的概述。这将为你在后续的章节中对 Netty 的内 部进行深入而全面的学习提供上下文。