目录

异步事件驱动机制详解

NIO这种IO模型是同步非阻塞式的,Netty底层使用的就是NIO这种网络IO模型,但是Netty定义为是一款异步的事件驱动的网络应用程序框架,两者之间是否矛盾?

ok, 现在就让我们带着上述问题阅读本篇文章。

开始之前先介绍基本概念

基础概念

同步/异步

同步和异步的区别是消息通信机制不一样,同步是调用者发出一个调用,在没有得到结果之前,该 调用 就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由调用者 主动等待这个调用的结果。

而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

我们可以使用将同步转换成异步这种方式扩大cpu的处理能力,eg:fork/join 还有下文将要提到的Netty。

阻塞/非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会被唤醒。 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

同步非阻塞

对NIO是同步非阻塞的理解

同步和异步

从操作系统角度来说,网络IO的数据拷贝主要分为两个阶段,一是数据准备阶段,此阶段会等待IO设备接收数据并由DMA将数据从IO设备的buffer拷贝到kernel buffer,二是数据从kernel buffer拷贝到user buffer中

同步IO指的是数据从内核拷贝到用户时。发起该请求的线程会自己来拷贝数据(表现为线程阻塞拷贝)。

一旦涉及到网络 IO必定会发生数据拷贝的阻塞,注意此阻塞非彼阻塞,这里的阻塞形容的是拷贝数据相对于CPU的速度来说是非常耗时的,看起来像线程阻塞了一样。我们在上面说的阻塞是线程挂起并让出CPU。

一句话总结就是自己来拷贝是阻塞的,别人帮我拷贝就是异步的,像是windows的IOCP 内核机制就有线程负责将数据从内核拷贝到用户空间。

阻塞和非阻塞

阻塞和非阻塞主要描述的是网络IO数据拷贝的第一个阶段,当用户线程发起读写等系统调用时,用户线程所处的状态。

阻塞指的是线程一直等待数据准备好,期间什么都不干,但是会让出CPU,这样其他线程可以执行(CPU的利用率比较高),数据准备好之后自己来拷贝数据。

非阻塞指的是在第一阶段,发起网络IO请求的时候会立即返会去干别的事情,但是会轮询内核数据是否准备好,由于CPU要处理更多的系统调用(每次询问都是系统调用),这种模型的CPU利用率低。操作系统提供了IO多路复用工作解决了cpu利用率低的痛点。

一句话总结NIO同步非阻塞

用户线程需要自己从内核拷贝数据到用户空间,由于拷贝数据相对于cpu来说是非常耗时的,线程会立即受到系统调用返回的结果,这样线程就可以做其他事情,当然别忘了还得拷贝数据,所以线程会采用轮询的方式来看数据是否准备妥当。

异步事件驱动

前文已经介绍了异步,现在介绍下事件驱动,其实上面有提到IO多路复用,采用的就是事件驱动机制。

事件驱动:一般有一个主循环和一个任务队列,所有事件只管往队列里塞,主循环则从队列里取出并处理。如果不依赖于多路复用处理多个任务就会需要多线程(与连接数对等),但是依赖于多路复用,这个循环就可以在单线程的情况下处理多个连接。无论是哪个连接发生了什么事件,都会被主循环从队列取出并处理(可能用回调函数处理等),也就是说程序的走向由事件驱动。

Java的NIO采用的是Reactor线程模型中的单Reactor单线程模型(前台和服务员是一个人,全程为顾客服务,可以服务多个人),Netty的NIO采用的是主从Reactor模型,是多Reactor多线程模型。

那么Netty的异步和NIO的同步矛盾吗?

Netty的异步和NIO的同步没半毛钱关系,异步体现在所有的IO调用都是异步的,所有的IO调用会立即返回,并不保证调用成功与否,但是调用会返回ChannelFuturenetty会通过ChannelFuture通知你调用是成功了还是失败了亦或是取消了。

io.netty.channel.Channel类的一段注释请参考。

1
2
3
4
5
6
All I/O operations are asynchronous.
All I/O operations in Netty are asynchronous. It means any I/O calls will
return immediately with no guarantee that the requested I/O operation has
been completed at the end of the call. Instead, you will be returned with
a {@link ChannelFuture} instance which will notify you when the requested I/O
operation has succeeded, failed, or canceled.

说白了就是将同步转成异步,类似下面这种写法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.eh.eden.pattern;

import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class Demo {
    public static void main(String[] args) throws InterruptedException {
        LocalTime t1 = LocalTime.now();

        syncCall(); // 同步调用,耗时3s

        LocalTime t2 = LocalTime.now();

        System.out.println("同步调用耗时: " + Duration.between(t1, t2).toMillis());

        final CountDownLatch latch = new CountDownLatch(1);

        // 同步转异步,立即返回
        CompletableFuture
                .supplyAsync(() -> String.format("=======》将 %s 转为异步调用", syncCall()))
                .thenAccept(s -> {
                    System.out.println(s);
                    latch.countDown();
                });

        LocalTime t3 = LocalTime.now();

        System.out.println("异步调用耗时: " + Duration.between(t2, t3).toMillis());

        latch.await();
        LocalTime t4 = LocalTime.now();
        System.out.println("异步计算结果耗时: " + Duration.between(t3, t4).toMillis());

    }


    private static String syncCall() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "同步调用";
    }
}
// 运行结果
同步调用耗时: 3002
异步调用耗时: 18
======= 同步调用 转为异步调用
异步计算结果耗时: 3021

所以所谓的异步是针对用户而言的,用户使用Channel进行IO操作,会立即返回。但是这个IO操作的任务是提交给了Netty的NIO底层去进行处理,所以我们说Netty的异步事件驱动与Netty底层基于NIO(同步非阻塞)是不矛盾的。

两句话总结Netty的异步,一是异步事件处理,Event被放入EventQueue即可返回,后续再从Queue里消费处理。

二是异步IO,包括Bind、Write等操作会返回一个ChannelFuture,进而异步拿到结果,不会造成线程block。

1
2
3
4
5
6
7
serverBootstrap.bind(port).addListener(future -> {
       if (future.isSuccess()) {
           System.out.println("success, port: " + port);
       } else {
           System.out.println("failed, port: " + port);
       }
   });

参考

https://blog.csdn.net/weixin_41954254/article/details/106414746