本章主要内容
- 单元测试
- EmbeddedChannel 概述
- 使用 EmbeddedChannel 测试 ChannelHandler
ChannelHandler是 Netty 应用程序的关键元素,所以彻底地测试它们应该是你的开发过 程的一个标准部分。最佳实践要求你的测试不仅要能够证明你的实现是正确的,而且还要能够很 容易地隔离那些因修改代码而突然出现的问题。这种类型的测试叫作单元测试。
虽然单元测试没有统一的定义,但是大多数的从业者都有基本的共识。其基本思想是,以尽可能小的区块测试你的代码,并且尽可能地和其他的代码模块以及运行时的依赖(如数据库和网 络)相隔离。如果你的应用程序能通过测试验证每个单元本身都能够正常地工作,那么在出了问题时将可以更加容易地找出根本原因。
在本章中,我们将学习一种特殊的 Channel 实现—— EmbeddedChannel,它是 Netty 专门为改进针对 ChannelHandler 的单元测试而提供的。
因为正在被测试的代码模块或者单元将在它正常的运行时环境之外被执行,所以你需要一个 框架或者脚手架以便在其中运行它。在我们的例子中,我们将使用 JUnit 4 作为我们的测试框架, 所以你需要对它的用法有一个基本的了解。如果它对你来说比较陌生,不要害怕;虽然它功能强 大,但却很简单,你可以在 JUnit 的官方网站(www.junit.org)上找到你所需要的所有信息。
你可能会发现回顾前面关于ChannelHandler的章节很有用,因为这将为我们的示例提供素材。
EmbeddedChannel概述
你已经知道,可以将 ChannelPipeline 中的 ChannelHandler 实现链接在一起,以构 建你的应用程序的业务逻辑。我们已经在前面解释过,这种设计支持将任何潜在的复杂处理过程分解为小的可重用的组件,每个组件都将处理一个明确定义的任务或者步骤。在本章中,我们还 将展示它是如何简化测试的。
Netty 提供了它所谓的 Embedded 传输,用于测试 ChannelHandler 。这个传输是一种特殊的 Channel 实现— EmbeddedChannel —的功能,这个实现提供了通过 ChannelPipeline 传播事件的简便方法。
EmbeddedChannel 构造方法:
1
2
3
|
public EmbeddedChannel(ChannelHandler... handlers) {
this(EmbeddedChannelId.INSTANCE, handlers);
}
|
这个想法是直截了当的:将入站数据或者出站数据写入到 EmbeddedChannel 中,然后检 查是否有任何东西到达了 ChannelPipeline 的尾端。以这种方式,你便可以确定消息是否已 经被编码或者被解码过了,以及是否触发了任何的 ChannelHandler 动作。
表 9-1 中列出了EmbeddedChannel的相关方法。

入站数据由 ChannelInboundHandler 处理,代表从远程节点读取的数据。出站数据由 ChannelOutboundHandler 处理,代表将要写到远程节点的数据。根据你要测试的 Channel- Handler ,你将使用 *Inbound() 或者 *Outbound() 方法对,或者兼而有之。
图 9-1 展示了使用 EmbeddedChannel 的方法,数据是如何流经 ChannelPipeline 的。 你可以使用 writeOutbound() 方法将消息写到 Channel 中,并通过 ChannelPipeline 沿 着出站的方向传递。随后,你可以使用 readOutbound() 方法来读取已被处理过的消息,以确 定结果是否和预期一样。类似地,对于入站数据,你需要使用 writeInbound() 和 readInbound() 方法。

在每种情况下,消息都将会传递过 ChannelPipeline ,并且被相关的 ChannelInbound- Handler 或者 ChannelOutboundHandler 处理。 如果消息没有被消费, 那么你可以使用 readInbound() 或者 readOutbound() 方法来在处理过了这些消息之后,酌情把它们从 Channel 中读出来。
接下来让我们仔细看看这两种场景,以及它们是如何应用于测试你的应用程序逻辑的吧。
使用EmbeddedChannel测试ChannelHandler
在这一节中,我们将讲解如何使用 EmbeddedChannel 来测试 ChannelHandler 。
JUnit 断言
org.junit.Assert 类提供了很多用于测试的静态方法。失败的断言将导致一个异常被抛出,并 将终止当前正在执行中的测试。导入这些断言的最高效的方式是通过一个 import static 语句来实现:
import static org.junit.Assert.*;
一旦这样做了,就可以直接调用 Assert 方法了:
assertEquals(buf.readSlice(3), read);
测试入站消息
图 9-2 展示了一个简单的 ByteToMessageDecoder 实现。给定足够的数据,这个实现将 产生固定大小的帧。如果没有足够的数据可供读取,它将等待下一个数据块的到来,并将再次检 查是否能够产生一个新的帧。

正如可以从图 9-2 右侧的帧看到的那样,这个特定的解码器将产生固定为 3 字节大小的帧。 因此,它可能会需要多个事件来提供足够的字节数以产生一个帧。
最终,每个帧都会被传递给 ChannelPipeline 中的下一个 ChannelHandler 。
该解码器的实现,如代码清单 9-1 所示。
代码清单 9-1 FixedLengthFrameDecoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package com.eh.eden.netty.chapter2.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 扩展 ByteToMessageDecoder 以处理入 站字节,并将它们解码为消息
*
* @author David Li
* @create 2020/09/10
*/
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
// 要生成的帧的长度
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 检查是否有足够的字 节可以被读取,以生 成下一个帧
while (in.readableBytes() >= frameLength) {
// 从 ByteBuf 中 读取一个新帧
ByteBuf buf = in.readBytes(frameLength);
// 将该帧添加到已被 解码的消息列表中
out.add(buf);
}
}
}
|
现在,让我们创建一个单元测试,以确保这段代码将按照预期执行。正如我们前面所指出的, 即使是在简单的代码中,单元测试也能帮助我们防止在将来代码重构时可能会导致的问题,并且 能在问题发生时帮助我们诊断它们。
代码清单 9-2 展示了一个使用EmbeddedChannel的对于前面代码的测试。
代码清单 9-2 测试 FixedLengthFrameDecoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
package com.eh.eden.netty.chapter2.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
* todo
*
* @author David Li
* @create 2020/09/10
*/
public class FixedLengthFrameDecoderTest {
@Test
public void testFramesDecoded() {
// 创建一个 ByteBuf, 并存储 9 字节
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
// 创建一个 EmbeddedChannel, 并添 加一个 FixedLengthFrameDecoder, 其将以 3 字节的帧长度被测试
EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
// write bytes
// 将数据写入 EmbeddedChannel
assertTrue(channel.writeInbound(input.retain()));
// 标记 Channel 为已完成状态
assertTrue(channel.finish());
// read messages
// 读取所生成的消息,并 且验证是否有 3 帧(切 片),其中每帧(切片) 都为 3 字节
ByteBuf read = channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
assertNull(channel.readInbound());
buf.release();
}
/**
* 第二个测试方法
*/
@Test
public void testFramesDecoded2() {
// 创建一个 ByteBuf, 并存储 9 字节
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
// 创建一个 EmbeddedChannel, 并添 加一个 FixedLengthFrameDecoder, 其将以 3 字节的帧长度被测试
EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
// 返回 false, 因为 没有一个完整的 可供读取的帧
assertFalse(channel.writeInbound(input.readBytes(2)));
assertTrue(channel.writeInbound(input.readBytes(7)));
// 标记 Channel 为已完成状态
assertTrue(channel.finish());
// read messages
// 读取所生成的消息,并 且验证是否有 3 帧(切 片),其中每帧(切片) 都为 3 字节
ByteBuf read = channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
assertNull(channel.readInbound());
buf.release();
}
}
|
该 testFramesDecoded() 方法验证了:一个包含 9 个可读字节的 ByteBuf 被解码为 3 个 ByteBuf ,每个都包含了 3 字节。需要注意的是,仅通过一次对 writeInbound() 方法的调 用, ByteBuf 是如何被填充了 9 个可读字节的。 在此之后, 通过执行 finish() 方法, 将 EmbeddedChannel 标记为了已完成状态。最后,通过调用 readInbound() 方法,从 Embedded- Channel 中正好读取了 3 个帧和一个 null 。
testFramesDecoded2() 方法也是类似的,只有一处不同:入站 ByteBuf 是通过两个步 骤写入的。当 writeInbound(input.readBytes(2)) 被调用时,返回了 false 。为什么呢? 正如同表 9-1 中所描述的,如果对 readInbound() 的后续调用将会返回数据,那么 write- Inbound() 方法将会返回 true 。但是只有当有 3 个或者更多的字节可供读取时, FixedLengthFrameDecoder 才会产生输出。该测试剩下的部分和 testFramesDecoded() 是相同的。
测试出站消息
测试出站消息的处理过程和刚才所看到的类似。在下面的例子中,我们将会展示如何使用 EmbeddedChannel 来测试一个编码器形式的 ChannelOutboundHandler ,编码器是一种 将一种消息格式转换为另一种的组件。你将在下一章中非常详细地学习编码器和解码器,所以 现在我们只需要简单地提及我们正在测试的处理器— AbsIntegerEncoder ,它是 Netty 的 MessageToMessageEncoder 的一个特殊化的实现,用于将负值整数转换为绝对值。
该示例将会按照下列方式工作:
- 持有 AbsIntegerEncoder 的 EmbeddedChannel 将会以 4 字节的负整数的形式写出站数据;
- 编码器将从传入的 ByteBuf 中读取每个负整数,并将会调用 Math.abs() 方法来获取 其绝对值;
- 编码器将会把每个负整数的绝对值写到 ChannelPipeline 中。
图 9-3 展示了该逻辑。

代码清单 9-3 实现了这个逻辑,如图 9-3 所示。encode() 方法将把产生的值写到一个 List 中。
代码清单 9-3 AbsIntegerEncoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
package com.eh.eden.netty.chapter2.encoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
/**
* 扩展 MessageToMessageEncoder 以 将一个消息编码为另外一种格式
*
* @author David Li
* @create 2020/09/10
*/
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
// 检查是否有足够 的字节用来编码
while (msg.readableBytes() >= 4) {
// 从输入的 ByteBuf 中读取下一个整数, 并且计算其绝对值
int value = Math.abs(msg.readInt());
// 将该整数写入到编码 消息的 List 中
out.add(value);
}
}
}
|
代码清单 9-4 使用了EmbeddedChannel来测试代码。
代码清单 9-4 测试 AbsIntegerEncoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package com.eh.eden.netty.chapter2.encoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
* todo
*
* @author David Li
* @create 2020/09/10
*/
public class AbsIntegerEncoderTest {
@Test
public void testEncoded() {
// 创建一个 ByteBuf, 并且 写入 9 个负整数
ByteBuf buf = Unpooled.buffer();
for (int i = 1; i < 10; i++) {
buf.writeInt(i * -1);
}
// 创建一个EmbeddedChannel, 并安装一个要测试的 AbsIntegerEncoder
EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
// 写入 ByteBuf, 并断言调 用 readOutbound()方法将 会产生数据
assertTrue(channel.writeOutbound(buf));
// 将该 Channel 标记为已 完成状态
assertTrue(channel.finish());
// read bytes
// 读取所产生的消息, 并断言它们包含了对 应的绝对值
for (int i = 1; i < 10; i++) {
assertEquals((Integer) i, channel.readOutbound());
}
assertNull(channel.readOutbound());
}
}
|
下面是代码中执行的步骤。
- 将 4 字节的负整数写到一个新的ByteBuf中。
- 创建一个EmbeddedChannel,并为它分配一个AbsIntegerEncoder。
- 调用 EmbeddedChannel 上的 writeOutbound() 方法来写入该ByteBuf。
- 标记该 Channel 为已完成状态。
- 从 EmbeddedChannel 的出站端读取所有的整数,并验证是否只产生了绝对值。
测试异常处理
应用程序通常需要执行比转换数据更加复杂的任务。例如,你可能需要处理格式不正确的输 入或者过量的数据。在下一个示例中,如果所读取的字节数超出了某个特定的限制,我们将会抛 出一个 TooLongFrameException 。这是一种经常用来防范资源被耗尽的方法。
在图 9-4 中,最大的帧大小已经被设置为 3 字节。如果一个帧的大小超出了该限制,那么程序将 会丢弃它的字节,并抛出一个 TooLongFrameException 。位于 ChannelPipeline 中的其他 ChannelHandler 可以选择在 exceptionCaught() 方法中处理该异常或者忽略它。

其实现如代码清单 9-5 所示。
代码清单 9-5 FrameChunkDecoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.eh.eden.netty.chapter2.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import java.util.List;
/**
* 扩展 ByteToMessageDecoder 以将入站字 节解码为消息
*
* @author David Li
* @create 2020/09/10
*/
public class FrameChunkDecoder extends ByteToMessageDecoder {
private final int maxFrameSize;
// 指定将要产生的 帧的最大允许大小
public FrameChunkDecoder(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
// 如果该帧太大,则丢弃它并抛出一 个 TooLongFrameException……
if (readableBytes > maxFrameSize) {
// discard the bytes
in.clear();
throw new TooLongFrameException();
}
// ……否则,从 ByteBuf 中读取一个新的帧
ByteBuf buf = in.readBytes(readableBytes);
// 将该帧添加到解码消息的 List 中
out.add(buf);
}
}
|
我们再使用 EmbeddedChannel 来测试一次这段代码,如代码清单 9-6 所示。
代码清单 9-6 测试 FrameChunkDecoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
package com.eh.eden.netty.chapter2.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.TooLongFrameException;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
* todo
*
* @author David Li
* @create 2020/09/10
*/
public class FrameChunkDecoderTest {
@Test
public void testFramesDecoded() {
// 创建一个 ByteBuf, 并存储 9 字节
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
// 创建一个 EmbeddedChannel, 并添加一个 FrameChunkDecoder
EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
// 向它写入 2 字节,并断言它 们将会产生一个新帧
assertTrue(channel.writeInbound(input.readBytes(2)));
try {
// 写入一个 4 字节大小 的帧,并捕获预期的 TooLongFrameException
channel.writeInbound(input.readBytes(4));
// 如果上面没有 抛出异常,那 么就会到达这 个断言,并且 测试失败
fail();
} catch (TooLongFrameException e) {
// expected exception
}
// 写入剩余 的 3 字节, 并断言将 会产生一 个有效帧
assertTrue(channel.writeInbound(input.readBytes(3)));
// 将该 Channel 标记 为已完成状态
assertTrue(channel.finish());
// 读取产生的消息,并且验证值
// Read frames
ByteBuf read = channel.readInbound();
assertEquals(buf.readSlice(2), read);
read.release();
read = channel.readInbound();
assertEquals(buf.skipBytes(4).readSlice(3), read);
read.release();
buf.release();
}
}
|
乍一看,这看起来非常类似于代码清单 9-2 中的测试,但是它有一个有趣的转折点,即对 TooLongFrameException 的处理。这里使用的 try/catch 块是 EmbeddedChannel 的一个特 殊功能。如果其中一个 write*
方法产生了一个受检查的 Exception ,那么它将会被包装在一个 RuntimeException 中并抛出 。这使得可以容易地测试出一个 Exception 是否在处理数据的 过程中已经被处理了。
需要注意的是,如果该类实现了 exceptionCaught()方法并处理了该异常,那么它将不会被 catch 块所捕获。
这里介绍的测试方法可以用于任何能抛出 Exception 的 ChannelHandler 实现。
小结
使用 JUnit 这样的测试工具来进行单元测试是一种非常行之有效的方式,它能保证你的代码 的正确性并提高它的可维护性。在本章中,你学习了如何使用 Netty 提供的测试工具来测试你自 定义的 ChannelHandler 。
在接下来的章节中,我们将专注于使用 Netty 编写真实世界的应用程序。我们不会再提供任何 进一步的测试代码示例了,所以我们希望你将这里所展示的测试方法的重要性牢记于心。