目录

Netty源码分析之EventLoop

原文:https://www.jianshu.com/p/13a6e935d0c3

我们知道, 一个 Netty 程序启动时, 至少要指定一个 EventLoopGroup(如果使用到的是 NIO, 那么通常是 NioEventLoopGroup), 那么这个 NioEventLoopGroup 在 Netty 中到底扮演着什么角色呢? Netty 是 Reactor 模型的一个实现, 那么首先从 Reactor 的线程模型开始.

Reactor线程模型

  • 单线程模型
  • 多线程模型
  • 主从多线程模型

单线程模型

https://gitee.com/lienhui68/picStore/raw/master/null/20200906144431.png

所谓单线程, 即 acceptor 处理和 handler 处理都在一个线程中处理. 这个模型的坏处显而易见: 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了). 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少.

多线程模型

那么什么是 多线程模型 呢? Reactor 的多线程模型与单线程模型的区别就是 acceptor 是一个单独的线程处理, 并且有一组特定的 NIO 线程来负责各个客户端连接的 IO 操作. Reactor 多线程模型如下:

https://gitee.com/lienhui68/picStore/raw/master/null/20200906144804.png

Reactor 多线程模型 有如下特点:

  • 有专门一个线程, 即 Acceptor 线程用于监听客户端的TCP连接请求.
  • 客户端连接的 IO 操作都是由一个特定的 NIO 线程池负责. 每个客户端连接都与一个特定的 NIO 线程绑定, 因此在这个客户端连接的生命周期中所有 IO 操作都是在同一个线程中完成的.
  • 客户端连接有很多, 但是 NIO 线程数是比较少的, 因此一个 NIO 线程可以同时绑定到多个客户端连接中.

主从多线程模型

一般情况下, Reactor 的多线程模式已经可以很好的工作了, 但是我们考虑一下如下情况: 如果我们的服务器需要同时处理大量的客户端连接请求或我们需要在客户端连接时, 进行一些权限的检查, 那么单线程的 Acceptor 很有可能就处理不过来, 造成了大量的客户端不能连接到服务器.Reactor 的主从多线程模型就是在这样的情况下提出来的, 它的特点是: 服务器端接收客户端的连接请求不再是一个线程, 而是由一个独立的线程池组成. 它的线程模型如下:

https://gitee.com/lienhui68/picStore/raw/master/null/20200906145057.png

可以看到, Reactor 的主从多线程模型和 Reactor 多线程模型很类似, 只不过 Reactor 的主从多线程模型的 acceptor 使用了线程池来处理大量的客户端请求.

NioEventLoopGroup 与 Reactor 线程模型的对应

我们介绍了三种 Reactor 的线程模型, 那么它们和 NioEventLoopGroup 又有什么关系呢? 其实, 不同的设置 NioEventLoopGroup 的方式就对应了不同的 Reactor 的线程模型.

单线程模型

1
2
3
4
5
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
 .channel(NioServerSocketChannel.class)
 ...

注意, 我们实例化了一个 NioEventLoopGroup, 构造器参数是1, 表示 NioEventLoopGroup 的线程池大小是1. 然后接着我们调用 b.group(bossGroup) 设置了服务器端的 EventLoopGroup. 有些朋友可能会有疑惑: 我记得在启动服务器端的 Netty 程序时, 是需要设置 bossGroup 和 workerGroup 的, 为什么这里就只有一个 bossGroup? 其实很简单, ServerBootstrap 重写了 group 方法:

1
2
3
4
@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}

因此当传入一个 group 时, 那么 bossGroup 和 workerGroup 就是同一个 NioEventLoopGroup 了. 这时候呢, 因为 bossGroup 和 workerGroup 就是同一个 NioEventLoopGroup, 并且这个 NioEventLoopGroup 只有一个线程, 这样就会导致 Netty 中的 acceptor 和后续的所有客户端连接的 IO 操作都是在一个线程中处理的。那么对应到 Reactor 的线程模型中, 我们这样设置 NioEventLoopGroup 时, 就相当于 Reactor 单线程模型。

多线程模型

同理, 再来看一下下面的例子:

1
2
3
4
5
6
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 ...

bossGroup 中只有一个线程, 而 workerGroup 中的线程是 CPU 核心数乘以2, 因此对应的到 Reactor 线程模型中, 我们知道, 这样设置的 NioEventLoopGroup 其实就是 Reactor 多线程模型.

主从多线程模型

1
2
3
4
5
6
EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 ...

其实在Netty 的服务器端的 acceptor 阶段, 没有使用到多线程, 因此上面的 主从多线程模型在 Netty 的服务器端是不存在的. 服务器端的 ServerSocketChannel 只绑定到了 bossGroup 中的一个线程, 因此在调用 Java NIO 的 Selector.select 处理客户端的连接请求时, 实际上是在一个线程中的, 所以对只有一个服务的应用来说, bossGroup 设置多个线程是没有什么作用的, 反而还会造成资源浪费. 那么Netty 中的 bossGroup 为什么使用线程池的原因大家众所纷纭,在stackoverflow有人说:

the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps

多个引导类,意味着多个监听端口。

NioEventLoopGroup

我们再来简单回顾下NioEventLoopGroup 的初始化过程,

https://gitee.com/lienhui68/picStore/raw/master/null/20200906150227.png

  • EventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children 数组, 其大小是 nThreads, 这样就构成了一个线程池

  • 如果我们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2

  • MultithreadEventExecutorGroup 中会调用 newChild 抽象方法来初始化 children 数组

  • 抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例.

  • NioEventLoop 属性——SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过 SelectorProvider.provider() 获取一个 SelectorProvider;

    1
    2
    3
    
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
            this(nThreads, threadFactory, SelectorProvider.provider());
        }
    
  • NioEventLoop 属性——Selector selector 属性: NioEventLoop 构造器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

NioEventLoop

NioEventLoop 继承于 SingleThreadEventLoop, 而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中对本地线程的抽象, 它内部有一个 Thread thread 属性, 存储了一个本地 Java 线程. 因此我们可以认为, 一个 NioEventLoop 其实和一个特定的线程绑定, 并且在其生命周期内, 绑定的线程都不会再改变. NioEventLoop 类层次结构

NioEventLoop 的类层次结构图还是比较复杂的, 不过我们只需要关注几个重要的点即可. 首先 NioEventLoop 的继承链如下:

1
NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor

在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule 方法来运行一些定时任务. 而在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个 NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行 通常来说, NioEventLoop 肩负着两种任务, 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等; 而第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.

https://gitee.com/lienhui68/picStore/raw/master/null/20200906151837.png

从上图可以看到, SingleThreadEventExecutor 有一个名为 thread 的 Thread 类型字段, 这个字段就代表了与 SingleThreadEventExecutor 关联的本地线程.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // 省略清理代码
                ...
            }
        }
    });
    threadProperties = new DefaultThreadProperties(thread);
    taskQueue = newTaskQueue();
}

在 SingleThreadEventExecutor 构造器中, 通过 threadFactory.newThread 创建了一个新的 Java 线程. 在这个线程中所做的事情主要就是调用 SingleThreadEventExecutor.this.run() 方法, 而因为 NioEventLoop 实现了这个方法, 因此根据多态性, 其实调用的是 NioEventLoop.run() 方法.

EventLoop 与 Channel 的关联

Netty 中, 每个 Channel 都有且仅有一个 EventLoop 与之关联, 它们的关联过程如下:

https://gitee.com/lienhui68/picStore/raw/master/null/20200906151914.png

从上图中我们可以看到, 当调用了 AbstractChannel#AbstractUnsafe.register 后, 就完成了 Channel 和 EventLoop 的关联. register 实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 删除条件检查.
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}

在 AbstractChannel#AbstractUnsafe.register 中, 会将一个 EventLoop 赋值给 AbstractChannel 内部的 eventLoop 字段, 到这里就完成了 EventLoop 与 Channel 的关联过程.

EventLoop 的启动

在前面我们已经知道了,NioEventLoop 本身就是一个SingleThreadEventExecutor,因此NioEventLoop的启动,其实就是NioEventLoop所绑定的本地Java线程的启动。然后让我们重温下AbstractBootstrap.initAndRegister(),

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel(); // 创建一个Channel
        init(channel); // 初始化
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    ChannelFuture regFuture = config().group().register(channel); // group().register(channel)
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

在跟踪register(channel);方法直到AbstractChannel.register():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    。。。
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

一路从 Bootstrap.bind 方法跟踪到 AbstractChannel#AbstractUnsafe.register 方法, 整个代码都是在主线程中运行的, 因此上面的 eventLoop.inEventLoop() 就为 false, 于是进入到 else 分支, 在这个分支中调用了 eventLoop.execute. eventLoop 是一个 NioEventLoop 的实例, 而 NioEventLoop 没有实现 execute 方法, 因此调用的是 SingleThreadEventExecutor.execute,在execute中最后会调用到doStartThread方法会启动NioEventLoop绑定的java本地线程。总的来说,当EventLoop.execute第一次被调用时,就会触发doStartThread的调用,进而导致了EventLoop所对应的Java线程的启动。

SingleThreadEventExecutor#doStartThread

netty的IO事件的循环处理

回忆下nio中selector的使用流程

  1. 通过 Selector.open() 打开一个 Selector.
  2. 将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)
  3. 循环做以下流程:
    1. 调用 select() 方法
    2. 调用 selector.selectedKeys() 获取 selected keys
    3. 迭代每个 selected key:
      1. 从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)
      2. 判断是哪些 IO 事件已经就绪了, 然后处理它们. 如果是 OP_ACCEPT 事件, 则调用 “SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()” 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.
      3. 根据需要更改 selected key 的监听事件.
      4. 将已经处理过的 key 从 selected keys 集合中删除.

netty的IO事件的循环处理

  1. 打开selector,在NioEventLoop初始化的过程中已经实现:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector(); // 打开selector
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    
  2. channel注册到selector中,在Bootstrap.initAndRegister或者ServerBootstrap.initAndRegister中也已经实现;

  3. NioEventLoop中的run循环

    在之前已经提到当EventLoop.execute第一次被调用时, 就会触发SingleThreadEventExecutor .doStartThread的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动. doStartThread方法中的run方法主要工作就是调用了 SingleThreadEventExecutor.this.run() 方法. 而SingleThreadEventExecutor.run() 是一个抽象方法, 它的实现在 NioEventLoop 中。那么重点自然是在NioEventLoop的run方法中:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
       
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    

    可以看到for (;;)是个死循环,就是NioEventLoop事件循环的秘密所在。selector第三步就在这里实现。去除细枝节,接下来看select()方法的调用是在哪里:

    1
    2
    3
    4
    5
    6
    
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        ...
        int selectedKeys = selector.select(timeoutMillis);
        ...     
    }
    

    这边说明下:select()方法会一直阻塞直到有数据ready,selectNow()则会立即返回,selector.select(timeoutMillis), 而这个调用是会阻塞住当前线程的, timeoutMillis 是阻塞的超时时间。这里其实也比较好理解,当hasTask()为false时走SelectStrategy.SELECT分支,没有任务的话,可以阻塞等待IO就绪事件。等到有事件就绪后,就是需要获取selected keys,然后针对每一种key进行事件处理。沿着代码路径看:

    run–>processSelectedKeys()–>processSelectedKeysOptimized()

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    
    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i]
            selectedKeys.keys[i] = null;
       
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
       
            if (needsToSelectAgain) {
                selectedKeys.reset(i + 1);
       
                selectAgain();
                i = -1;
            }
        }
    }
    

    在这个方法中通过selectedKeys.keys[i]获取到获取selected keys,根据key处理事件则在processSelectedKey方法中。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            unsafe.close(unsafe.voidPromise());
            return;
        }
       
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
       
                unsafe.finishConnect();
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    processSelectedKey 中处理了三个事件, 分别是:

    • OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取.

    • OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据.

    • OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态. 很建议研读OP_READ事件中的unsafe.read()源码,对理解EventLoop很有帮助。

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      
          private final class NioMessageUnsafe extends AbstractNioUnsafe {
           
              private final List<Object> readBuf = new ArrayList<Object>();
           
              @Override
              public void read() {
                  assert eventLoop().inEventLoop();
                  final ChannelConfig config = config();
                  final ChannelPipeline pipeline = pipeline();
                  final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                  allocHandle.reset(config);
           
                  boolean closed = false;
                  Throwable exception = null;
                  try {
                      try {
                          do {
                              int localRead = doReadMessages(readBuf);
                              if (localRead == 0) {
                                  break;
                              }
                              if (localRead < 0) {
                                  closed = true;
                                  break;
                              }
           
                              allocHandle.incMessagesRead(localRead);
                          } while (allocHandle.continueReading());
                      } catch (Throwable t) {
                          exception = t;
                      }
           
                      int size = readBuf.size();
                      for (int i = 0; i < size; i ++) {
                          readPending = false;
                          pipeline.fireChannelRead(readBuf.get(i));
                      }
                      readBuf.clear();
                      allocHandle.readComplete();
                      pipeline.fireChannelReadComplete();
           
                      if (exception != null) {
                          closed = closeOnReadError(exception);
           
                          pipeline.fireExceptionCaught(exception);
                      }
           
                      if (closed) {
                          inputShutdown = true;
                          if (isOpen()) {
                              close(voidPromise());
                          }
                      }
                  } finally {
                      // Check if there is a readPending which was not processed yet.
                      // This could be for two reasons:
                      // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                      // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                      //
                      // See https://github.com/netty/netty/issues/2254
                      if (!readPending && !config.isAutoRead()) {
                          removeReadOp();
                      }
                  }
              }
          }
      

      重点代码是int localRead = doReadMessages(readBuf);和pipeline.fireChannelRead(readBuf.get(i)); read()中实现了:

      • 分配 ByteBuf
      • 从 SocketChannel 中读取数据
      • 调用 pipeline.fireChannelRead 发送一个 inbound 事件.

任务队列机制

在Netty 中,一个 NioEventLoop 通常需要肩负起两种任务,第一个是作为IO线程,处理 IO 操作;第二个就是作为任务线程,处理taskQueue中的任务。这一节的重点就是分析一下 NioEventLoop 的任务队列机制的。任务队列机制分为两部分:任务的添加和任务的执行。

任务的添加

NioEventLoop 继承于 SingleThreadEventExecutor, 而SingleThreadEventExecutor 中有一个 Queue<Runnable> taskQueue 字段, 用于存放添加的 Task. 在 Netty 中, 每个 Task 都使用一个实现了 Runnable 接口的实例来表示。例如当我们需要将一个 Runnable 添加到 taskQueue 中时, 我们可以进行类似如下操作:

1
2
3
4
5
6
eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
});

然后调用SingleThreadEventExecutor的execute方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

addTask即添加任务。除了这类普通任务,还可以通过调用eventLoop.scheduleXXX 之类的方法来添加一个定时任务。EventLoop 中实现任务队列的功能在超类 SingleThreadEventExecutor 实现的, 而 schedule 功能的实现是在 SingleThreadEventExecutor 的父类, 即 AbstractScheduledEventExecutor 中实现的。在 AbstractScheduledEventExecutor 中, 有以 scheduledTaskQueue 字段:

Queue<ScheduledFutureTask<?» scheduledTaskQueue;

通过AbstractScheduledEventExecutor.schedule方法实现任务的添加。

任务的执行

当一个任务被添加到 taskQueue 后, 它是怎么被 EventLoop 执行的呢?让我们回到 NioEventLoop.run() 方法中, 在这个方法里, 会分别调用processSelectedKeys() 和 runAllTasks() 方法, 来进行 IO 事件的处理和 task 的处理. processSelectedKeys() 方法我们已经分析过了, 下面我们来看一下 runAllTasks() :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;
        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); 
        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

fetchFromScheduledTaskQueue()其实就是将 scheduledTaskQueue中已经可以执行的(即定时时间已到的 schedule 任务) 拿出来并添加到 taskQueue中, 作为可执行的 task 等待被调度执行,然后runAllTasksFrom方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task, 然后调用它的 run() 方法来运行此 task。