目录

传输

本章主要内容

  • OIO——阻塞传输
  • NIO——异步传输
  • Local——JVM 内部的异步通信
  • Embedded——测试你的 ChannelHandler

流经网络的数据总是具有相同的类型:字节。这些字节是如何流动的主要取决于我们所说的 网络传输—一个帮助我们抽象底层数据传输机制的概念。用户并不关心这些细节;他们只想确 保他们的字节被可靠地发送和接收。

如果你有 Java 网络编程的经验,那么你可能已经发现,在某些时候,你需要支撑比预期多 很多的并发连接。如果你随后尝试从阻塞传输切换到非阻塞传输,那么你可能会因为这两种网络 API 的截然不同而遇到问题。

然而,Netty 为它所有的传输实现提供了一个通用 API,这使得这种转换比你直接使用 JDK 所能够达到的简单得多。所产生的代码不会被实现的细节所污染,而你也不需要在你的整个代码 库上进行广泛的重构。简而言之,你可以将时间花在其他更有成效的事情上。

在本章中,我们将学习这个通用 API,并通过和 JDK 的对比来证明它极其简单易用。我们 将阐述 Netty 自带的不同传输实现,以及它们各自适用的场景。有了这些信息,你会发现选择最 适合于你的应用程序的选项将是直截了当的。

本章的唯一前提是 Java 编程语言的相关知识。有网络框架或者网络编程相关的经验更好, 但不是必需的。

我们先来看一看传输在现实世界中是如何工作的。

案例研究:传输迁移

我们将从一个应用程序开始我们对传输的学习,这个应用程序只简单地接受连接,向客户端 写“Hi!”,然后关闭连接。

不通过Netty使用OIO和NIO

我们将介绍仅使用了 JDK API 的应用程序的阻塞(OIO)版本和异步(NIO)版本。代码清 单 4-1 展示了其阻塞版本的实现。如果你曾享受过使用 JDK 进行网络编程的乐趣,那么这段代码 将唤起你美好的回忆。

代码清单 4-1 未使用 Netty 的阻塞网络编程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.eh.eden.pattern;


import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;

class PlainOioServer {

    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port); // 将服务器绑定到指定端口

        try {
            for (; ; ) {
                final Socket clientSocket = socket.accept(); // 接受连接
                System.out.println("Accepted connection from " + clientSocket);
                new Thread(() -> { // 创建一个新的线程来处理该连接
                    try (
                            OutputStream out = clientSocket.getOutputStream()
                    ) {
                        out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); // 将消息写给已连接的客户端
                        out.flush();
                        clientSocket.close(); // 关闭连接
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }).start(); // 启动线程
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) throws IOException {
        new PlainOioServer().serve(7777);
    }
}
// 运行结果
$ nc localhost 7777
Hi!

这段代码完全可以处理中等数量的并发客户端。但是随着应用程序变得流行起来,你会发现 它并不能很好地伸缩到支撑成千上万的并发连入连接。你决定改用异步网络编程,但是很快就发 现异步 API 是完全不同的,以至于现在你不得不重写你的应用程序。

其非阻塞版本如代码清单 4-2 所示。

代码清单 4-2 未使用 Netty 的异步网络编程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.eh.eden.pattern;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class PlainNioServer {

    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        InetSocketAddress address = new InetSocketAddress(port);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.bind(address); // 将服务器绑定到选定的端口

        Selector selector = Selector.open(); // 打开 Selector 来处理 Channel
        serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 将 ServerSocket 注册到 Selector以接受连接

        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (; ; ) {
            try {
                selector.select(); // 等待需要处理的新事件;阻塞将一直持续到下一个传入事件
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle exception
                break;
            }

            Set<SelectionKey> readyKeys = selector.selectedKeys(); // 获取所有接收事件的 SelectionKey 实例
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) { // 检查事件是否是一个新的已经就绪可以被接受的连接
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false); // 设置成非阻塞连接
                        // 接受客户端, 并将它注册到选择器
                        client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
                        System.out.println("Accepted connection from " + client);
                    }

                    if (key.isWritable()) { // 检查套接字 是否已经准备好写数据
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) { // 将数据写到已连接的客户端
                                break;
                            }
                        }
                        client.close(); // 关闭连接
                    }
                } catch (IOException ex) {
                    key.cancel();

                    try {

                        key.channel().close();
                    } catch (IOException cex) {
                        // ignore on close
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new PlainNioServer().serve(7777);
    }
}
// 运行结果
$ nc localhost 7777
Hi!

如同你所看到的,虽然这段代码所做的事情与之前的版本完全相同,但是代码却截然不同。 如果为了用于非阻塞 I/O 而重新实现这个简单的应用程序,都需要一次完全的重写的话,那么不 难想象,移植真正复杂的应用程序需要付出什么样的努力。

通过Netty使用OIO和NIO

我们将先编写这个应用程序的另一个阻塞版本,这次我们将使用 Netty 框架,如代码清单 4-3 所示。

代码清单 4-3 使用 Netty 的阻塞网络处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.eh.eden.netty.chapter2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

/**
 * todo
 *
 * @author David Li
 * @create 2020/09/05
 */
class NettyOioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi! Netty Oio\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup(); // 为非阻塞模式使用 OioEventLoopGroup

        try {
            ServerBootstrap b = new ServerBootstrap();  // 创建引导类 ServerBootstrap

            b.group(group)
                    .channel(OioServerSocketChannel.class) // 使用 OioServerSocketChannel 以允许阻塞模式(旧的 I/O)
                    .localAddress(new InetSocketAddress(port))
                    // 指定 ChannelInitializer, 对于每个已接受的连接都调用它
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 添加一个 ChannelInboundHandlerAdapter 以拦截和处理事件
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                // 将消息写到客户端,并添加 ChannelFutureListener, 以便消息一被写完就关闭连接
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ch.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            // 绑定服务器以接受连接
            b.bind().sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully(); // 释放所有的资源
        }

    }

    public static void main(String[] args) throws Exception {
        new NettyOioServer().server(7777);
    }
}
// 运行结果
$ nc localhost 7777
Hi! Netty Oio

非阻塞的Netty版本

代码清单 4-4 和代码清单 4-3 几乎一模一样,除了//1//2那两行。这就是从阻塞(OIO) 传输切换到非阻塞(NIO)传输需要做的所有变更。

代码清单 4-4 使用 Netty 的异步网络处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.eh.eden.netty.chapter2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

/**
 * todo
 *
 * @author David Li
 * @create 2020/09/05
 */
class NettyNioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi! Netty Nio\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new NioEventLoopGroup(); // 1. 为非阻塞模式使用 NioEventLoopGroup

        try {
            ServerBootstrap b = new ServerBootstrap(); // 创建引导类 ServerBootstrap

            b.group(group)
                    .channel(NioServerSocketChannel.class) // 2. 使用 NioServerSocketChannel 以允许非阻塞模式(新的 I/O)
                    .localAddress(new InetSocketAddress(port))
                    // 指定 ChannelInitializer, 对于每个已接受的连接都调用它
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 添加一个 ChannelInboundHandlerAdapter 以拦截和处理事件
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                // 将消息写到客户端,并添加 ChannelFutureListener, 以便消息一被写完就关闭连接
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ch.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            // 绑定服务器以接受连接
            b.bind().sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully(); // 释放所有的资源
        }

    }

    public static void main(String[] args) throws Exception {
        new NettyNioServer().server(7777);
    }
}
// 运行结果
$ nc localhost 7777
Hi! Netty Nio

因为 Netty 为每种传输的实现都暴露了相同的 API,所以无论选用哪一种传输的实现,你的 代码都仍然几乎不受影响。 在所有的情况下, 传输的实现都依赖于 interface Channel 、 ChannelPipeline 和 ChannelHandler 。

在看过一些使用基于 Netty 的传输的这些优点之后,让我们仔细看看传输 API 本身。

传输API

传输 API 的核心是 结构如图 4-1 所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905115859.png

如图所示,每个 Channel 都将会被分配一个 ChannelPipeline 和 ChannelConfig 。 ChannelConfig 包含了该 Channel 的所有配置设置,并且支持热更新。由于特定的传输可能 具有独特的设置,所以它可能会实现一个 ChannelConfig 的子类型。(请参考 ChannelConfig 实现对应的 Javadoc。)

由于 Channel 是独一无二的, 所以为了保证顺序将 Channel 声明为 java.lang. Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那 么 AbstractChannel 中的 compareTo() 方法的实现将会抛出一个 Error 。

ChannelPipeline 持有所有将应用于入站和出站数据以及事件的 ChannelHandler 例,这些 ChannelHandler 实现了应用程序用于处理状态变化以及数据处理的逻辑

ChannelHandler的典型用途包括:

  • 将数据从一种格式转换为另一种格式;
  • 提供异常的通知;
  • 提供Channel 变为活动的或者非活动的通知;
  • 提供当 Channel 注册到EventLoop或者从EventLoop注销时的通知;
  • 提供有关用户自定义事件的通知。

拦截过滤器

ChannelPipeline 实现了一种常见的设计模式—拦截过滤器(InterceptingFilter) 。 UNIX 管道是另外一个熟悉的例子:多个命令被链接在一起,其中一个命令的输出端将连 接到命令行中下一个命令的输入端。

通过利用Netty的这项能力可以构建出高度灵活的应用程序。例如,每当STARTTLS 协议被请 求时,你可以简单地通过向 ChannelPipeline 添加一个适当的ChannelHandler ( SslHandler )来按需地支持STARTTLS协议。

除了访问所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用Channel 的其他方法,其中最重要的列举在表 4-1 中。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905120616.png

稍后我们将进一步深入地讨论所有这些特性的应用。目前,请记住,Netty 所提供的广泛功 能只依赖于少量的接口这意味着,你可以对你的应用程序逻辑进行重大的修改,而又无需大规 模地重构你的代码库。

考虑一下写数据并将其冲刷到远程节点这样的常规任务。 代码清单 4-5 演示了使用 Channel.writeAndFlush() 来实现这一目的。

代码清单 4-5 写出到Channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
Channel channel = ...;
// 创建持有要写数据的 ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
ChannelFuture cf = channel.writeAndFlush(buf); // 写数据并冲刷它
cf.addListener(new ChannelFutureListener() { // 添加 ChannelFutureListener 以便在写操作完成后接收通知
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) { // 写操作完成,并且没有错误发生
            System.out.println("Write successful");
        } else {
            // 记录错误
            System.err.println("Write error");
            future.cause().printStackTrace();
        }
    }
});

Netty 的 Channel 实现是线程安全的,因此你可以存储一个到 Channel 的引用,并且每当 你需要向远程节点写数据时,都可以使用它,即使当时许多线程都在使用它。代码清单 4-6 展示 了一个多线程写数据的简单例子。需要注意的是,消息将会被保证按顺序发送

代码清单 4-6 从多个线程使用同一个 Channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
final Channel channel = ...;
// 创建持有要写数据的 ByteBuf
final ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8).retain();
// 创建将数据写到 Channel 的 Runnable
Runnable writer = () -> channel.writeAndFlush(buf.duplicate());
// 获取到线程池 Executor 的引用
Executor executor = Executors.newCachedThreadPool();
// write in one thread
executor.execute(writer); // 递交写任务给线程池以便 在某个线程中执行
// write in another thread
executor.execute(writer); // 递交另一个写任务以便 在另一个线程中执行

内置的传输

Netty 内置了一些可开箱即用的传输。因为并不是它们所有的传输都支持每一种协议,所以 你必须选择一个和你的应用程序所使用的协议相容的传输。在本节中我们将讨论这些关系。

表 4-2 显示了所有 Netty 提供的传输。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905121706.png

我们将在接下来的几节中详细讨论这些传输。

NIO 非阻塞I/O

NIO 提供了一个所有 I/O 操作的全异步的实现。它利用了自 NIO 子系统被引入 JDK 1.4 时便可用的基于选择器的 API。

选择器背后的基本概念是充当一个注册表,在那里你将可以请求在Channel 的状态发生变化时得到通知。可能的状态变化有:

  • 新的 Channel已被接受并且就绪;
  • Channel 连接已经完成;
  • Channel 有已经就绪的可供读取的数据;
  • Channel 可用于写数据。

选择器运行在一个检查状态变化并对其做出相应响应(调用pipeline的handler)的线程上,在应用程序对状态的改变做 出响应之后,选择器将会被重置(基于ET),并将重复这个过程。

表4-3 中的常量值代表了由 class java.nio.channels.SelectionKey 定义的位模式。这些位模式可以组合起来定义一组应用程序正在请求通知的状态变化集。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905122440.png

对于所有 Netty 的传输实现都共有的用户级别 API 完全地隐藏了这些 NIO 的内部细节。 图 4-2 展示了该处理流程。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905122622.png

零拷贝

零拷贝(zero-copy)是一种目前只有在使用 NIO 和 Epoll 传输时才可使用的特性。它使你可以快速 高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,其在像 FTP 或者 HTTP 这样的协议中可以显著地提升性能。但是,并不是所有的操作系统都支持这一特性。特别地,它对于实现了数据加密或者压缩的文件系统是不可用的——只能传输文件的原始内容。反过来说,传输已被 加密的文件则不是问题。

Epoll 用于Linux的本地非阻塞传输

正如我们之前所说的,Netty 的 NIO 传输基于 Java 提供的异步/非阻塞网络编程的通用抽象。 虽然这保证了 Netty 的非阻塞 API 可以在任何平台上使用,但它也包含了相应的限制,因为 JDK 为了在所有系统上提供相同的功能,必须做出妥协

Linux作为高性能网络编程的平台,其重要性与日俱增,这催生了大量先进特性的开发,其 中包括epoll——一个高度可扩展的I/O事件通知特性。这个API自Linux内核版本 2.5.44(2002)被 引入,提供了比旧的POSIX select 和 poll 系统调用 更好的性能,同时现在也是Linux上非阻塞网络编程的事实标准。Linux JDK NIO API使用了这些epoll调用。

Netty为Linux提供了一组NIO API,其以一种和它本身的设计更加一致的方式使用epoll,并 且以一种更加轻量的方式使用中断。 如果你的应用程序旨在运行于Linux系统,那么请考虑利用 这个版本的传输;你将发现在高负载下它的性能要优于JDK的NIO实现。

JDK 的实现是水平触发,而 Netty 的(默认的)是边沿触发。

这个传输的语义与在图 4-2 所示的完全相同,而且它的用法也是简单直接的。相关示例参照 代码清单 4-4。如果要在那个代码清单中使用 epoll 替代 NIO,只需要将 NioEventLoopGroup 替 换 为 EpollEventLoopGroup , 并 且 将 NioServerSocketChannel.class 替 换 为 EpollServerSocketChannel.class 即可。

OIO 旧的阻塞I/O

Netty 的 OIO 传输实现代表了一种折中:它可以通过常规的传输 API 使用, 但是由于它 是建立在 java.net 包的阻塞实现之上的,所以它不是异步的。但是,它仍然非常适合于某 些用途。

例如,你可能需要移植使用了一些进行阻塞调用的库(如JDBC )的遗留代码,而将逻辑转 换为非阻塞的可能也是不切实际的。相反,你可以在短期内使用Netty的OIO传输,然后再将你的 代码移植到纯粹的异步传输上。让我们来看一看怎么做。

在 java.net API 中,你通常会有一个用来接受到达正在监听的 ServerSocket 的新连 接的线程。会创建一个新的和远程节点进行交互的套接字,并且会分配一个新的用于处理相应通 信流量的线程。这是必需的,因为某个指定套接字上的任何 I/O 操作在任意的时间点上都可能会 阻塞。使用单个线程来处理多个套接字,很容易导致一个套接字上的阻塞操作也捆绑了所有其他 的套接字。

有了这个背景,你可能会想,Netty是如何能够使用和用于异步传输相同的API来支持OIO的呢。 答案就是,Netty利用了 SO_TIMEOUT 这个 Socket 标志,它指定了等待一个I/O操作完成的最大毫秒数。如果操作在指定的时间间隔内没有完成,则将会抛出一个 SocketTimeout Exception 。Netty 将捕获这个异常并继续处理循环。在 EventLoop 下一次运行时,它将再次尝试。这实际上也是 类似于Netty这样的异步框架能够支持OIO的唯一方式 。图 4-3 说明了这个逻辑。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905124300.png

OIO 是建立在 java.net 包的阻塞实现之上的,但由于EventLoop异步API是非阻塞实现,使用循环,所以利用SO_TIMEOUT 这个 Socket 标志,捕获异常使得可以继续处理循环。

这种方式的一个问题是,当一个 SocketTimeoutException 被抛出时填充栈跟踪所需要的时间,其对于性能来说代价很大。

用于JVM内部通信的Local传输

Netty 提供了一个 Local 传输,用于在同一个 JVM 中运行的客户端和服务器程序之间的异步 通信。同样,这个传输也支持对于所有 Netty 传输实现都共同的 API。

在这个传输中,和服务器 Channel 相关联的 SocketAddress 并没有绑定物理网络地址; 相反,只要服务器还在运行,它就会被存储在注册表里,并在 Channel 关闭时注销。因为这个 传输并不接受真正的网络流量,所以它并不能够和其他传输实现进行互操作。因此,客户端希望连接到(在同一个 JVM 中)使用了这个传输的服务器端时也必须使用它。除了这个限制,它的使用方式和其他的传输一模一样。

LcoalChannel是Netty提供的用来在同一个JVM内部实现client和server之间通信的transport。它的实现主要是通过内存里的对象作为通信介质,不会像NIO下的channel,会占用一个文件描述符;因此使用它不会影响到你系统上的打开文件数,也就不会影响到你系统所能管理的连接数了。对于在同一个JVM内部使用netty的机制进行通信的话,还是很轻量级的。

Embeded传输

Netty 提供了一种额外的传输,使得你可以将一组 ChannelHandler 作为帮助器类嵌入到 其他的 ChannelHandler 内部。通过这种方式,你将可以扩展一个 ChannelHandler 的功能, 而又不需要修改其内部代码。

不足为奇的是,Embedded 传输的关键是一个被称为 EmbeddedChannel 的具体的 Channel 实现。在第 9 章中,我们将详细地讨论如何使用这个类来为 ChannelHandler 的实现创建单元 测试用例。

传输的用例(case)

既然我们已经详细地了解了所有的传输,那么让我们考虑一下选用一个适用于特定用途的协 议的因素吧。正如前面所提到的,并不是所有的传输都支持所有的核心协议,其可能会限制你的 选择。表 4-4 展示了截止出版时的传输和其所支持的协议。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905124738.png

在 Linux 上启用 SCTP

SCTP 需要内核的支持,并且需要安装用户库。 例如,对于 Ubuntu,可以使用下面的命令:

1
# sudo apt-get install libsctp1

对于 Fedora,可以使用 yum:

1
#sudo yum install kernel-modules-extra.x86_64 lksctp-tools.x86_64

有关如何启用 SCTP 的详细信息,请参考你的 Linux 发行版的文档。

虽然只有 SCTP 传输有这些特殊要求,但是其他传输可能也有它们自己的配置选项需要考虑。 此外,如果只是为了支持更高的并发连接数,服务器平台可能需要配置得和客户端不一样。

这里是一些你很可能会遇到的用例。

  • 非阻塞代码库

    如果你的代码库中没有阻塞调用(或者你能够限制它们的范围),那么 在 Linux 上使用 NIO 或者 epoll 始终是个好主意。虽然 NIO/epoll 旨在处理大量的并发连 接,但是在处理较小数目的并发连接时,它也能很好地工作,尤其是考虑到它在连接之间共享线程的方式。

  • 阻塞代码库

    如果你的代码库严重地依赖于阻塞 I/O,而且你的应 用程序也有一个相应的设计,那么在你尝试将其直接转换为 Netty 的 NIO 传输时,你将可 能会遇到和阻塞操作相关的问题。不要为此而重写你的代码,可以考虑分阶段迁移:先从 OIO 开始,等你的代码修改好之后,再迁移到 NIO(或者使用 epoll,如果你在使用 Linux)。

  • 在同一个 JVM 内部的通信

    在同一个 JVM 内部的通信,不需要通过网络暴露服务,是 Local 传输的完美用例。这将消除所有真实网络操作的开销,同时仍然使用你的 Netty 代码 库。如果随后需要通过网络暴露服务,那么你将只需要把传输改为 NIO 或者 OIO 即可。

  • 测试你的 ChannelHandler 实现

    如果你想要为自己的 ChannelHandler 实现编写单元测试,那么请考虑使用 Embedded 传输。这既便于测试你的代码,而又不需要创建大 量的模拟(mock)对象。你的类将仍然符合常规的 API 事件流,保证该 ChannelHandler 在和真实的传输一 起使用时能够正确 地工作。 你将 在第 9 章中 发现关于测试ChannelHandler 的更多信息。

表 4-5 总结了我们探讨过的用例。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905125244.png

小结

在本章中,我们研究了传输、它们的实现和使用,以及 Netty 是如何将它们呈现给开发者的。

我们深入探讨了 Netty 预置的传输,并且解释了它们的行为。因为不是所有的传输都可以在 相同的 Java 版本下工作,并且其中一些可能只在特定的操作系统下可用,所以我们也描述了它 们的最低需求。最后,我们讨论了你可以如何匹配不同的传输和特定用例的需求。

在下一章中,我们将关注于 ByteBuf 和 ByteBufHolder ——Netty 的数据容器。我们将 展示如何使用它们以及如何通过它们获得最佳性能。