目录

ChannelHandler和ChannelPipeline

本章主要内容

  • ChannelHandler API 和 ChannelPipeline API
  • 检测资源泄漏
  • 异常处理

在上一章中你学习了 ByteBuf —Netty 的数据容器。当我们在本章中探讨 Netty 的数据 流以及处理组件时,我们将基于已经学过的东西,并且你将开始看到框架的重要元素都结合到 了一起。

你已经知道,可以在 ChannelPipeline 中将 ChannelHandler 链接在一起以组织处理逻 辑。我们将会研究涉及这些类的各种用例,以及一个重要的关系— ChannelHandlerContext 。

理解所有这些组件之间的交互对于通过 Netty 构建模块化的、可重用的实现至关重要。

ChannelHandler家族

在我们开始详细地学习 ChannelHandler 之前,我们将在 Netty 的组件模型的这部分基础 上花上一些时间。

Channel的生命周期

Interface Channel 定义了一组和 ChannelInboundHandler API 密切相关的简单但功能强大的状态模型,表 6-1 列出了 Channel 的这 4 个状态。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906084628.png

Channel 的正常生命周期如图 6-1 所示。当这些状态发生改变时,将会生成对应的事件。 这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler ,其可以随后对它们做出 响应。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906084724.png

ChannelHandler的生命周期

表 6-2 中列出了 interface ChannelHandler 定义的生命周期操作,在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用这些操作。这些 方法中的每一个都接受一个 ChannelHandlerContext 参数。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906084846.png

Netty 定义了下面两个重要的 ChannelHandler 子接口:

  • ChannelInboundHandler ——处理入站数据以及各种状态变化
  • ChannelOutboundHandler ——处理出站数据并且允许拦截所有的操作

在接下来的章节中,我们将详细地讨论这些子接口。

ChannelInboundHandler接口

表 6-3 列出了 interface ChannelInboundHandler 的生命周期方法。这些方法将会在 数据被接收时或者与其对应的 Channel 状态发生改变时被调用。正如我们前面所提到的,这些 方法和 Channel 的生命周期密切相关。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906085139.png

当所有可读的字节都已经从 Channel 中读取之后,将会调用该回调方法channelReadComplete();所以,可能在 channelReadComplete()被调用之前看到多次调用 channelRead(…)

当某个 ChannelInboundHandler 的实现重写 channelRead() 方法时,它将负责显式地 释放与池化的 ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法 ReferenceCountUtil.release() ,如代码清单 6-1 所示。

代码清单 6-1 释放消息资源

1
2
3
4
5
6
7
8
// 扩展了 ChannelInboundHandlerAdapter
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
      // 丢弃已接 收的消息
        ReferenceCountUtil.release(msg);
    }
}

Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现 违规的实例。 但是以这种方式管理资源可能很繁琐。 一个更加简单的方式是使用 Simple- ChannelInboundHandler 。代码清单 6-2 是代码清单 6-1 的一个变体,说明了这一点。

代码清单 6-2 使用 SimpleChannelInboundHandler

1
2
3
4
5
6
7
8
9
// 扩展了 SimpleChannelInboundHandler
@Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        // 不需要任何显 式的资源释放
        // No need to do anything special
    }
}

由于 SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消 息的引用供将来使用,因为这些引用都将会失效。

6.1.6 节为引用处理提供了更加详细的讨论。

ChannelOutboundHandler接口

出站操作和数据将由 ChannelOutboundHandler 处理。它的方法将被 Channel 、ChannelPipeline 以及 ChannelHandlerContext 调用。

ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲 刷操作并在稍后继续。

表6-4 显示了所有由 ChannelOutboundHandler 本身所定义的方法(忽略了那些从ChannelHandler 继承的方法)。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906085952.png

ChannelPromise与ChannelFuture

ChannelOutboundHandler 中的大部分方法都需要一个 ChannelPromise 参数,以便在操作完成时得到通知。 ChannelPromise 是 ChannelFuture 的一个 子类,其定义了一些可写的方法,如 setSuccess() 和 setFailure() ,从而使 ChannelFuture 不可变

这里借鉴的是 Scala 的 Promise 和 Future 的设计,当一个 Promise 被完成之后,其对应的 Future 的值便 不能再进行任何修改了。

接下来我们将看一看那些简化了编写 ChannelHandler 的任务的类。

ChannelHandler适配器

你可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类作为自己的 ChannelHandler 的起始点。这两个适配器分别提供了 ChannelInboundHandler 和 ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter ,它们 获得了它们共同的超接口 ChannelHandler 的方法。生成的类的层次结构如图 6-2 所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906090137.png

ChannelHandlerAdapter 还提供了实用方法 isSharable() 。如果其对应的实现被标 注为 Sharable ,那么这个方法将返回 true ,表示它可以被添加到多个 ChannelPipeline 中(如在 2.3.1 节中所讨论过的一样)。

在 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 中所 提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法,从而将事件转发到 了 ChannelPipeline 中的下一个 ChannelHandler 中。

你要想在自己的 ChannelHandler 中使用这些适配器类,只需要简单地扩展它们,并且重写那些你想要自定义的方法。

资源管理

每当通过调用 ChannelInboundHandler.channelRead() 或者 ChannelOutboundHandler.write() 方法来处理数据时,你都需要确保没有任何的资源泄漏。你可能还记得在前面的章节中所提到的, Netty 使用引用计数来处理池化的 ByteBuf 。 所以在完全使用完某个 ByteBuf 后,调整其引用计数是很重要的。

为了帮助你诊断潜在的(资源泄漏)问题,Netty提供了 class ResourceLeakDetector , 它将对你应用程序的缓冲区分配做大约 1%的采样来检测内存泄露。相关的开销是非常小的。

ResourceLeakDetector, 其利用了 JDK 提供的 PhantomReference<T>类来实现这一点。

如果检测到了内存泄露,将会产生类似于下面的日志消息:

1
LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=ADVANCED' or call ResourceLeakDetector.setLevel().

Netty 目前定义了 4 种泄漏检测级别,如表 6-5 所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906090604.png

泄露检测级别可以通过将下面的 Java 系统属性设置为表中的一个值来定义:

1
java -Dio.netty.leakDetectionLevel=ADVANCED

如果带着该 JVM 选项重新启动你的应用程序,你将看到自己的应用程序最近被泄漏的缓冲 区被访问的位置。下面是一个典型的由单元测试产生的泄漏报告:

https://gitee.com/lienhui68/picStore/raw/master/null/20200906090802.png

实现 ChannelInboundHandler.channelRead() 和 ChannelOutboundHandler.write() 方法时,应该如何使用这个诊断工具来防止泄露呢?让我们看看你的 channelRead() 操作直接消费 入站消息的情况;也就是说,它不会通过调用 ChannelHandlerContext.fireChannelRead() 方法将入站消息转发给下一个 ChannelInboundHandler 。代码清单 6-3 展示了如何释放消息。

channelRead直接消费入栈消息,不会再转发给下一个ChannelInboundHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

代码清单 6-3 消费并释放入站消息

1
2
3
4
5
6
7
8
// 扩展了 ChannelInboundHandlerAdapter
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
      // 通过调用 ReferenceCountUtil.release() 方法释放资源
        ReferenceCountUtil.release(msg);
    }
}

消费入站消息的简单方式

由于消费入站数据是一项常规任务,所以 Netty 提供了一个特殊的被称为SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在消息被 channelRead0() 方法消费之后自动释放消息。

在出站方向这边,如果你处理了 write() 操作并丢弃了一个消息,那么你也应该负责释放 它。代码清单 6-4 展示了一个丢弃所有的写入数据的实现。

代码清单 6-4 丢弃并释放出站消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 扩展了 ChannelOutboundHandlerAdapter
@Sharable
public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx,
                      Object msg, ChannelPromise promise) {
        // 通过使用 ReferenceCountUtil.realse(...)方法释放资源
        ReferenceCountUtil.release(msg);
        // 通知 ChannelPromise 数据已经被处理了
        promise.setSuccess();
    }
}

重要的是,不仅要释放资源,还要通知 ChannelPromise 。否则可能会出现 ChannelFutureListener 收不到某个消息已经被处理了的通知的情况。

总之,如果一个消息被消费或者丢弃了,并且没有传递给 ChannelPipeline 中的下一个 ChannelOutboundHandler ,那么用户就有责任调用 ReferenceCountUtil.release() 。 如果消息到达了实际的传输层,那么当它被写入时或者 Channel 关闭时,都将被自动释放。

ChannelPipeline接口

如果你认为 ChannelPipeline 是一个拦截流经 Channel 的入站和出站事件的 ChannelHandler 实例链, 那么就很容易看出这些 ChannelHandler 之间的交互是如何组成一个应用 程序数据和事件处理逻辑的核心的。

每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline 。这项关联是永久性 的; Channel 既不能附加另外一个 ChannelPipeline ,也不能分离其当前的。在 Netty 组件 的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个 ChannelHandler 。

ChannelHandlerContext

ChannelHandlerContext 使得 ChannelHandler 能够和它的 ChannelPipeline 以及其他的 ChannelHandler 交 互 。 ChannelHandler 可以通知其所属的 ChannelPipeline 中的下一个ChannelHandler ,甚至可以动态修改它所属的 ChannelPipeline 中 ChannelHandler 的编排。

ChannelHandlerContext 具有丰富的用于处理事件和执行 I/O 操作的 API。6.3 节将提供有关 ChannelHandlerContext 的更多内容。

图 6-3 展示了一个典型的同时具有入站和出站 ChannelHandler 的 ChannelPipeline 的布 局,并且印证了我们之前的关于 ChannelPipeline 主要由一系列的 ChannelHandler 所组成的 说法。 ChannelPipeline 还提供了通过 ChannelPipeline 本身传播事件的方法。如果一个入站 事件被触发,它将被从 ChannelPipeline 的头部开始一直被传播到 Channel Pipeline 的尾端。 在图 6-3 中,一个出站 I/O 事件将从 ChannelPipeline 的最右边开始,然后向左传播。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906092109.png

ChannelPipeline 相对论

你可能会说,从事件途经 ChannelPipeline 的角度来看, ChannelPipeline 的头部和尾端取 决于该事件是入站的还是出站的。然而 Netty 总是将 ChannelPipeline 的入站口(图 6-3 中的左侧) 作为头部,而将出站口(该图的右侧)作为尾端。

当你完成了通过调用 ChannelPipeline.add*() 方法将入站处理器( ChannelInboundHandler ) 和 出 站 处 理 器 ( ChannelOutboundHandler ) 混 合 添 加 到 ChannelPipeline 之 后 , 每 一 个 ChannelHandler 从头部到尾端的顺序位置正如同我们方才所定义它们的一样。因此,如果你将图 6-3 中 的处理器( ChannelHandler )从左到右进行编号,那么第一个被入站事件看到的 ChannelHandler 将是 1,而第一个被出站事件看到的 ChannelHandler 将是 5。

在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。 如果不匹配, ChannelPipeline 将跳过该 ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(当然,ChannelHandler 也可以同时实现 ChannelInboundHandler 接口和 ChannelOutbound- Handler 接口。)

修改ChannelPipeline

ChannelHandler 可以通过ChannelPipeline添加、删除或者替换其他的 ChannelHandler 来实时地修改 ChannelPipeline 的布局。(它也可以将它自己从 ChannelPipeline 中移除。)这是 ChannelHandler 最重要的能力之一,所以我们将仔细地来看看它是如何做到的。表 6-6 列出了相关的方法。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906092351.png

代码清单 6-5 展示了这些方法的使用。

代码清单 6-5 修改 ChannelPipeline

https://gitee.com/lienhui68/picStore/raw/master/null/20200906092454.png

稍后,你将看到,重组 ChannelHandler 的这种能力使我们可以用它来轻松地实现极其灵活的逻辑。

ChannelHandler 的执行和阻塞

通常 ChannelPipeline 中的每一个 ChannelHandler 都是通过它的 EventLoop (I/O 线程)来处 理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的 I/O 处理产生负面的影响。

但有时可能需要与那些使用阻塞 API 的遗留代码进行交互。对于这种情况, ChannelPipeline 有一些 接受一个 EventExecutorGroup 的 add() 方法。如果一个事件被传递给一个自定义的 EventExecutorGroup ,它将被包含在这个 EventExecutorGroup 中的某个 EventExecutor 所处理,从而被从该 Channel 本身的 EventLoop 中移除。对于这种用例,Netty 提供了一个叫 DefaultEventExecutorGroup 的默认实现。

除了这些操作,还有别的通过类型或者名称来访问ChannelHandler 的方法。这些方法都 列在了表 6-7 中。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906092805.png

触发事件

ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。表 6-8 列出了入 站操作,用于通知 ChannelInboundHandler 在 ChannelPipeline 中所发生的事件。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906092902.png

在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。表 6-9 列出了 ChannelPipeline API 的出站操作。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906092947.png

总结一下:

  • ChannelPipeline 保存了与Channel 相关联的 ChannelHandler
  • ChannelPipeline 可以根据需要,通过添加或者删除 ChannelHandler 来动态地修改;
  • ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件。

ChannelHandlerContext接口

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关 联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext 。 ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在 同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。

下面的部分代码验证了上面这段说法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// ChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    ...
    private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
}
// ChannelHandlerContext 链表
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            io.netty.channel.DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这 些方法,它们将沿着整个 ChannelPipeline 进行传播。而调用位于 ChannelHandlerContext 上的相同方法, 则将从当前所关联的 ChannelHandler 开始, 并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。

表 6-10 对 ChannelHandlerContext API 进行了总结。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906094515.png

read()方法,通过配合 ChannelConfig.setAutoRead(boolean autoRead)方法,可以实现反应式系统的特性 之一回压(back-pressure)。

当使用 ChannelHandlerContext 的 API 的时候,请牢记以下两点:

  • ChannelHandlerContext 和 ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
  • 如同我们在本节开头所解释的一样,相对于其他类的同名方法, ChannelHandler Context 的方法将产生更短的事件流s,应该尽可能地利用这个特性来获得最大的性能。

使用ChannelHandlerContext

在这一节中我们将讨论 ChannelHandlerContext 的用法,以及存在于 ChannelHandlerContext 、 Channel 和 ChannelPipeline 上的方法的行为。图 6-4 展示了它们之间的关系。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906094802.png

在代码清单 6-6 中,将通过 ChannelHandlerContext 获取到 Channel 的引用。调用 Channel 上的 write() 方法将会导致写入事件从尾端到头部地流经 ChannelPipeline 。

代码清单 6-6 从 ChannelHandlerContext 访问 Channel

1
2
3
4
5
ChannelHandlerContext ctx = ..;
// 获取到与 ChannelHandlerContext 相关联的 Channel 的引用
Channel channel = ctx.channel();
// 通过 Channel 写入 缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

代码清单 6-7 展示了一个类似的例子,但是这一次是写入 ChannelPipeline 。我们再次看 到,(到 ChannelPipline 的)引用是通过 ChannelHandlerContext 获取的。

1
2
3
4
5
ChannelHandlerContext ctx = ..;
// 获取到与 ChannelHandlerContext 相关联的 ChannelPipeline 的引用
ChannelPipeline pipeline = ctx.pipeline();
// 通过 ChannelPipeline 写入 缓冲区
pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

如同在图 6-5 中所能够看到的一样,代码清单 6-6 和代码清单 6-7 中的事件流是一样的。重要的是要注意到,虽然被调用的 Channel 或 ChannelPipeline 上的 write() 方法将一直传播事件通 过整个 ChannelPipeline ,但是在 ChannelHandler 的级别上,事件从一个 ChannelHandler 到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 上的调用完成的。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906095128.png

为什么会想要从 ChannelPipeline 中的某个特定点开始传播事件呢?

  • 为了减少将事件传经对它不感兴趣的 ChannelHandler 所带来的开销。
  • 为了避免将事件传经那些可能会对它感兴趣的 ChannelHandler 。

要想调用从某个特定的 ChannelHandler 开始的处理过程,必须获取到在( ChannelPipeline )该 ChannelHandler 之前的 ChannelHandler 所关联的 ChannelHandlerContext 。这个 ChannelHandlerContext 将调用和它所关联的 ChannelHandler 之后的 ChannelHandler 。

代码清单 6-8 和图 6-6 说明了这种用法。

代码清单 6-8 调用 ChannelHandlerContext 的 write() 方法

1
2
3
4
// 获取到 ChannelHandlerContext 的引用
ChannelHandlerContext ctx = ..;
// write()方法将把缓冲区数据发送 到下一个 ChannelHandler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

如图 6-6 所示,消息将从下一个 ChannelHandler 开始流经 ChannelPipeline,绕过了所有前面的 ChannelHandler 。

https://gitee.com/lienhui68/picStore/raw/master/null/20200906095506.png

我们刚才所描述的用例是常见的, 对于调用特定的 ChannelHandler 实例上的操作尤其有用。

ChannelHandler和ChannelHandlerContext的高级用法

正如我们在代码清单 6-6 中所看到的,你可以通过调用 ChannelHandlerContext 上的 pipeline() 方法来获得被封闭的 ChannelPipeline 的引用。 这使得运行时得以操作 ChannelPipeline 的 ChannelHandler ,我们可以利用这一点来实现一些复杂的设计。例如, 你可以通过将 ChannelHandler 添加到 ChannelPipeline 中来实现动态的协议切换

另一种高级的用法是缓存到 ChannelHandlerContext 的引用以供稍后使用,这可能会发 生在任何的 ChannelHandler 方法之外,甚至来自于不同的线程。代码清单 6-9 展示了用这种模式来触发事件。

代码清单 6-9 缓存到 ChannelHandlerContext 的引用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class WriteHandler extends ChannelHandlerAdapter {
    private ChannelHandlerContext ctx;

    @Override
    public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) {
        // 存储到 ChannelHandlerContext 的引用以供稍后使用
        this.ctx = ctx;
    }

    public void send(String msg) {
        // 使用之前存储的到 ChannelHandlerContext的引用来发送消息
        ctx.writeAndFlush(msg);
    }
}

因为一个 ChannelHandler 可以从属于多个 ChannelPipeline ,所以它也可以绑定到多 个 ChannelHandlerContext 实例。对于这种用法指在多个 ChannelPipeline 中共享同一 个 ChannelHandler ,对应的 ChannelHandler 必须要使用 @Sharable 注解标注;否则, 试图将它添加到多个 ChannelPipeline 时将会触发异常。显而易见,为了安全地被用于多个 并发的 Channel (即连接),这样的ChannelHandler 必须是线程安全的。

代码清单 6-10 展示了这种模式的一个正确实现。

代码清单 6-10 可共享的 ChannelHandler

1
2
3
4
5
6
7
8
9
@Sharable // 使用注解 @Sharable 标注
public class SharableHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Channel read message: " + msg);
        // 记录方法调用,并转发给下一个 ChannelHandler
        ctx.fireChannelRead(msg);
    }
}

前面的 ChannelHandler 实现符合所有的将其加入到多个 ChannelPipeline 的需求, 即它使用了注解 @Sharable 标注,并且也不持有任何的状态。相反,代码清单 6-11 中的实现将 会导致问题。

代码清单 6-11 @Sharable 的错误用法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Sharable // 使用注解 @Sharable 标注
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
    private int count;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 将 count 字段的值加 1
        count++;
        // 记录方法调用,并转发给下一个ChannelHandler
        System.out.println("channelRead(...) called the " + count + " time");
        ctx.fireChannelRead(msg);
    }
}

这段代码的问题在于它拥有状态 ,即用于跟踪方法调用次数的实例变量 count 。将这个类 的一个实例添加到 ChannelPipeline 将极有可能在它被多个并发的 Channel 访问时导致问 题。(当然,这个简单的问题可以通过使 channelRead() 方法变为同步方法来修正。)

主要的问题在于,对于其所持有的状态的修改并不是线程安全的,比如也可以通过使用 AtomicInteger 来规避这个问题。

总之,只应该在确定了你的 ChannelHandler 是线程安全的时才使用 @Sharable 注解。

为何要共享同一个ChannelHandler

在多个ChannelPipeline中安装同一个 ChannelHandler的一个常见的原因是用于收集跨越多个 Channel 的统计信息。

我们对于 ChannelHandlerContext 和它与其他的框架组件之间的关系的讨论到此就结束了。接下来我们将看看异常处理。

异常处理

异常处理是任何真实应用程序的重要组成部分,它也可以通过多种方式来实现。因此,Netty 提供了几种方式用于处理入站或者出站处理过程中所抛出的异常。这一节将帮助你了解如何设计 最适合你需要的方式。

处理入站异常

如果在处理入站事件的过程中有异常被抛出,那么它将从它在 ChannelInboundHandler 里被触发的那一点开始流经 ChannelPipeline 。要想处理这种类型的入站异常,你需要在你 的 ChannelInboundHandler 实现中重写下面的方法。

1
public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause) throws Exception

代码清单 6-12 展示了一个简单的示例,其关闭了 Channel 并打印了异常的栈跟踪信息。

代码清单 6-12 基本的入站异常处理

1
2
3
4
5
6
7
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻 辑的 ChannelInboundHandler 通常位于 ChannelPipeline 的最后。这确保了所有的入站 异常都总是会被处理,无论它们可能会发生在 ChannelPipeline 中的什么位置。

你应该如何响应异常,可能很大程度上取决于你的应用程序。你可能想要关闭 Channel (和 连接),也可 能会尝试进行恢复。如果你不实现任何处理入站异常的逻辑(或者没有消费该异常), 那么Netty将会记录该异常没有被处理的事实 。

Netty 将会通过 Warning 级别的日志记录该异常到达了 ChannelPipeline 的尾端,但没有被处理, 并尝试释放该异常。

总结一下:

  • ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给 ChannelPipeline 中的下一个 ChannelHandler ;
  • 如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理;
  • 要想定义自定义的处理逻辑,你需要重写 exceptionCaught() 方法。然后你需要决定是否需要将该异常传播出去。

处理出站异常

用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。

  • 每个出站操作都将返回一个 ChannelFuture 。注册到 ChannelFuture 的ChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了。

  • 几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise的实例。作为 ChannelFuture 的子类。ChannelPromise 也可以被分配用于异步通知的监听器。但是, ChannelPromise 还具有提供立即通知的可写方法:

    1
    2
    
    ChannelPromise setSuccess(); 
    ChannelPromise setFailure(Throwable cause);
    

添加 ChannelFutureListener 只需要调用 ChannelFuture 实例上的 addListener (ChannelFutureListener) 方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是, 调用出站操作(如 write() 方法)所返回的 ChannelFuture 上的 addListener() 方法。

代码清单 6-13 使用了这种方式来添加 ChannelFutureListener ,它将打印栈跟踪信息 并且随后关闭 Channel 。

代码清单 6-13 添加 ChannelFutureListener 到 ChannelFuture

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) {
        if (!f.isSuccess()) {
            f.cause().printStackTrace();
            f.channel().close();
        }
    }
});

第二种方式是将 ChannelFutureListener 添加到即将作为参数传递给 ChannelOut- boundHandler 的方法的 ChannelPromise 。代码清单 6-14 中所展示的代码和代码清单 6-13 中所展示的具有相同的效果。

代码清单 6-14 添加 ChannelFutureListener 到 ChannelPromise

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
                      ChannelPromise promise) {
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                if (!f.isSuccess()) {
                    f.cause().printStackTrace();
                    f.channel().close();
                }
            }
        });
    }
}

ChannelPromise 的可写方法

通过调用 ChannelPromise 上的 setSuccess() 和 setFailure() 方法,可以使一个操作的状态在 ChannelHandler 的方法返回给其调用者时便即刻被感知到。

为何选择一种方式而不是另一种呢?对于细致的异常处理,你可能会发现,在调用出站操 作时添加 ChannelFutureListener 更合适,如代码清单 6-13 所示。而对于一般的异常处 理,你可能会发现,代码清单 6-14 所示的自定义的 ChannelOutboundHandler 实现的方式 更加的简单。

如果你的 ChannelOutboundHandler 本身抛出了异常会发生什么呢?在这种情况下, Netty 本身会通知任何已经注册到对应 ChannelPromise 的监听器。

小结

在本章中我们仔细地研究了 Netty 的数据处理组件— ChannelHandler 。 我们讨论了 ChannelHandler 是如何链接在一起,以及它们是如何作为 ChannelInboundHandler 和 ChannelOutboundHandler 与 ChannelPipeline 进行交互的。

下一章将介绍 Netty 的 EventLoop 和并发模型,这对于理解 Netty 是如何实现异步的、事件驱动的网络编程模型来说至关重要。