目录

编解码器

本章主要内容

  • 解码器、编码器以及编解码器的概述
  • Netty 的编解码器类

就像很多标准的架构模式都被各种专用框架所支持一样,常见的数据处理模式往往也是目标 实现的很好的候选对象,它可以节省开发人员大量的时间和精力。

当然这也适应于本章的主题:编码和解码,或者数据从一种特定协议的格式到另一种格式的转 换。这些任务将由通常称为编解码器的组件来处理。Netty 提供了多种组件,简化了为了支持广泛 的协议而创建自定义的编解码器的过程。例如,如果你正在构建一个基于 Netty 的邮件服务器,那 么你将会发现 Netty 对于编解码器的支持对于实现 POP3、IMAP 和 SMTP 协议来说是多么的宝贵。

什么是编解码器

每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和 目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码 器组成,它们每种都可以将字节流从一种格式转换为另一种格式。那么它们的区别是什么呢?

如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列—它的数据。那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将 网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。

记住这些背景信息,接下来让我们研究一下 Netty 所提供的用于实现这两种组件的类。

解码器

在这一节中,我们将研究 Netty 所提供的解码器类,并提供关于何时以及如何使用它们的具 体示例。这些类覆盖了两个不同的用例:

  • 将字节解码为消息

    ByteToMessageDecoder 和 ReplayingDecoder ;

  • 将一种消息类型解码为另一种

    MessageToMessageDecoder

因为解码器是负责将入站数据从一种格式转换到另一种格式的,所以知道 Netty 的解码器实 现了 ChannelInboundHandler 也不会让你感到意外。

什么时候会用到解码器呢?很简单:每当需要为 ChannelPipeline 中的下一个 ChannelInboundHandler 转换入站数据时会用到。此外,得益于 ChannelPipeline 的设计,可以将多个解码器链接在一起,以实现任意复杂的转换逻辑,这也是 Netty 是如何支持代码的模块化以及 复用的一个很好的例子。

抽象类ByteToMessageDecoder

将字节解码为消息(或者另一个字节序列)是一项如此常见的任务,以至于 Netty 为它提供了一个抽象的基类: ByteToMessageDecoder 。由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。表 10-1 解释了它最重要的两个方法。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910142338.png

比如用来产生一个 LastHttpContent 消息。

下面举一个如何使用这个类的示例,假设你接收了一个包含简单 int 的字节流,每个 int 都需要被单独处理。在这种情况下,你需要从入站 ByteBuf 中读取每个 int ,并将它传递给 ChannelPipeline 中的下一个 ChannelInboundHandler 。为了解码这个字节流,你要扩展 ByteToMessageDecoder 类。(需要注意的是,原子类型的 int 在被添加到 List 中时,会被 自动装箱为 Integer 。)

该设计如图 10-1 所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910142445.png

每次从入站 ByteBuf 中读取 4 字节,将其解码为一个 int ,然后将它添加到一个 List 中。 当没有更多的元素可以被添加到该 List 中时, 它的内容将会被发送给下一个 ChannelInboundHandler 。

代码清单 10-1 展示了 ToIntegerDecoder 的代码。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910142621.png

虽然 ByteToMessageDecoder 使得可以很简单地实现这种模式,但是你可能会发现,在调 用 readInt() 方法前不得不验证所输入的 ByteBuf 是否具有足够的数据有点繁琐。在下一节中, 我们将讨论 ReplayingDecoder ,它是一个特殊的解码器,以少量的开销消除了这个步骤。

编解码器中的引用计数

正如我们在第 5 章和第 6 章中所提到的,引用计数需要特别的注意。对于编码器和解码器来说,其过程 也是相当的简单:一旦消息被编码或者解码,它就会被 ReferenceCountUtil.release(message) 调用 自动释放。如果你需要保留引用以便稍后使用,那么你可以调用 ReferenceCountUtil.retain(message) 方法。这将会增加该引用计数,从而防止该消息被释放。

抽象类RelayingDecoder

ReplayingDecoder 扩展了 ByteToMessageDecoder 类(如代码清单 10-1 所示),使 得 我 们 不 必 调 用 readableBytes() 方 法 。 它 通 过 使 用 一 个 自 定 义 的 ByteBuf 实 现 , ReplayingDecoderByteBuf ,包装传入的 ByteBuf 实现了这一点,其将在内部执行该调用 。这个类的完整声明是:

1
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

类型参数 S 指定了用于状态管理的类型,其中 Void 代表不需要状态管理。代码清单 10-2 展示了基于 ReplayingDecoder 重新实现的 ToIntegerDecoder 。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910142842.png

和之前一样,从 ByteBuf 中提取的 int 将会被添加到 List 中。如果没有足够的字节可用,这 个 readInt() 方法的实现将会抛出一个 Error ,其将在基类中被捕获并处理。当有更多的数据可供读取时,该 decode() 方法将会被再次调用。(参见表 10-1 中关于 decode() 方法的描述。)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Override
public ByteBuf readBytes(int length) {
    checkReadableBytes(length);
    return buffer.readBytes(length);
}

private void checkReadableBytes(int readableBytes) {
    if (buffer.readableBytes() < readableBytes) {
        throw REPLAY;
    }
}

private static final Signal REPLAY = ReplayingDecoder.REPLAY;

请注意 ReplayingDecoderByteBuf 的下面这些方面:

  • 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一 个 UnsupportedOperationException ;
  • ReplayingDecoder 稍慢于 ByteToMessageDecoder 。

如果对比代码清单 10-1 和代码清单 10-2,你会发现后者明显更简单。示例本身是很基本的, 所以请记住,在真实的、更加复杂的情况下,使用一种或者另一种作为基类所带来的差异可能是 很显著的。这里有一个简单的准则:如果使用 ByteToMessageDecoder 不会引入太多的复杂 性,那么请使用它;否则,请使用 ReplayingDecoder 。

更多的解码器

下面的这些类处理更加复杂的用例:

  • io.netty.handler.codec.LineBasedFrameDecoder —这个类在 Netty 内部也有使 用,它使用了行尾控制字符( \n 或者 \r\n )来解析消息数据;
  • io.netty.handler.codec.http.HttpObjectDecoder —一个 HTTP 数据的解码器。

在 io.netty.handler.codec 子包下面,你将会发现更多用于特定用例的编码器和解码器实现。 更多有关信息参见 Netty 的 Javadoc。

抽象类MessageToMessageDecoder

在这一节中,我们将解释如何使用下面的抽象基类在两个消息格式之间进行转换(例如, 从一种 POJO 类型转换为另一种):

1
2
public abstract class MessageToMessageDecoder<I>
	extends ChannelInboundHandlerAdapter

类型参数 I 指定了 decode() 方法的输入参数 msg 的类型,它是你必须实现的唯一方法。

表 10-2 展示了这个方法的详细信息。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910143441.png

在这个示例中,我们将编写一个 IntegerToStringDecoder 解码器来扩展 MessageToMessageDecoder<Integer> 。它的 decode() 方法会把 Integer 参数转换为它的 String 表示,并将拥有下列签名:

1
public void decode( ChannelHandlerContext ctx, Integer msg, List<Object> out ) throws Exception

和之前一样,解码的 String 将被添加到传出的 List 中,并转发给下一个 ChannelInboundHandler 。

该设计如图 10-2 所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910143621.png

代码清单 10-3 给出了 IntegerToStringDecoder 的实现。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910143657.png

HttpObjectAggregator

有关更加复杂的例子,请研究 io.netty.handler.codec.http.HttpObjectAggregator 它扩展了 MessageToMessageDecoder<HttpObject>

TooLongFrameException

由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此,不能 让解码器缓冲大量的数据以至于耗尽可用的内存。 为了解除这个常见的顾虑, Netty 提供了 TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出。

为了避免这种情况,你可以设置一个最大字节数的阈值,如果超出该阈值,则会导致抛出一 个 TooLongFrameException (随后会被 ChannelHandler.exceptionCaught() 方法捕 获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如 HTTP)可能允许你 返回一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接。

代码清单 10-4 展示了 ByteToMessageDecoder 是如何使用 TooLongFrameException 来通知 ChannelPipeline 中的其他 ChannelHandler 发生了帧大小溢出的。需要注意的是,

如果你正在使用一个可变帧大小的协议,那么这种保护措施将是尤为重要的。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910144025.png

到目前为止,我们已经探讨了解码器的常规用例,以及 Netty 所提供的用于构建它们的抽象 基类。但是解码器只是硬币的一面。硬币的另一面是编码器,它将消息转换为适合于传出传输的 格式。这些编码器完备了编解码器 API,它们将是我们的下一个主题。

编码器

回顾一下我们先前的定义,编码器实现了 ChannelOutboundHandler ,并将出站数据从 一种格式转换为另一种格式,和我们方才学习的解码器的功能正好相反。Netty 提供了一组类, 用于帮助你编写具有以下功能的编码器:

  • 将消息编码为字节;
  • 将消息编码为消息

我们将首先从抽象基类 MessageToByteEncoder 开始来对这些类进行考察。

抽象类MessageToByteEncoder

前面我们看到了如何使用 ByteToMessageDecoder 来将字节转换为消息。现在我们将使 用 MessageToByteEncoder 来做逆向的事情。表 10-3 展示了该 API。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910144218.png

你可能已经注意到了,这个类只有一个方法,而解码器有两个。原因是解码器通常需要在 Channel 关闭之后产生最后一个消息(因此也就有了 decodeLast() 方法)。这显然不适用于 编码器的场景——在连接被关闭之后仍然产生一个消息是毫无意义的。

图 10-3 展示了 ShortToByteEncoder ,其接受一个 Short 类型的实例作为消息,将它编码 为 Short 的原子类型值,并将它写入 ByteBuf 中,其将随后被转发给 ChannelPipeline 中的 下一个 ChannelOutboundHandler 。每个传出的 Short 值都将会占用 ByteBuf 中的 2 字节。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910144345.png

ShortToByteEncoder 的实现如代码清单 10-5 所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910144403.png

Netty 提供了一些专门化的 MessageToByteEncoder ,你可以基于它们实现自己的编码器。 WebSocket08FrameEncoder 类提供了一个很好的实例。 你可以在 io.netty.handler. codec.http.websocketx 包中找到它。

抽象类MessageToMessageEncoder

你已经看到了如何将入站数据从一种消息格式解码为另一种。为了完善这幅图,我们将展示 对于出站数据将如何从一种消息编码为另一种。 MessageToMessageEncoder 类的 encode() 方法提供了这种能力,如表 10-4 所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910144956.png

为了演示,代码清单 10-6 使用 IntegerToStringEncoder 扩展了 MessageToMessageEncoder 。其设计如图 10-4 所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910145031.png

如代码清单 10-6 所示,编码器将每个出站 Integer 的 String 表示添加到了该 List 中。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910145112.png

关于有趣的 MessageToMessageEncoder 的专业用法,请查看 io.netty.handler. codec.protobuf.ProtobufEncoder 类,它处理了由 Google 的 Protocol Buffers 规范所定义 的数据格式。

抽象的编解码器类

虽然我们一直将解码器和编码器作为单独的实体讨论,但是你有时将会发现在同一个类中管理 入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/编码器对,以处理我们一直在学习的这两种类型的操作。正如同你可能已经猜想到的,这些类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口。

为什么我们并没有一直优先于单独的解码器和编码器使用这些复合类呢?因为通过尽可能 地将这两种功能分开,最大化了代码的可重用性和可扩展性,这是 Netty 设计的一个基本原则。

在我们查看这些抽象的编解码器类时,我们将会把它们与相应的单独的解码器和编码器进行 比较和参照。

抽象类ByteToMessageCodec

让我们来研究这样的一个场景:我们需要将字节解码为某种形式的消息,可能是 POJO,随 后再次对它进行编码。 ByteToMessageCodec 将为我们处理好这一切, 因为它结合了 ByteToMessageDecoder 以及它的逆向—— MessageToByteEncoder 。表 10-5 列出了其中 重要的方法。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910145419.png

任何的请求/响应协议都可以作为使用 ByteToMessageCodec 的理想选择。例如,在某个 SMTP的实现中, 编解码器将读取传入字节, 并将它们解码为一个自定义的消息类型, 如 SmtpRequest 。而在接收端,当一个响应被创建时,将会产生一个 SmtpResponse ,其将被 编码回字节以便进行传输。

抽象类MessageToMessageCodec

在 10.3.1 节中,你看到了一个扩展了 MessageToMessageEncoder 以将一种消息格式转 换为另外一种消息格式的例子。通过使用 MessageToMessageCodec ,我们可以在一个单个的 类中实现该转换的往返过程。 MessageToMessageCodec 是一个参数化的类,定义如下:

1
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>

表 10-6 列出了其中重要的方法。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910145737.png

decode() 方 法 是 将 INBOUND_IN 类 型 的 消 息 转 换 为 OUTBOUND_IN 类 型 的 消 息 , 而 encode() 方法则进行它的逆向操作。将 INBOUND_IN 类型的消息看作是通过网络发送的类型, 而将 OUTBOUND_IN 类型的消息看作是应用程序所处理的类型,将可能有所裨益 。

虽然这个编解码器可能看起来有点高深,但是它所处理的用例却是相当常见的:在两种不同 的消息 API 之间来回转换数据。当我们不得不和使用遗留或者专有消息格式的 API 进行互操作 时,我们经常会遇到这种模式。

WebSocket 协议

下面关于 MessageToMessageCodec 的示例引用了一个新出的 WebSocket 协议,这个协议能实 现 Web 浏览器和服务器之间的全双向通信。我们将在第 12 章中详细地讨论 Netty 对于 WebSocket 的 支持。

代码清单 10-7 展示了这样的对话 可能的实现方式。我们的 WebSocketConvertHandler 在参数化 MessageToMessageCodec 时将使用 INBOUND_IN 类型的 WebSocketFrame , 以及 OUTBOUND_IN 类型的 MyWebSocketFrame ,后者是 WebSocketConvertHandler 本身的一个 静态嵌套类。

对话是指 Web 浏览器和服务器之间的双向通信。

代码清单 10-7 使用 MessageToMessageCodec

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package com.eh.eden.netty.chapter2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.websocketx.*;
import lombok.Getter;

import java.util.List;

/**
 * todo
 *
 * @author David Li
 * @create 2020/09/05
 */
class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {

    // 将 MyWebSocketFrame 编码 为指定的 WebSocketFrame 子类型
    @Override
    protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf payload = msg.getData().duplicate().retain();
        // 实例化一个指定子类型的 WebSocketFrame
        switch (msg.getType()) {
            case BINARY:
                out.add(new BinaryWebSocketFrame(payload));
                break;
            case TEXT:
                out.add(new TextWebSocketFrame(payload));
                break;
            case CLOSE:
                out.add(new CloseWebSocketFrame(true, 0, payload));
                break;
            case CONTINUATION:
                out.add(new ContinuationWebSocketFrame(payload));
                break;
            case PONG:
                out.add(new PongWebSocketFrame(payload));
                break;
            case PING:
                out.add(new PingWebSocketFrame(payload));
                break;
            default:
                throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    // 将 WebSocketFrame 解码为 MyWebSocketFrame, 并设置 FrameType
    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf payload = msg.content().duplicate().retain();
        if (msg instanceof BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(
                    MyWebSocketFrame.FrameType.BINARY, payload));
        } else if (msg instanceof CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame(
                    MyWebSocketFrame.FrameType.CLOSE, payload));
        } else if (msg instanceof PingWebSocketFrame) {
            out.add(new MyWebSocketFrame(
                    MyWebSocketFrame.FrameType.PING, payload));
        } else if (msg instanceof PongWebSocketFrame) {
            out.add(new MyWebSocketFrame(
                    MyWebSocketFrame.FrameType.PONG, payload));
        } else if (msg instanceof TextWebSocketFrame) {
            out.add(new MyWebSocketFrame(
                    MyWebSocketFrame.FrameType.TEXT, payload));
        } else if (msg instanceof ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame(
                    MyWebSocketFrame.FrameType.CONTINUATION, payload));
        } else {
            throw new IllegalStateException(
                    "Unsupported websocket msg " + msg);
        }
    }

    // 声明 WebSocketConvertHandler 所使用的 OUTBOUND_IN 类型
    public static final class MyWebSocketFrame {
        // 定义拥有被包装的有效 负载的 WebSocketFrame 的类型
        public enum FrameType {
            BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION
        }

        @Getter
        private final FrameType type;
        @Getter
        private final ByteBuf data;

        public MyWebSocketFrame(FrameType type, ByteBuf data) {
            this.type = type;
            this.data = data;
        }
    }
}

CombinedChannelDuplexHandler类

正如我们前面所提到的,结合一个解码器和编码器可能会对可重用性造成影响。但是,有一 种方法既能够避免这种惩罚,又不会牺牲将一个解码器和一个编码器作为一个单独的单元部署所 带来的便利性。 CombinedChannelDuplexHandler 提供了这个解决方案,其声明为:

1
2
3
public class CombinedChannelDuplexHandler
	<I extends ChannelInboundHandler,
	O extends ChannelOutboundHandler>

这个类充当了 ChannelInboundHandler 和 ChannelOutboundHandler (该类的类型 参数 I 和 O )的容器。通过提供分别继承了解码器类和编码器类的类型,我们可以实现一个编解 码器,而又不必直接扩展抽象的编解码器类。我们将在下面的示例中说明这一点。

首先,让我们研究代码清单 10-8 中的 ByteToCharDecoder 。注意,该实现扩展了ByteToMessageDecoder ,因为它要从 ByteBuf 中读取字符。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910151020.png

这里的 decode() 方法一次将从 ByteBuf 中提取 2 字节,并将它们作为char写入到 List中,其将会被自动装箱为 Character 对象。

代码清单 10-9 包含了 CharToByteEncoder ,它能将 Character 转换回字节。这个类扩 展了 MessageToByteEncoder ,因为它需要将 char 消息编码到 ByteBuf 中。这是通过直接 写入 ByteBuf 做到的。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910151115.png

既然我们有了解码器和编码器,我们将会结合它们来构建一个编解码器。代码清单 10-10 展 示了这是如何做到的。

https://gitee.com/lienhui68/picStore/raw/master/null/20200910151138.png

正如你所能看到的,在某些情况下,通过这种方式结合实现相对于使用编解码器类的方式来说可能更加的简单也更加的灵活。当然,这可能也归结于个人的偏好问题。

小结

在本章中,我们学习了如何使用 Netty 的编解码器 API 来编写解码器和编码器。你也了解了 为什么使用这个 API 相对于直接使用 ChannelHandler API 更好。

你看到了抽象的编解码器类是如何为在一个实现中处理解码和编码提供支持的。如果你需要 更大的灵活性,或者希望重用现有的实现,那么你还可以选择结合他们,而无需扩展任何抽象的 编解码器类。

在下一章中,我们将讨论作为 Netty 框架本身的一部分的 ChannelHandler 实现和编解码器,你可以利用它们来处理特定的协议和任务。