目录

ByteBuf

本章主要内容

  • ByteBuf ——Netty 的数据容器
  • API 的详细信息
  • 用例
  • 内存分配

正如前面所提到的,网络数据的基本单位总是字节。Java NIO 提供了 的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。

Netty的ByteBuffer 替代品是 ByteBuf , 一个强大的实现,既解决了 JDK API 的局限性, 又为网络应用程序的开发者提供了更好的 API。

在本章中我们将会说明和 JDK 的 ByteBuffer 相比, ByteBuf 的卓越功能性和灵活性。这 也将有助于更好地理解 Netty 数据处理的一般方式,并为将在第 6 章中针对 ChannelPipeline 和 ChannelHandler 的讨论做好准备。

ByteBuf的API

Netty 的数据处理 API 通过两个组件暴露—— abstract class ByteBuf 和 interface ByteBufHolder 。

下面是一些 ByteBuf API 的优点:

  • 它可以被用户自定义的缓冲区类型扩展;
  • 通过内置的复合缓冲区类型实现了透明的零拷贝;
  • 容量可以按需增长(类似于 JDK 的StringBuilder);
  • 在读和写这两种模式之间切换不需要调用ByteBuffer的flip() 方法;
  • 读和写使用了不同的索引;
  • 支持方法的链式调用;
  • 支持引用计数;
  • 支持池化。

其他类可用于管理 ByteBuf 实例的分配,以及执行各种针对于数据容器本身和它所持有的 数据的操作。我们将在仔细研究 ByteBuf 和 ByteBufHolder 时探讨这些特性。

ByteBuf类——Netty的数据容器

因为所有的网络通信都涉及字节序列的移动,所以高效易用的数据结构明显是必不可少的。 Netty 的 ByteBuf 实现满足并超越了这些需求。让我们首先来看看它是如何通过使用不同的索引 来简化对它所包含的数据的访问的吧。

它是如何工作的

1
2
ByteBuf byteBuf = Unpooled.copiedBuffer("你好".getBytes(Charsets.UTF_8));
System.out.println(byteBuf.toString(Charsets.UTF_8));

ByteBuf 维护了两个不同的索引:一个用于读取,一个用于写入。当你从 ByteBuf 读取时, 它的 readerIndex 将会被递增已经被读取的字节数。同样地,当你写入 ByteBuf 时,它的 writerIndex 也会被递增。图 5-1 展示了一个空 ByteBuf 的布局结构和状态。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905181544.png

要了解这些索引两两之间的关系,请考虑一下,如果打算读取字节直到 readerIndex 达到 和 writerIndex 同样的值时会发生什么。在那时,你将会到达“可以读取的”数据的末尾。就 如同试图读取超出数组末尾的数据一样,试图读取超出该点的数据将会触发一个 IndexOutOfBoundsException 。

名称以 read 或者 write 开头的 ByteBuf 方法,将会推进其对应的索引,而名称以 set 或 者 get 开头的操作则不会。后面的这些方法将在作为一个参数传入的一个相对索引上执行操作

可以指定 ByteBuf 的最大容量。试图移动写索引(即 writerIndex )超过这个值将会触 发一个异常 。(默认的限制是 Integer.MAX_VALUE 。)

也就是说用户直接或者间接使 capacity(int)或者 ensureWritable(int)方法来增加超过该最大 容量时抛出异常。—译者注

ByteBuf的使用模式

在使用 Netty 时,你将遇到几种常见的围绕 ByteBuf 而构建的使用模式。在研究它们时,我们心里想着图 5-1 会有所裨益 —— 一个由不同的索引分别控制读访问和写访问的字节数组。

堆缓冲区

最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中。 这种模式被称为支撑数组 (backing array),它能在没有使用池化的情况下提供快速的分配和释放。这种方式,如代码清单 5-1 所示,非常适合于有遗留的数据需要处理的情况。

代码清单 5-1 支撑数组

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class Demo {

    public static void main(String[] args) {
        ByteBuf heapBuf = Unpooled.copiedBuffer("Hello world".getBytes(Charsets.UTF_8));
        // 如果使用下面这种方式则会有问题,支持数组的长度计算:
        // int length = (int) ((double) src.remaining() * encoder.maxBytesPerChar()) + extraCapacity;
        // utf8 每个字符占3个字节,所以会在原始字符个数上乘以3
        // ByteBuf heapBuf = Unpooled.copiedBuffer("Hello world", Charsets.UTF_8);
        if (heapBuf.hasArray()) { // 检查 ByteBuf 是否 有一个支撑数组
            byte[] array = heapBuf.array(); // 如果有,则获取 对该数组的引用
            // 计算第一个字节 的偏移量。
            int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
            // 获得可读 字节数
            int length = heapBuf.readableBytes();
            // 使用数组、偏移量和长度 作为参数调用你的方法
            handleArray(array, offset, length);
        }
    }

    private static void handleArray(byte[] array, int offset, int length) {
        System.out.println("array: ");
        for (byte b : array) {
            System.out.print((char) b);
        }
        System.out.println();

        System.out.println("内置数组长度: " + array.length);
        System.out.println("offset: " + offset);
        System.out.println("length: " + length);
    }
}
// 运行结果
array: 
Hello world
内置数组长度: 11
offset: 0
length: 11

注意: 当 hasArray() 方法返回 false 时, 尝试访问支撑数组将触发一个Unsupported OperationException 。这个模式类似于 JDK 的 ByteBuffer 的用法。

直接缓冲区

直接缓冲区是另外一种 ByteBuf 模式。我们期望用于对象创建的内存分配永远都来自于堆 中,但这并不是必须的——NIO 在 JDK 1.4 中引入的 ByteBuffer 类允许 JVM 实现通过本地调 用来分配内存。这主要是为了避免在每次调用本地 I/O 操作之前(或者之后)将缓冲区的内容复 制到一个中间缓冲区(或者从中间缓冲区把内容复制到缓冲区)。

中间缓冲区指的就是内核缓冲区

ByteBuffer的Javadoc 明确指出:“直接缓冲区的内容将驻留在常规的会被垃圾回收的堆 之外。”这也就解释了为何直接缓冲区对于网络数据传输是理想的选择。如果你的数据包含在一 个在堆上分配的缓冲区中,那么事实上,在通过套接字发送它之前,JVM将会在内部把你的缓冲 区复制到一个直接缓冲区中。

直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。如果你 正在处理遗留代码,你也可能会遇到另外一个缺点:因为数据不是在堆上,所以你不得不进行一 次复制,如代码清单 5-2 所示。

代码清单 5-2 访问直接缓冲区的数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Demo {

    public static void main(String[] args) {
        // 默认初始大小 256个字节, 最大大小 Integer.MAX_VALUE
        ByteBuf directBuf = Unpooled.directBuffer();
        directBuf.writeBytes("Hello world".getBytes(Charsets.UTF_8));
        // 检查 ByteBuf 是否由数 组支撑。如果不是,则 这是一个直接缓冲区
        if (!directBuf.hasArray()) {
            int length = directBuf.readableBytes(); // 获取可读 字节数
            // 分配一个新的数组来保存 具有该长度的字节数据
            byte[] array = new byte[length];
            // 将字节复制到该数组
            directBuf.getBytes(directBuf.readerIndex(), array);
            // 使用数组、偏移量和长度 作为参数调用你的方法
            handleArray(array, 0, length);
        }
    }

    private static void handleArray(byte[] array, int offset, int length) {
        System.out.println("array: ");
        for (byte b : array) {
            System.out.print((char) b);
        }
        System.out.println();

        System.out.println("内置数组长度: " + array.length);
        System.out.println("offset: " + offset);
        System.out.println("length: " + length);
    }
}
// 运行结果
array: 
Hello world
内置数组长度: 11
offset: 0
length: 11

显然,与使用支撑数组相比,这涉及的工作更多(如果需要作为数组访问的话)。因此,如果事先知道容器中的数据将会被 作为数组来访问,你可能更愿意使用堆内存。

复合缓冲区

第三种也是最后一种模式使用的是复合缓冲区,它为多个 ByteBuf 提供一个聚合视图。在 这里你可以根据需要添加或者删除 ByteBuf 实例,这是一个 JDK 的 ByteBuffer 实现完全缺 失的特性。

Netty 通过一个 ByteBuf子类—— CompositeByteBuf ——实现了这个模式, 它提供了一 个将多个缓冲表示为单个合并缓冲区的虚拟表示。

警告

CompositeByteBuf 中的 ByteBuf 实例可能同时包含直接内存分配和非直接内存分配。 如果其中只有一个实例,那么对 CompositeByteBuf 上的 hasArray() 方法的调用将返回该组 件上的 hasArray() 方法的值;否则它将返回 false 。

为了举例说明,让我们考虑一下一个由两部分——头部和主体——组成的将通过 HTTP 协议 传输的消息。这两部分由应用程序的不同模块产生,将会在消息被发送的时候组装。该应用程序 可以选择为多个消息重用相同的消息主体。当这种情况发生时,对于每个消息都将会创建一个新 的头部。

因为我们不想为每个消息都重新分配这两个缓冲区,所以使用 CompositeByteBuf 是一个 完美的选择。它在消除了没必要的复制的同时,暴露了通用的 ByteBuf API。图 5-2 展示了生成 的消息布局。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905210027.png

代码清单 5-3 展示了如何通过使用 JDK 的 ByteBuffer 来实现这一需求。创建了一个包含 两个 ByteBuffer 的数组用来保存这些消息组件,同时创建了第三个 ByteBuffer 用来保存所 有这些数据的副本。

代码清单 5-3 使用 ByteBuffer 的复合缓冲区模式

1
2
3
4
5
6
7
8
9
ByteBuffer header = ...;
ByteBuffer body = ...;
// Use an array to hold the message parts
ByteBuffer[] message = new ByteBuffer[]{header, body};
// Create a new ByteBuffer and use copy to merge the header and body
ByteBuffer message2 = ByteBuffer.allocate(header.remaining() + body.remaining());
message2.put(header);
message2.put(body);
message2.flip();

分配和复制操作,以及伴随着对数组管理的需要,使得这个版本的实现效率低下而且笨拙。 代码清单 5-4 展示了一个使用了 CompositeByteBuf 的版本。

代码清单 5-4 使用 CompositeByteBuf 的复合缓冲区模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
ByteBuf header = ...; // can be backing(支撑) or direct
ByteBuf body = ...; // can be backing or direct
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
// 将 ByteBuf 实例追加 到 CompositeByteBuf
messageBuf.addComponent(header);
messageBuf.addComponent(body);
...
// 删除位于索引位置为 0 (第一个组件)的 ByteBuf
messageBuf.removeComponent(0);
// 循环遍历所有 的 ByteBuf 实例
for (ByteBuf byteBuf : messageBuf) {
    System.out.println(byteBuf.toString());
}

CompositeByteBuf 可能不支持访问其支撑数组,因此访问 CompositeByteBuf 中的数据类似于(访问)直接缓冲区的模式,如代码清单 5-5 所示。

代码清单 5-5 访问 CompositeByteBuf 中的数据

1
2
3
4
5
6
7
8
9
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
// 获得可读 字节数
int length = compBuf.readableBytes();
// 分配一个具有可读字节 数长度的新数组
byte[] array = new byte[length];
// 将字节读到 该数组中
compBuf.getBytes(compBuf.readerIndex(), array);
// 使用偏移量和长度作 为参数使用该数组
handleArray(array, 0, array.length);

需要注意的是,Netty使用了 CompositeByteBuf 来优化套接字的I/O操作,尽可能地消除了 由JDK的缓冲区实现所导致的性能以及内存使用率的惩罚。 这种优化发生在Netty的核心代码中, 因此不会被暴露出来,但是你应该知道它所带来的影响。

这尤其适用于 JDK 所使用的一种称为分散/收集 I/O(Scatter/Gather I/O)的工作,定义为“一种输入和 输出的方法,其中,单个系统调用从单个数据流写到一组缓冲区中,或者,从单个数据源读到一组缓冲 区中”

CompositeByteBuf API 除了从 ByteBuf 继承的方法, CompositeByteBuf 提供了大量的附 加功能。请参考 Netty 的 Javadoc 以获得该 API 的完整列表。

字节级操作

ByteBuf 提供了许多超出基本读、写操作的方法用于修改它的数据。在接下来的章节中, 我们将会讨论这些中最重要的部分。

随机访问索引

如同在普通的 Java 字节数组中一样, ByteBuf 的索引是从零开始的:第一个字节的索引是 0,最后一个字节的索引总是 capacity() - 1 。代码清单 5-6 表明,对存储机制的封装使得遍 历 ByteBuf 的内容非常简单。

代码清单 5-6 访问数据

1
2
3
4
5
ByteBuf buf = Unpooled.copiedBuffer("Hello world".getBytes(Charsets.UTF_8));
for (int i = 0; i < buf.capacity(); i++) {
    byte b = buf.getByte(i);
    System.out.println((char) b);
}

需要注意的是,使用那些需要一个索引值参数的方法(的其中)之一来访问数据既不会改变 readerIndex 也不会改变 writerIndex 。如果有需要,也可以通过调用 readerIndex(index) 或者 writerIndex(index) 来手动移动这两者。

顺序访问索引

虽然 ByteBuf 同时具有读索引和写索引,但是 JDK 的 ByteBuffer 却只有一个索引,这 也就是为什么必须调用 flip() 方法来在读模式和写模式之间进行切换的原因。图 5-3 展示了 ByteBuf 是如何被它的两个索引划分成 3 个区域的。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905211824.png

可丢弃字节

在图 5-3 中标记为可丢弃字节的分段包含了已经被读过的字节。通过调用 discardRead- Bytes() 方法,可以丢弃它们并回收空间。这个分段的初始大小为 0,存储在 readerIndex 中, 会随着 read 操作的执行而增加( get* 操作不会移动 readerIndex )。

图 5-4 展示了图 5-3 中所展示的缓冲区上调用 discardReadBytes() 方法后的结果。可以看 到,可丢弃字节分段中的空间已经变为可写的了。注意,在调用 discardReadBytes() 之后,对 可写分段的内容并没有任何的保证 。

因为只是移动了可以读取的字节以及 writerIndex,而没有对所有可写入的字节进行擦除写。

https://gitee.com/lienhui68/picStore/raw/master/null/20200905211938.png

虽然你可能会倾向于频繁地调用 discardReadBytes() 方法以确保可写分段的最大化,但是 请注意,这将极有可能会导致内存复制,因为可读字节(图中标记为 CONTENT 的部分)必须被移 动到缓冲区的开始位置。我们建议只在有真正需要的时候才这样做,例如,当内存非常宝贵的时候。

可读字节

ByteBuf 的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的 readerIndex 值为 0。任何名称以 read 或者 skip 开头的操作都将检索或者跳过位于当前 readerIndex 的数据,并且将它增加已读字节数。

如果被调用的方法需要一个 ByteBuf 参数作为写入的目标,并且没有指定目标索引参数, 那么该目标缓冲区的 writerIndex 也将被增加,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Unpooled.copiedBuffer,初始大小和最大大小都为字符序列字节大小
ByteBuf src = Unpooled.copiedBuffer("Hello".getBytes(Charsets.UTF_8));
// 必须要初始化initialCapacity,否则默认的initialCapacity是256,
// src只有5个可读字节,dest从src读5个字节后接着读,就会报IndexOutOfBoundsException
ByteBuf dest = Unpooled.buffer(5);
System.out.println(src.toString() + dest.toString());
src.readBytes(dest);
System.out.println(src.toString() + dest.toString());

// 运行结果
UnpooledHeapByteBuf(ridx: 0, widx: 5, cap: 5/5)UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 5)
UnpooledHeapByteBuf(ridx: 5, widx: 5, cap: 5/5)UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 5)

如果尝试在缓冲区的可读字节数已经耗尽时从中读取数据,那么将会引发一个 IndexOutOfBoundsException 。

代码清单 5-7 展示了如何读取所有可以读的字节。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Unpooled.copiedBuffer,初始大小和最大大小都为字符序列字节大小
ByteBuf src = Unpooled.copiedBuffer("Hello".getBytes(Charsets.UTF_8));
// 这次没有初始化initialCapacity,默认的initialCapacity是256,挨个字节读取就没有问题
ByteBuf dest = Unpooled.buffer();
while (src.isReadable()) {
    /**
     * writeByte
     * Sets the specified byte at the current {@code writerIndex}
     * and increases the {@code writerIndex} by {@code 1} in this buffer.
     * The 24 high-order bits of the specified value are ignored.
     *
     * @throws IndexOutOfBoundsException
     *         if {@code this.writableBytes} is less than {@code 1}
     */
    dest.writeByte(src.readByte());
}
System.out.println(dest.toString());

可写字节

可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。 新分配的缓冲区的 writerIndex 的默认值为 0。任何名称以 write 开头的操作都将从当前的 writerIndex 处 开始写数据,并将它增加已经写入的字节数。如果写操作的目标也是 ByteBuf ,并且没有指定 源索引的值,则源缓冲区的 readerIndex 也同样会被增加相同的大小。这个调用如下所示:

1
2
3
4
5
@Override
public ByteBuf writeBytes(byte[] src) {
    writeBytes(src, 0, src.length);
    return this;
}

如果尝试往目标写入超过目标容量的数据,将会引发一个 IndexOutOfBoundException 。 代码清单5-8 是一个用随机整数值填充缓冲区,直到它空间不足为止的例子。 writeableBytes() 方法在这里被用来确定该缓冲区中是否还有足够的空间。

1
2
3
4
5
6
7
8
// Fills the writable bytes of a buffer with random integers.
Random random = new Random();
ByteBuf dest = Unpooled.buffer(5, 5);
while (true) {
    dest.writeInt(random.nextInt());
}
// 运行结果
IndexOutOfBoundsException: writerIndex(4) + minWritableBytes(4) exceeds maxCapacity(5)

在往 ByteBuf 中写入数据时,其将首先确保目标 ByteBuf 具有足够的可写入空间来容纳当前要写入 的数据,如果没有,则将检查当前的写索引以及最大容量是否可以在扩展后容纳该数据,可以则会分配 并调整容量,否则就会抛出该异常。

索引管理

JDK 的 InputStream 定义了 mark(int readlimit) 和 reset() 方法,这些方法分别 被用来将流中的当前位置标记为指定的值,以及将流重置到该位置。

同样,可以通过调用 markReaderIndex() 和 markWriterIndex() 、 resetWriterIndex() 、 resetReaderIndex() 来标记和重置 ByteBuf 的 readerIndex 和 writerIndex 。这些和 InputStream 上的调用类似,只是没有 readlimit 参数来指定标记什么时候失效。

也可以通过调用 readerIndex(int) 或者 writerIndex(int) 来将索引移动到指定位置。试 图将任何一个索引设置到一个无效的位置都将导致一个 IndexOutOfBoundsException 。

可以通过调用 clear() 方法来将 readerIndex 和 writerIndex 都设置为 0。注意,这 并不会清除内存中的内容。图 5-5(重复上面的图 5-3)展示了它是如何工作的。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906053416.png

和之前一样, ByteBuf 包含 3 个分段。图 5-6 展示了在 clear() 方法被调用之后 ByteBuf 的状态。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906053446.png

调用 clear() 比调用 discardReadBytes() 轻量得多,因为它将只是重置索引而不会复 制任何的内存。

查找操作

在 ByteBuf 中有多种可以用来确定指定值的索引的方法。最简单的是使用 indexOf() 方法。 较复杂的查找可以通过那些需要一个 ByteProcessor 作为参数的方法达成。

在 Netty 4.1.x 之前可以使用ByteBufProcessor,不过在4.1之后该类已经废弃,请使用 io.netty.util.ByteProcessor。

这个接口只定 义了一个方法:

1
boolean process(byte value) throws Exception;

它将检查输入值是否是正在查找的值。

ByteProcessor 针对一些常见的值定义了许多便利的方法。假设你的应用程序需要和 所谓的包含有以NULL结尾的内容的Flash套接字 集成。调用

1
2
ByteBuf buf = ...;
buf.forEachByte(ByteProcessor.FIND_NUL);

将简单高效地消费该 Flash 数据,因为在处理期间只会执行较少的边界检查。 代码清单 5-9 展示了一个查找回车符( \r )的例子。

代码清单 5-9 使用 ByteBufProcessor 来寻找 \r

1
2
3
4
5
ByteBuf buffer = Unpooled.copiedBuffer("Hello\r".getBytes());
int index = buffer.forEachByte(ByteProcessor.FIND_CR);
System.out.println(index);
// 运行结果
5

派生缓冲区

派生缓冲区为 ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方 法被创建的:

  • duplicate()
  • slice()
  • slice(int, int) ;
  • Unpooled.unmodifiableBuffer(…);
  • order(ByteOrder);
  • readSlice(int)

每个这些方法都将返回一个新的 ByteBuf 实例, 它具有自己的读索引、写索引和标记 索引。 其内部存储和 JDK 的 ByteBuffer 一样也是共享的。 这使得派生缓冲区的创建成本 是很低廉的, 但是这也意味着, 如果你修改了它的内容, 也同时修改了其对应的源实例, 所 以要小心。

ByteBuf 复制 :如果需要一个现有缓冲区的真实副本,请使用 copy() 或者 copy(int, int) 法。不同于派生缓冲区,由这个调用所返回的 ByteBuf 拥有独立的数据副本

代码清单 5-10 展示了如何使用 slice(int, int) 方法来操作 ByteBuf 的一个分段。

代码清单 5-10 对 ByteBuf 进行切片

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 创建一个用于保存给定字符 串的字节的 ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", Charsets.UTF_8);
// 创建该 ByteBuf 从索 引 0 开始到索引 5 结束的一个新切片
ByteBuf sliced = buf.slice(0, 5);
System.out.println(sliced.toString(Charsets.UTF_8));
// 更新索引 0 处的字节
buf.setByte(0, (byte)'J');
// 将会成功,因为数据是共享的,对其中 一个所做的更改对另外一个也是可见的
assert buf.getByte(0) == sliced.getByte(0); // right
// 运行结果
Netty

现在,让我们看看 ByteBuf 的分段的副本和切片有何区别,如代码清单 5-11 所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
Charset utf8 = Charset.forName("UTF-8");
// 创建 ByteBuf 以保存 所提供的字符串的字节
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
// 创建该 ByteBuf 从索 引 0 开始到索引 15 结束的分段的副本
ByteBuf copy = buf.copy(0, 15);
// 将打印 “Netty in Action”
System.out.println(copy.toString(utf8));
// 更新索引 0 处的字节
buf.setByte(0, (byte) 'J');

// 将会成功,因为数据不是共享的
assert buf.getByte(0) != copy.getByte(0);
// 运行结果
Netty in Action

除了修改原始 ByteBuf 的切片或者副本的效果以外,这两种场景是相同的。只要有可能, 使用 slice() 方法来避免复制内存的开销。

读/写操作

正如我们所提到过的,有两种类别的读/写操作:

  • get() 和 set() 操作,从给定的索引开始,并且保持索引不变;
  • read() 和 write() 操作,从给定的索引开始, 并且会根据已经访问过的字节数对索引进行调整。

表 5-1 列举了最常用的get() 方法。完整列表请参考对应的 API 文档。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906055334.png

大多数的这些操作都有一个对应的 set()方法。这些方法在表 5-2 中列出。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906055401.png

代码清单 5-12 说明了 get() 和 set() 方法的用法,表明了它们不会改变读索引和写索引。

代码清单 5-12 get() 和 set() 方法的用法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
// 打印第一 个字符'N'
System.out.println((char) buf.getByte(0));
// 存储当前的 readerIndex 和 writerIndex
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
// 将索引 0 处的字 节更新为字符'B'
buf.setByte(0, (byte) 'B');
// 打印第一个字 符,现在是'B'
System.out.println((char) buf.getByte(0));
// 将会成功,因为这些操作 并不会修改相应的索引
assert readerIndex == buf.readerIndex();
assert writerIndex == buf.writerIndex();
// 运行结果
N
B

注意

1
2
3
4
5
6
7
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);
buf.writeInt(2);
buf.writeInt(3);
System.out.println(buf.getInt(0)); // 1
System.out.println(buf.getInt(1)); // 256
System.out.println(buf.getInt(2)); // 65536

现在,让我们研究一下 read() 操作,其作用于当前的 readerIndex 或 writerIndex 这些方法将用于从 ByteBuf 中读取数据,如同它是一个流。表 5-3 展示了最常用的方法。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906060635.png

几乎每个 read() 方法都有对应的 write() 方法,用于将数据追加到 ByteBuf 中。注意, 表 5-4 中所列出的这些方法的参数是需要写入的值,而不是索引值。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906060723.png

代码清单 5-13 展示了这些方法的用法。

代码清单 5-13 ByteBuf 上的 read() 和 write() 操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
Charset utf8 = Charset.forName("UTF-8");
// 创建一个新的 ByteBuf 以保存 给定字符串的字节,maxCapacity = Integer.MAX_VALUE
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
// 打印第一 个字符'N'
System.out.println((char) buf.readByte());
// 存储当前的 readerIndex
int readerIndex = buf.readerIndex();
// 存储当前的 writerIndex
int writerIndex = buf.writerIndex();
// 将字符'?'追加 到缓冲区
buf.writeByte((byte) '?');
assert readerIndex == buf.readerIndex();
// 将会成功,因为 writeByte() 方法移动了 writerIndex
assert writerIndex != buf.writerIndex();
// 运行结果
N

更多的操作

表 5-5 列举了由 ByteBuf 提供的其他有用操作。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906061311.png

ByteBufHolder接口

我们经常发现,除了实际的数据负载之外,我们还需要存储各种属性值。HTTP 响应便是一 个很好的例子,除了表示为字节的内容,还包括状态码、cookie 等。

为了处理这种常见的用例,Netty 提供了 ByteBufHolder 。 ByteBufHolder 也为 Netty 的 高级特性提供了支持,如缓冲区池化,其中可以从池中借用 ByteBuf ,并且在需要时自动释放。

ByteBufHolder 只有几种用于访问底层数据和引用计数的方法。表 5-6 列出了它们(这里 不包括它继承自 ReferenceCounted 的那些方法)。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906061449.png

如果想要实现一个将其有效负载存储在 ByteBuf 中的消息对象,那么 ByteBufHolder 将 是个不错的选择。

ByteBuf分配

在这一节中,我们将描述管理 ByteBuf 实例的不同方式。

按需分配:ByteBufAllocator接口

为了降低分配和释放内存的开销,Netty 通过 interface ByteBufAllocator 实现了 ( ByteBuf 的)池化,它可以用来分配我们所描述过的任意类型的 ByteBuf 实例。使用池化是特定于应用程序的决定,其并不会以任何方式改变 ByteBuf API(的语义)。 表 5-7 列出了 ByteBufAllocator 提供的一些操作。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906061727.png

ioBuffer(), 默认地,当所运行的环境具有 sun.misc.Unsafe 支持时,返回基于直接内存存储的 ByteBuf,否则 返回基于堆内存存储的 ByteBuf;当指定使用 PreferHeapByteBufAllocator 时,则只会返回基 于堆内存存储的 ByteBuf。

可以通过 Channel (每个都可以有一个不同的 ByteBufAllocator 实例)或者绑定到 ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用。 代码清单 5-14 说明了这两种方法。

代码清单 5-14 获取一个到 ByteBufAllocator 的引用

1
2
3
4
5
6
7
8
9
Channel channel = ...;
// 从 Channel 获取一个到 ByteBufAllocator 的引用
ByteBufAllocator allocator = channel.alloc();
....

ChannelHandlerContext ctx = ...;
// 从 ChannelHandlerContext 获取一个 到 ByteBufAllocator 的引用
ByteBufAllocator allocator2 = ctx.alloc();
...

Netty提供了两种 ByteBufAllocator 的实现: PooledByteBufAllocator 和 Unpooled- ByteBufAllocator 。前者池化了 ByteBuf 的实例以提高性能并最大限度地减少内存碎片。此实 现使用了一种称为jemalloc 的已被大量现代操作系统所采用的高效方法来分配内存。后者的实现不池化 ByteBuf 实例,并且在每次它被调用时都会返回一个新的实例。

虽然Netty默认 使用了 PooledByteBufAllocator , 但这可以很容易地通过 ChannelConfig API或者在引导你的应用程序时指定一个不同的分配器来更改。更多的细节可在第8 章中找到。

这里指 Netty4.1.x,Netty4.0.x 默认使用的是 UnpooledByteBufAllocator。

Unpooled缓冲区

可能某些情况下,你未能获取一个到 ByteBufAllocator 的引用。对于这种情况,Netty 提 供了一个简单的称为 Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的 ByteBuf 实例。表 5-8 列举了这些中最重要的方法。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906062226.png

Unpooled 类还使得 ByteBuf 同样可用于那些并不需要 Netty 的其他组件的非网络项目, 使得其能得益于高性能的可扩展的缓冲区 API。

ByteBufUtil类

ByteBufUtil 提供了用于操作 ByteBuf 的静态的辅助方法。因为这个 API 是通用的,并 且和池化无关,所以这些方法已然在分配类的外部实现。

这些静态方法中最有价值的可能就是 hexdump() 方法, 它以十六进制的表示形式打印 ByteBuf 的内容。这在各种情况下都很有用,例如,出于调试的目的记录 ByteBuf 的内容。十 六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的 版本还可以很容易地转换回实际的字节表示。

另一个有用的方法是 boolean equals(ByteBuf, ByteBuf) ,它被用来判断两个 ByteBuf 实例的相等性。如果你实现自己的 ByteBuf 子类,你可能会发现 ByteBufUtil 的其他有用方法。

引用计数

引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的工作。Netty 在第 4 版中为 ByteBuf 和 ByteBufHolder 引入了 引用计数工作,它们都实现了 interface ReferenceCounted 。

引用计数背后的想法并不是特别的复杂;它主要涉及跟踪到某个特定对象的活动引用的数 量。一个 ReferenceCounted 实现的实例将通常以活动的引用计数为 1 作为开始。只要引用计 数大于 0,就能保证对象不会被释放。当活动引用的数量减少到 0 时,该实例就会被释放。注意, 虽然释放的确切语义可能是特定于实现的,但是至少已经释放的对象应该不可再用了。

引用计数对于池化实现(如 PooledByteBufAllocator )来说是至关重要的, 它降低了内存分配的开销。代码清单 5-15 和代码清单 5-16 展示了相关的示例。

代码清单 5-15 引用计数

1
2
3
4
5
6
7
8
9
Channel channel = ...;
// 从 Channel 获取 ByteBufAllocator
ByteBufAllocator allocator = channel.alloc();

....
// 从 ByteBufAllocator 分配一个 ByteBuf
ByteBuf buffer = allocator.directBuffer();
// 检查引用计数是否 为预期的 1
assert buffer.refCnt() == 1;

代码清单 5-16 释放引用计数的对象

1
2
3
4
ByteBuf buffer = ...;
// 减少到该对象的活动引用。当减少到 0 时, 该对象被释放,并且该方法返回 true
boolean released = buffer.release();
...

试图访问一个已经被释放的引用计数的对象,将会导致一个 IllegalReferenceCountException。

注意,一个特定的( ReferenceCounted 的实现)类,可以用它自己的独特方式来定义它 的引用计数规则。例如,我们可以设想一个类,其 release() 方法的实现总是将引用计数设为 零,而不用关心它的当前值,从而一次性地使所有的活动引用都失效。

谁负责释放

一般来说,是由最后访问(引用计数)对象的那一方来负责将它释放。在第 6 章中, 我们将会解释这个概念和 ChannelHandler 以及 ChannelPipeline 的相关性。

使用release注意事项:

  • 只对DirectByteBuf类型的对象进行回收,由于这类数据非jvm控制,和c语言类似,需要显示回收内存。至于回收时间,应当在最后一次使用之后。
  • inbound中的自带实现中,SimpleInbound实现了回收.
  • 还有需要注意的是,ChannleHandlerContext的flush方法是默认release的。

为什么要有引用计数器

Netty里四种主力的ByteBuf,其中UnpooledHeapByteBuf 底下的byte[]能够依赖JVM GC自然回收;而UnpooledDirectByteBuf底下是DirectByteBuffer,如Java堆外内存扫盲贴所述,除了等JVM GC,最好也能主动进行回收;而PooledHeapByteBuf 和PooledDirectByteBuf,则必须要主动将用完的byte[]/ByteBuffer放回池里,否则内存就要爆掉。所以,Netty ByteBuf需要在JVM的GC机制之外,有自己的引用计数器和回收过程。

引用计数器常识

计数器基于 AtomicIntegerFieldUpdater,为什么不直接用AtomicInteger?因为ByteBuf对象很多,如果都把int包一层AtomicInteger花销较大,而AtomicIntegerFieldUpdater只需要一个全局的静态变量。

所有ByteBuf的引用计数器初始值为1。

调用release(),将计数器减1,等于零时, deallocate()被调用,各种回收。

调用retain(),将计数器加1,即使ByteBuf在别的地方被人release()了,在本Class没喊cut之前,不要把它释放掉。

由duplicate(), slice()和order(ByteOrder)所创建的ByteBuf,与原对象共享底下的buffer,也共享引用计数器,所以它们经常需要调用retain()来显示自己的存在。

当引用计数器为0,底下的buffer已被回收,即使ByteBuf对象还在,对它的各种访问操作都会抛出异常。

谁来负责Release

在C时代,我们喜欢让malloc和free成对出现,而在Netty里,因为Handler链的存在,ByteBuf经常要传递到下一个Hanlder去而不复还,所以规则变成了谁是最后使用者,谁负责释放。

另外,更要注意的是各种异常情况,ByteBuf没有成功传递到下一个Hanlder,还在自己地界里的话,一定要进行释放。

小结

本章专门探讨了 Netty 的基于 ByteBuf 的数据容器。我们首先解释了 ByteBuf 相对于 JDK 所提供的实现的优势。我们还强调了该 API 的其他可用变体,并且指出了它们各自最佳适用的特 定用例。

我们讨论过的要点有:

  • 使用不同的读索引和写索引来控制数据访问;
  • 使用内存的不同方式——基于字节数组和直接缓冲区;
  • 通过 CompositeByteBuf 生成多个 ByteBuf 的聚合视图;
  • 数据访问方法——搜索、切片以及复制;
  • 读、写、获取和设置 API;
  • ByteBufAllocator 池化和引用计数

在下一章中,我们将专注于 ChannelHandler ,它为你的数据处理逻辑提供了载体。因为 ChannelHandler 大量地使用了 ByteBuf ,你将开始看到 Netty 的整体架构的各个重要部分最终走到了一起。