ChannelHandler和ChannelPipeline
本章主要内容
- ChannelHandler API 和 ChannelPipeline API
- 检测资源泄漏
- 异常处理
在上一章中你学习了 ByteBuf —Netty 的数据容器。当我们在本章中探讨 Netty 的数据 流以及处理组件时,我们将基于已经学过的东西,并且你将开始看到框架的重要元素都结合到 了一起。
你已经知道,可以在 ChannelPipeline 中将 ChannelHandler 链接在一起以组织处理逻 辑。我们将会研究涉及这些类的各种用例,以及一个重要的关系— ChannelHandlerContext 。
理解所有这些组件之间的交互对于通过 Netty 构建模块化的、可重用的实现至关重要。
ChannelHandler家族
在我们开始详细地学习 ChannelHandler 之前,我们将在 Netty 的组件模型的这部分基础 上花上一些时间。
Channel的生命周期
Interface Channel 定义了一组和 ChannelInboundHandler API 密切相关的简单但功能强大的状态模型,表 6-1 列出了 Channel 的这 4 个状态。
Channel 的正常生命周期如图 6-1 所示。当这些状态发生改变时,将会生成对应的事件。 这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler ,其可以随后对它们做出 响应。
ChannelHandler的生命周期
表 6-2 中列出了 interface ChannelHandler 定义的生命周期操作,在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用这些操作。这些 方法中的每一个都接受一个 ChannelHandlerContext 参数。
Netty 定义了下面两个重要的 ChannelHandler 子接口:
- ChannelInboundHandler ——处理入站数据以及各种状态变化;
- ChannelOutboundHandler ——处理出站数据并且允许拦截所有的操作。
在接下来的章节中,我们将详细地讨论这些子接口。
ChannelInboundHandler接口
表 6-3 列出了 interface ChannelInboundHandler 的生命周期方法。这些方法将会在 数据被接收时或者与其对应的 Channel 状态发生改变时被调用。正如我们前面所提到的,这些 方法和 Channel 的生命周期密切相关。
当所有可读的字节都已经从 Channel 中读取之后,将会调用该回调方法channelReadComplete();所以,可能在 channelReadComplete()被调用之前看到多次调用 channelRead(…)
当某个 ChannelInboundHandler 的实现重写 channelRead() 方法时,它将负责显式地 释放与池化的 ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法 ReferenceCountUtil.release() ,如代码清单 6-1 所示。
代码清单 6-1 释放消息资源
|
|
Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现 违规的实例。 但是以这种方式管理资源可能很繁琐。 一个更加简单的方式是使用 Simple- ChannelInboundHandler 。代码清单 6-2 是代码清单 6-1 的一个变体,说明了这一点。
代码清单 6-2 使用 SimpleChannelInboundHandler
|
|
由于 SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消 息的引用供将来使用,因为这些引用都将会失效。
6.1.6 节为引用处理提供了更加详细的讨论。
ChannelOutboundHandler接口
出站操作和数据将由 ChannelOutboundHandler 处理。它的方法将被 Channel 、ChannelPipeline 以及 ChannelHandlerContext 调用。
ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲 刷操作并在稍后继续。
表6-4 显示了所有由 ChannelOutboundHandler 本身所定义的方法(忽略了那些从ChannelHandler 继承的方法)。
ChannelPromise与ChannelFuture
ChannelOutboundHandler 中的大部分方法都需要一个 ChannelPromise 参数,以便在操作完成时得到通知。 ChannelPromise 是 ChannelFuture 的一个 子类,其定义了一些可写的方法,如 setSuccess() 和 setFailure() ,从而使 ChannelFuture 不可变
这里借鉴的是 Scala 的 Promise 和 Future 的设计,当一个 Promise 被完成之后,其对应的 Future 的值便 不能再进行任何修改了。
接下来我们将看一看那些简化了编写 ChannelHandler 的任务的类。
ChannelHandler适配器
你可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类作为自己的 ChannelHandler 的起始点。这两个适配器分别提供了 ChannelInboundHandler 和 ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter ,它们 获得了它们共同的超接口 ChannelHandler 的方法。生成的类的层次结构如图 6-2 所示。
ChannelHandlerAdapter 还提供了实用方法 isSharable() 。如果其对应的实现被标 注为 Sharable ,那么这个方法将返回 true ,表示它可以被添加到多个 ChannelPipeline 中(如在 2.3.1 节中所讨论过的一样)。
在 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 中所 提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法,从而将事件转发到 了 ChannelPipeline 中的下一个 ChannelHandler 中。
你要想在自己的 ChannelHandler 中使用这些适配器类,只需要简单地扩展它们,并且重写那些你想要自定义的方法。
资源管理
每当通过调用 ChannelInboundHandler.channelRead() 或者 ChannelOutboundHandler.write() 方法来处理数据时,你都需要确保没有任何的资源泄漏。你可能还记得在前面的章节中所提到的, Netty 使用引用计数来处理池化的 ByteBuf 。 所以在完全使用完某个 ByteBuf 后,调整其引用计数是很重要的。
为了帮助你诊断潜在的(资源泄漏)问题,Netty提供了 class ResourceLeakDetector , 它将对你应用程序的缓冲区分配做大约 1%的采样来检测内存泄露。相关的开销是非常小的。
ResourceLeakDetector, 其利用了 JDK 提供的
PhantomReference<T>
类来实现这一点。
如果检测到了内存泄露,将会产生类似于下面的日志消息:
|
|
Netty 目前定义了 4 种泄漏检测级别,如表 6-5 所示。
泄露检测级别可以通过将下面的 Java 系统属性设置为表中的一个值来定义:
|
|
如果带着该 JVM 选项重新启动你的应用程序,你将看到自己的应用程序最近被泄漏的缓冲 区被访问的位置。下面是一个典型的由单元测试产生的泄漏报告:
实现 ChannelInboundHandler.channelRead() 和 ChannelOutboundHandler.write() 方法时,应该如何使用这个诊断工具来防止泄露呢?让我们看看你的 channelRead() 操作直接消费 入站消息的情况;也就是说,它不会通过调用 ChannelHandlerContext.fireChannelRead() 方法将入站消息转发给下一个 ChannelInboundHandler 。代码清单 6-3 展示了如何释放消息。
channelRead直接消费入栈消息,不会再转发给下一个ChannelInboundHandler
1 2 3 4 5 6 7 8 9 10 11
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
代码清单 6-3 消费并释放入站消息
|
|
消费入站消息的简单方式
由于消费入站数据是一项常规任务,所以 Netty 提供了一个特殊的被称为SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在消息被 channelRead0() 方法消费之后自动释放消息。
在出站方向这边,如果你处理了 write() 操作并丢弃了一个消息,那么你也应该负责释放 它。代码清单 6-4 展示了一个丢弃所有的写入数据的实现。
代码清单 6-4 丢弃并释放出站消息
|
|
重要的是,不仅要释放资源,还要通知 ChannelPromise 。否则可能会出现 ChannelFutureListener 收不到某个消息已经被处理了的通知的情况。
总之,如果一个消息被消费或者丢弃了,并且没有传递给 ChannelPipeline 中的下一个 ChannelOutboundHandler ,那么用户就有责任调用 ReferenceCountUtil.release() 。 如果消息到达了实际的传输层,那么当它被写入时或者 Channel 关闭时,都将被自动释放。
ChannelPipeline接口
如果你认为 ChannelPipeline 是一个拦截流经 Channel 的入站和出站事件的 ChannelHandler 实例链, 那么就很容易看出这些 ChannelHandler 之间的交互是如何组成一个应用 程序数据和事件处理逻辑的核心的。
每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline 。这项关联是永久性 的; Channel 既不能附加另外一个 ChannelPipeline ,也不能分离其当前的。在 Netty 组件 的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
根据事件的起源,事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个 ChannelHandler 。
ChannelHandlerContext
ChannelHandlerContext 使得 ChannelHandler 能够和它的 ChannelPipeline 以及其他的 ChannelHandler 交 互 。 ChannelHandler 可以通知其所属的 ChannelPipeline 中的下一个ChannelHandler ,甚至可以动态修改它所属的 ChannelPipeline 中 ChannelHandler 的编排。
ChannelHandlerContext 具有丰富的用于处理事件和执行 I/O 操作的 API。6.3 节将提供有关 ChannelHandlerContext 的更多内容。
图 6-3 展示了一个典型的同时具有入站和出站 ChannelHandler 的 ChannelPipeline 的布 局,并且印证了我们之前的关于 ChannelPipeline 主要由一系列的 ChannelHandler 所组成的 说法。 ChannelPipeline 还提供了通过 ChannelPipeline 本身传播事件的方法。如果一个入站 事件被触发,它将被从 ChannelPipeline 的头部开始一直被传播到 Channel Pipeline 的尾端。 在图 6-3 中,一个出站 I/O 事件将从 ChannelPipeline 的最右边开始,然后向左传播。
ChannelPipeline 相对论
你可能会说,从事件途经 ChannelPipeline 的角度来看, ChannelPipeline 的头部和尾端取 决于该事件是入站的还是出站的。然而 Netty 总是将 ChannelPipeline 的入站口(图 6-3 中的左侧) 作为头部,而将出站口(该图的右侧)作为尾端。
当你完成了通过调用
ChannelPipeline.add*()
方法将入站处理器( ChannelInboundHandler ) 和 出 站 处 理 器 ( ChannelOutboundHandler ) 混 合 添 加 到 ChannelPipeline 之 后 , 每 一 个 ChannelHandler 从头部到尾端的顺序位置正如同我们方才所定义它们的一样。因此,如果你将图 6-3 中 的处理器( ChannelHandler )从左到右进行编号,那么第一个被入站事件看到的 ChannelHandler 将是 1,而第一个被出站事件看到的 ChannelHandler 将是 5。
在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。 如果不匹配, ChannelPipeline 将跳过该 ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(当然,ChannelHandler 也可以同时实现 ChannelInboundHandler 接口和 ChannelOutbound- Handler 接口。)
修改ChannelPipeline
ChannelHandler 可以通过ChannelPipeline添加、删除或者替换其他的 ChannelHandler 来实时地修改 ChannelPipeline 的布局。(它也可以将它自己从 ChannelPipeline 中移除。)这是 ChannelHandler 最重要的能力之一,所以我们将仔细地来看看它是如何做到的。表 6-6 列出了相关的方法。
代码清单 6-5 展示了这些方法的使用。
代码清单 6-5 修改 ChannelPipeline
稍后,你将看到,重组 ChannelHandler 的这种能力使我们可以用它来轻松地实现极其灵活的逻辑。
ChannelHandler 的执行和阻塞
通常 ChannelPipeline 中的每一个 ChannelHandler 都是通过它的 EventLoop (I/O 线程)来处 理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的 I/O 处理产生负面的影响。
但有时可能需要与那些使用阻塞 API 的遗留代码进行交互。对于这种情况, ChannelPipeline 有一些 接受一个 EventExecutorGroup 的 add() 方法。如果一个事件被传递给一个自定义的 EventExecutorGroup ,它将被包含在这个 EventExecutorGroup 中的某个 EventExecutor 所处理,从而被从该 Channel 本身的 EventLoop 中移除。对于这种用例,Netty 提供了一个叫 DefaultEventExecutorGroup 的默认实现。
除了这些操作,还有别的通过类型或者名称来访问ChannelHandler 的方法。这些方法都 列在了表 6-7 中。
触发事件
ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。表 6-8 列出了入 站操作,用于通知 ChannelInboundHandler 在 ChannelPipeline 中所发生的事件。
在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。表 6-9 列出了 ChannelPipeline API 的出站操作。
总结一下:
- ChannelPipeline 保存了与Channel 相关联的 ChannelHandler
- ChannelPipeline 可以根据需要,通过添加或者删除 ChannelHandler 来动态地修改;
- ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件。
ChannelHandlerContext接口
ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关 联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext 。 ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在 同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。
下面的部分代码验证了上面这段说法:
|
|
ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这 些方法,它们将沿着整个 ChannelPipeline 进行传播。而调用位于 ChannelHandlerContext 上的相同方法, 则将从当前所关联的 ChannelHandler 开始, 并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。
表 6-10 对 ChannelHandlerContext API 进行了总结。
read()方法,通过配合 ChannelConfig.setAutoRead(boolean autoRead)方法,可以实现反应式系统的特性 之一回压(back-pressure)。
当使用 ChannelHandlerContext 的 API 的时候,请牢记以下两点:
- ChannelHandlerContext 和 ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
- 如同我们在本节开头所解释的一样,相对于其他类的同名方法, ChannelHandler Context 的方法将产生更短的事件流s,应该尽可能地利用这个特性来获得最大的性能。
使用ChannelHandlerContext
在这一节中我们将讨论 ChannelHandlerContext 的用法,以及存在于 ChannelHandlerContext 、 Channel 和 ChannelPipeline 上的方法的行为。图 6-4 展示了它们之间的关系。
在代码清单 6-6 中,将通过 ChannelHandlerContext 获取到 Channel 的引用。调用 Channel 上的 write() 方法将会导致写入事件从尾端到头部地流经 ChannelPipeline 。
代码清单 6-6 从 ChannelHandlerContext 访问 Channel
|
|
代码清单 6-7 展示了一个类似的例子,但是这一次是写入 ChannelPipeline 。我们再次看 到,(到 ChannelPipline 的)引用是通过 ChannelHandlerContext 获取的。
|
|
如同在图 6-5 中所能够看到的一样,代码清单 6-6 和代码清单 6-7 中的事件流是一样的。重要的是要注意到,虽然被调用的 Channel 或 ChannelPipeline 上的 write() 方法将一直传播事件通 过整个 ChannelPipeline ,但是在 ChannelHandler 的级别上,事件从一个 ChannelHandler 到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 上的调用完成的。
为什么会想要从 ChannelPipeline 中的某个特定点开始传播事件呢?
- 为了减少将事件传经对它不感兴趣的 ChannelHandler 所带来的开销。
- 为了避免将事件传经那些可能会对它感兴趣的 ChannelHandler 。
要想调用从某个特定的 ChannelHandler 开始的处理过程,必须获取到在( ChannelPipeline )该 ChannelHandler 之前的 ChannelHandler 所关联的 ChannelHandlerContext 。这个 ChannelHandlerContext 将调用和它所关联的 ChannelHandler 之后的 ChannelHandler 。
代码清单 6-8 和图 6-6 说明了这种用法。
代码清单 6-8 调用 ChannelHandlerContext 的 write() 方法
|
|
如图 6-6 所示,消息将从下一个 ChannelHandler 开始流经 ChannelPipeline,绕过了所有前面的 ChannelHandler 。
我们刚才所描述的用例是常见的, 对于调用特定的 ChannelHandler 实例上的操作尤其有用。
ChannelHandler和ChannelHandlerContext的高级用法
正如我们在代码清单 6-6 中所看到的,你可以通过调用 ChannelHandlerContext 上的 pipeline() 方法来获得被封闭的 ChannelPipeline 的引用。 这使得运行时得以操作 ChannelPipeline 的 ChannelHandler ,我们可以利用这一点来实现一些复杂的设计。例如, 你可以通过将 ChannelHandler 添加到 ChannelPipeline 中来实现动态的协议切换。
另一种高级的用法是缓存到 ChannelHandlerContext 的引用以供稍后使用,这可能会发 生在任何的 ChannelHandler 方法之外,甚至来自于不同的线程。代码清单 6-9 展示了用这种模式来触发事件。
代码清单 6-9 缓存到 ChannelHandlerContext 的引用
|
|
因为一个 ChannelHandler 可以从属于多个 ChannelPipeline ,所以它也可以绑定到多 个 ChannelHandlerContext 实例。对于这种用法指在多个 ChannelPipeline 中共享同一 个 ChannelHandler ,对应的 ChannelHandler 必须要使用 @Sharable 注解标注;否则, 试图将它添加到多个 ChannelPipeline 时将会触发异常。显而易见,为了安全地被用于多个 并发的 Channel (即连接),这样的ChannelHandler 必须是线程安全的。
代码清单 6-10 展示了这种模式的一个正确实现。
代码清单 6-10 可共享的 ChannelHandler
|
|
前面的 ChannelHandler 实现符合所有的将其加入到多个 ChannelPipeline 的需求, 即它使用了注解 @Sharable 标注,并且也不持有任何的状态。相反,代码清单 6-11 中的实现将 会导致问题。
代码清单 6-11 @Sharable
的错误用法
|
|
这段代码的问题在于它拥有状态 ,即用于跟踪方法调用次数的实例变量 count 。将这个类 的一个实例添加到 ChannelPipeline 将极有可能在它被多个并发的 Channel 访问时导致问 题。(当然,这个简单的问题可以通过使 channelRead() 方法变为同步方法来修正。)
主要的问题在于,对于其所持有的状态的修改并不是线程安全的,比如也可以通过使用 AtomicInteger 来规避这个问题。
总之,只应该在确定了你的 ChannelHandler 是线程安全的时才使用 @Sharable
注解。
为何要共享同一个ChannelHandler
在多个ChannelPipeline中安装同一个 ChannelHandler的一个常见的原因是用于收集跨越多个 Channel 的统计信息。
我们对于 ChannelHandlerContext 和它与其他的框架组件之间的关系的讨论到此就结束了。接下来我们将看看异常处理。
异常处理
异常处理是任何真实应用程序的重要组成部分,它也可以通过多种方式来实现。因此,Netty 提供了几种方式用于处理入站或者出站处理过程中所抛出的异常。这一节将帮助你了解如何设计 最适合你需要的方式。
处理入站异常
如果在处理入站事件的过程中有异常被抛出,那么它将从它在 ChannelInboundHandler 里被触发的那一点开始流经 ChannelPipeline 。要想处理这种类型的入站异常,你需要在你 的 ChannelInboundHandler 实现中重写下面的方法。
|
|
代码清单 6-12 展示了一个简单的示例,其关闭了 Channel 并打印了异常的栈跟踪信息。
代码清单 6-12 基本的入站异常处理
|
|
因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻 辑的 ChannelInboundHandler 通常位于 ChannelPipeline 的最后。这确保了所有的入站 异常都总是会被处理,无论它们可能会发生在 ChannelPipeline 中的什么位置。
你应该如何响应异常,可能很大程度上取决于你的应用程序。你可能想要关闭 Channel (和 连接),也可 能会尝试进行恢复。如果你不实现任何处理入站异常的逻辑(或者没有消费该异常), 那么Netty将会记录该异常没有被处理的事实 。
Netty 将会通过 Warning 级别的日志记录该异常到达了 ChannelPipeline 的尾端,但没有被处理, 并尝试释放该异常。
总结一下:
- ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给 ChannelPipeline 中的下一个 ChannelHandler ;
- 如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理;
- 要想定义自定义的处理逻辑,你需要重写 exceptionCaught() 方法。然后你需要决定是否需要将该异常传播出去。
处理出站异常
用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。
-
每个出站操作都将返回一个 ChannelFuture 。注册到 ChannelFuture 的ChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了。
-
几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise的实例。作为 ChannelFuture 的子类。ChannelPromise 也可以被分配用于异步通知的监听器。但是, ChannelPromise 还具有提供立即通知的可写方法:
1 2
ChannelPromise setSuccess(); ChannelPromise setFailure(Throwable cause);
添加 ChannelFutureListener 只需要调用 ChannelFuture 实例上的 addListener (ChannelFutureListener) 方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是, 调用出站操作(如 write() 方法)所返回的 ChannelFuture 上的 addListener() 方法。
代码清单 6-13 使用了这种方式来添加 ChannelFutureListener ,它将打印栈跟踪信息 并且随后关闭 Channel 。
代码清单 6-13 添加 ChannelFutureListener 到 ChannelFuture
|
|
第二种方式是将 ChannelFutureListener 添加到即将作为参数传递给 ChannelOut- boundHandler 的方法的 ChannelPromise 。代码清单 6-14 中所展示的代码和代码清单 6-13 中所展示的具有相同的效果。
代码清单 6-14 添加 ChannelFutureListener 到 ChannelPromise
|
|
ChannelPromise 的可写方法
通过调用 ChannelPromise 上的 setSuccess() 和 setFailure() 方法,可以使一个操作的状态在 ChannelHandler 的方法返回给其调用者时便即刻被感知到。
为何选择一种方式而不是另一种呢?对于细致的异常处理,你可能会发现,在调用出站操 作时添加 ChannelFutureListener 更合适,如代码清单 6-13 所示。而对于一般的异常处 理,你可能会发现,代码清单 6-14 所示的自定义的 ChannelOutboundHandler 实现的方式 更加的简单。
如果你的 ChannelOutboundHandler 本身抛出了异常会发生什么呢?在这种情况下, Netty 本身会通知任何已经注册到对应 ChannelPromise 的监听器。
小结
在本章中我们仔细地研究了 Netty 的数据处理组件— ChannelHandler 。 我们讨论了 ChannelHandler 是如何链接在一起,以及它们是如何作为 ChannelInboundHandler 和 ChannelOutboundHandler 与 ChannelPipeline 进行交互的。
下一章将介绍 Netty 的 EventLoop 和并发模型,这对于理解 Netty 是如何实现异步的、事件驱动的网络编程模型来说至关重要。