目录

IO模型详解

演示

注意:/dev/tcp/host/port 其实是一个 bash 的 feature,由于是 bash的 feature,因此在别的 shell下就不能生效,所以需要注意使用shell类型。

1
2
3
4
5
6
# 查看所有可用的shell
cat /etc/shells
# 切换shell
chsh -s /bin/bash
# 查看当前使用的shell
echo $shell

参考

跟百度建立连接

1
$ exec 8<> /dev/tcp/www.baidu.com/80

向百度发出一个GET请求

1
$ echo -e "GET / HTTP/1.0\n" 1>& 8

向百度发出一个数据读取请求

1
$ cat 0<& 8

说明

  1. exec 8<> /dev/tcp/www.baidu.com/80

    exec

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830000009.png

    exec是一个外壳程序,相当于一个while循环,如果后面跟一个命令程序(会有exit)则替换成命令程序,比如 exec ls 执行完这个命令程序 进程也就结束了。

    命令程序是可选项,如果没有给命令程序,就不替换

    8<> /dev/tcp/www.baidu.com/80

    创建一个文件描述符8,8的标准输入和输出都重定向到 /dev/tcp/www.baidu.com/80 这个目标地址

    https://gitee.com/lienhui68/picStore/raw/master/null/20200829231905.png

    $$ 表示 当前这个解释程序,可以使用echo $$ 输出当前解释程序进程号

    文件描述符可以理解成java变量引用、指针, 文件描述符 8 表示目标地址是百度的那个socket

  2. echo -e "GET / HTTP/1.0\n" 1> &8

    -e 识别换行符

    echo也是一个程序,有标准输出1,重定向到文件描述符8所指向的文件,也就是目标地址是百度的socket

    根据应用层Http协议 和百度通信

  3. cat 0< &8

    cat 输入冲向到 &8 这个文件,也就是目标地址是百度的socket,因为8是文件描述符,所以需要加&

    https://gitee.com/lienhui68/picStore/raw/master/null/20200829231259.png

    注意建立连接后要在一定时间内发起请求,否则会超时

以上,除了创建socket的文件描述符是在内核空间做的,echo和cat与百度通信都是在用户空间完成的

用户空间负责应用层交互

内核空间负责应用层以下的交互

同样可以使用nc进行演示

1
2
nc -l 8080 # 服务器
nc localhost 8080 # 客户端

网络通信IO的演变

同步/异步/阻塞/非阻塞

“阻塞”与"非阻塞"与"同步"与“异步"不能简单的从字面理解,提供一个从分布式系统角度的回答。

  1. 同步和异步

    同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication) 所谓同步,就是在发出一个 调用 时,在没有得到结果之前,该 调用 就不返回。但是一旦调用返回,就得到返回值了。 换句话说,就是由调用者 主动等待这个调用的结果。

    而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

    典型的异步编程模型比如Node.js

    举个通俗的例子: 你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下",然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。 而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。

  2. 阻塞和非阻塞

    阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

    阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会被唤醒。 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

    还是上面的例子, 你打电话问书店老板有没有《分布式系统》这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。 在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关。

IO模型的演变过程:bio->nio->select/poll->epoll

一个Demo

使用strace跟踪java程序: 追踪一个java程序有多少个线程;每一个线程对内核有哪些系统调用,产生了哪些损耗

模拟服务端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SocketServerM {
    public static void main(String[] args) throws IOException {

        int port = 7000;
        int clientNo = 1;

        ServerSocket serverSocket = new ServerSocket(port);

        // 创建线程池
        ExecutorService exec = Executors.newCachedThreadPool();

        try {
            while (true) {
                Socket socket = serverSocket.accept();
                System.out.println(String.format("客户端IP:%s, 端口号:%d", socket.getInetAddress(), socket.getPort()));
		exec.execute(new SingleServer(socket, clientNo));
                clientNo++;
            }
        } finally {
            serverSocket.close();
        }

    }
}

class SingleServer implements Runnable {

    private Socket socket;
    private int clientNo;

    public SingleServer(Socket socket, int clientNo) {
        this.socket = socket;
        this.clientNo = clientNo;
    }

    @Override
    public void run() {

        try {
            DataInputStream dis = new DataInputStream(
                    new BufferedInputStream(socket.getInputStream()));
            DataOutputStream dos = new DataOutputStream(
                    new BufferedOutputStream(socket.getOutputStream()));
            do {
                double length = dis.readDouble();
                System.out.println("从客户端" + clientNo + "接收到的边长数据为:" + length);
                double result = length * length;
                dos.writeDouble(result);
                dos.flush();
            } while (dis.readInt() != 0);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("与客户端" + clientNo + "通信结束");
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

模拟客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.io.*;
import java.net.Socket;
import java.util.Scanner;


public class SocketClient {
    public static void main(String[] args) throws IOException {

        int port = 7000;
        String host = "localhost";

        // 创建一个套接字并将其连接到服务器端指定端口号
        Socket socket = new Socket(host, port);

        // 从socket获取输入流
        DataInputStream dis = new DataInputStream(
                new BufferedInputStream(socket.getInputStream()));

        // 从socket获取输出流
        DataOutputStream dos = new DataOutputStream(
                new BufferedOutputStream(socket.getOutputStream()));

        Scanner sc = new Scanner(System.in);
        boolean flag = false;
        while (!flag) {
            System.out.println("请输入正方形的边长:");
            double length = sc.nextDouble();

            dos.writeDouble(length);
            dos.flush();

            double area = dis.readDouble();
            System.out.println("服务器返回的计算面积为:" + area);

            while (true) {
                System.out.println("继续计算?(Y/N)");
                String str = sc.next();

                if (str.equalsIgnoreCase("N")) {
                    dos.writeInt(0);
                    dos.flush();
                    flag = true;
                    break;
                } else if (str.equalsIgnoreCase("Y")) {
                    dos.writeInt(1);
                    dos.flush();
                    break;
                }
            }
        }
        socket.close();
    }
}
  1. 第一步 服务端以跟踪方式启动

    1
    
    $ strace -ff -o out java SocketServerM
    

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830204104.png

    服务端阻塞

  2. 第二步 查看out跟踪文件

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830204210.png

    第二个out是主线程跟踪文件,进行查看

    1
    
    $ vim out.1220
    

    先拉到最后

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830211126.png

    使用tail指令可知一直hang在这个地方,阻塞等待

    可以使用查找命令依次找到对应的bind listen socket方法

    服务端建立一个连接(开始是处于监听状态 socket建立fd, bind绑定端口号 listen 监听端口)

    系统调用指令 都属于2类指令,可以使用man 2 poll进行查看

    bio 可以使用man 2 bind, bind帮助文档里 有bio的example

  3. 第三步 客户端连接

    1
    
    $ java SocketClient
    

    查看跟踪文件变化

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830211441.png

    此时poll接着往后执行,accept得以执行,返回客户端连接的文件描述符6

    我们也可以进入到这个进程的fd里验证

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830211745.png

    4不用管,是生成的ip6 文件描述符,后面已经close

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830212126.png

    使用netstat命令查看,也能看出连接是双向的,服务端永远保持一个处于listen的socket,还有一个已经建立连接的socket

    客户端只有一个已经建立连接的socket

    继续查看out文件,程序是开辟线程接受accept请求,并且主线程循环等待新的连接请求

    开辟新线程:

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830213024.png

    这也是为什么多出来out.1509的原因

    通过系统调用clone创建一个轻量级进程1509,也就是java里说的线程,为什么netstat显示 建立连接的 pid还是1209,这涉及到线程组的概念,还是用父线程的pid表示。

    主线程循环等待新的连接请求

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830213121.png

    继续阻塞等待新的accept请求。

  4. 第四步 根据程序定义,此时跟客户端的连接应该阻塞在读取事件,查看out.1509

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830214320.png

    可以看到新的线程正在读取刚与客户端建立连接创建的文件描述符6

  5. 同理再来客户端连接

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830214813.png

    服务端会起一个线程建立一个新的连接,阻塞IO 每个连接对应一个线程。

    计算机收了一个数据包,目标地址是本机的任何地址且端口是7000,源地址是任何地址任何端口都ok,就可以交给379这个java程序, 就可以和这个java程序建立连接

  6. 客户端传输数据

    1
    
    tail -20f out.1509
    

    https://gitee.com/lienhui68/picStore/raw/master/null/20200830215430.png

    从out.1509文件 可以看到有数据 返回

BIO

从上面的demo 可以知道 服务端java程序是如何执行起来的,执行了哪些系统调用

服务端要监听某端口,必然会先执行3个系统调用socket/bind/listen

注意:无论是ngix、redis、还是其他服务端,无论是bio、nio、多路复用、aio都会先执行这三步系统调用。

accept会阻塞 由于服务端要接受多个客户端连接,所以每来一个连接(accept有返回)就得起一个新的线程去接受然后做连接后的工作(读写等), 所以得接受accept的操作放到while循环中

receive 也会阻塞

https://gitee.com/lienhui68/picStore/raw/master/null/image-20200828171537646.png

内存 线性地址空间 分成内核空间 用户空间,用户空间通过系统调用 软终端, 切换到内核态

https://gitee.com/lienhui68/picStore/raw/master/null/20200830221239.png

小结

https://gitee.com/lienhui68/picStore/raw/master/null/20200830221113.png

BIO的问题是由于创建多线程导致的,而创建多线程是为了解决阻塞的问题,所以BIO的缺陷根源就是BLOCKING

NIO

NIO有两层语义:

java:New IO

操作系统:NonBlocking

是否阻塞受制于内核,当内核提供非阻塞的方法,BIO的缺陷也就解决了。

使用man 2 socket命令查看这个方法是否提供非阻塞类型。

https://gitee.com/lienhui68/picStore/raw/master/null/20200830222105.png

查看type类型列表,可知操作系统提供了对创建非阻塞socket的支持。

https://gitee.com/lienhui68/picStore/raw/master/null/20200830222117.png

写一个非阻塞的服务端程序演示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import com.google.common.collect.Lists;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalTime;
import java.util.List;

public class SocketNIO {
    public static void main(String[] args) throws Exception {

        /**
         * socket 开启一个管道(可读可写),区分之前单独的读写流
         */
        ServerSocketChannel channel = ServerSocketChannel.open();

        // 可选参数
//        channel.setOption(StandardSocketOptions.TCP_NODELAY, false);
//        StandardSocketOptions.TCP_NODELAY
//        StandardSocketOptions.SO_KEEPALIVE
//        StandardSocketOptions.SO_LINGER
//        StandardSocketOptions.SO_RCVBUF
//        StandardSocketOptions.SO_SNDBUF
//        StandardSocketOptions.SO_REUSEADDR

        /**
         * bind
         */
        channel.bind(new InetSocketAddress(8000));

        /**
         * 设置非阻塞,OS层面
         */
        channel.configureBlocking(false);

        List<SocketChannel> clients = Lists.newArrayList();
        // 接受客户端的连接
        while (true) {
            // 防止打印过于频繁,生产环境不用这行
            Thread.sleep(3000);
            //不会阻塞,没有连接请求就返回null,操作系统返回-1
            SocketChannel client = channel.accept();

            //accept  调用内核了:1,没有客户端连接进来,返回值?在BIO 的时候一直卡着,但是在NIO ,不卡着,返回-1,NULL
            //如果来客户端的连接,accept 返回的是这个客户端的fd  5,client  object
            //NONBLOCKING 就是代码能往下走了,只不过有不同的情况

            if (client == null) {
                System.out.println(LocalTime.now() + " 暂时没有连接请求");
            } else {
                // //重点  socket(服务端的listen socket
                // 连接请求三次握手后,往我这里扔,我去通过accept 得到连接的socket,供连接后数据读写使用
                client.configureBlocking(false);
                System.out.println(String.format("客户端:%s", client.getRemoteAddress()));
                clients.add(client);
            }

            ByteBuffer buffer = ByteBuffer.allocateDirect(4096); //分配直接内存(堆外),也可以放在堆里

            //遍历已经链接进来的客户端能不能读写数据
            for (SocketChannel c : clients) {   //串行化!!!!  多线程!!
                int num = c.read(buffer);  // >0  -1  0   //不会阻塞
                if (num != -1) {
                    buffer.flip();
                    byte[] aaa = new byte[buffer.limit()];
                    buffer.get(aaa);

                    String b = new String(aaa);
                    System.out.println(c.getRemoteAddress() + "传入数据 : " + b);
                    buffer.clear();
                }
            }
        }
    }
}

启动服务

https://gitee.com/lienhui68/picStore/raw/master/null/20200830234439.png

查看主线程跟踪文件

nio同样要建立服务端文件描述符,绑定端口,监听端口

1
2
3
18672 socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 7
20479 bind(7, {sa_family=AF_INET, sin_port=htons(8000), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
20480 listen(7, 50)                           = 0

accept

https://gitee.com/lienhui68/picStore/raw/master/null/20200831000041.png

可以看到此时accept不会引起阻塞,os直接返回-1

客户端连接

1
nc localhost 8000

可以看到accept返回了一个文件描述符8

https://gitee.com/lienhui68/picStore/raw/master/null/20200831001632.png

小结

https://gitee.com/lienhui68/picStore/raw/master/null/20200831001923.png

多路复用器

多路复用是当前线程把所有得到的socket连接 全部交给内核处理,然后内核把有状态变化的连接集合(分成可读、可写、异常的)返回给当前线程。

多路复用, 复用的是一次系统调用

select/poll

定义

1
2
3
select()  and  pselect()  allow a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O
       operation (e.g., input possible).  A file descriptor is considered ready if it is possible to perform a corresponding I/O operation (e.g., read(2)  without  blocking,
       or a sufficiently small write(2)).
1
man 2 select

https://gitee.com/lienhui68/picStore/raw/master/null/20200831002702.png

https://gitee.com/lienhui68/picStore/raw/master/null/20200831003304.png

多路复用, 循环放在内核里 , 不用切换内核态到用户态;select返回准备好的fds, 有几个用户进程就调几个就好了

select 和poll

select 1024个fd限制 减少了系统调用的次数,你也可以自己编译linux内核程序

poll 操作系统 fd限制数

https://gitee.com/lienhui68/picStore/raw/master/null/20200831004156.png

epoll

epoll 基于事件机制,不进行遍历。

select/poll 弊端

  1. 重复传递fd, 解决:增量式传递,内核开辟空间(红黑树结构)保留fd
  2. 每次select、poll 都要重新遍历全量的fd 解决:中断,callback,增强

解决方案:中断之前将数据存放在buffer里,中断来了之后将红黑树上的fd标识一下,仅此而已。

同步和异步

同步IO模型:BIO、NIO、多路复用器 多路复用器只能给你状态,最终还是线程自己读写,所以都是同步模型。

异步IO模型:windows:IOCP 内核有线程负责将数据拷贝到程序的内存空间。

cpu01负责处理终端以及将有变动的文件描述符放入返回的地址空间,cpu02可以继续执行程序其他部分逻辑。

cpu01进行内核态操作,cpu02进行用户态操作;所以这个程序可以并行处理,充分利用多核cpu。

https://gitee.com/lienhui68/picStore/raw/master/null/20200831010312.png


epoll能够高效支持百万级别的句柄监听。

epoll高效,是因为内部用了一个红黑树记录添加的socket,用了一个双向链表接收内核触发的事件。是系统级别的支持的:

当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关。

eventpoll结构体如下所示:

1
2
3
4
5
6
7
8
struct eventpoll{
    ....
    /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    struct rb_root  rbr;
    /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdlist;
    ....
};

每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。

这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。

而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中

在epoll中,对于每一个事件,都会建立一个epitem结构体,如下所示:

1
2
3
4
5
6
7
struct epitem{
    struct rb_node  rbn;//红黑树节点
    struct list_head    rdllink;//双向链表节点
    struct epoll_filefd  ffd;  //事件句柄信息
    struct eventpoll *ep;    //指向其所属的eventpoll对象
    struct epoll_event event; //期待发生的事件类型
}

下面图的左上角文字写错了,应该是双向链表的每个节点都是基于epitem结构中的rdllink成员。

https://gitee.com/lienhui68/picStore/raw/master/null/20200901085247.png

上面这一句更具体的解释是(为什么能支持百万句柄):

  1. 不用重复传递。我们调用epoll_wait时就相当于以往调用select/poll,但是这时却不用传递socket句柄给内核,因为内核已经在epoll_ctl中拿到了要监控的句柄列表。

  2. 在内核里,一切皆文件。所以,epoll向内核注册了一个文件系统,用于存储上述的被监控socket。当你调用epoll_create时,就会在这个虚拟的epoll文件系统里创建一个file结点。当然这个file不是普通文件,它只服务于epoll。

    epoll在被内核初始化时(操作系统启动),同时会开辟出epoll自己的内核高速cache区,用于安置每一个我们想监控的socket,这些socket会以红黑树的形式保存在内核cache里,以支持快速的查找、插入、删除。这个内核高速cache区,就是建立连续的物理内存页,然后在之上建立slab层,简单的说,就是物理上分配好你想要的size的内存对象,每次使用时都是使用空闲的已分配好的对象。

  3. 极其高效的原因:

    这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。

    这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了。(注:好好理解这句话!)

    从上面这句可以看出,epoll的基础就是回调!

如此,一颗红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。执行epoll_create时,创建了红黑树和就绪链表,执行epoll_ctl时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据。执行epoll_wait时立刻返回准备就绪链表里的数据即可。

最后看看epoll独有的两种模式LT和ET。无论是LT和ET模式,都适用于以上所说的流程。区别是,LT模式下,只要一个句柄上的事件一次没有处理完,会在以后调用epoll_wait时次次返回这个句柄,而ET模式仅在第一次返回。

关于LT,ET,有一端描述,LT和ET都是电子里面的术语,ET是边缘触发,LT是水平触发,一个表示只有在变化的边际触发,一个表示在每个阶段都会触发。

参考了这篇文章:https://zhuanlan.zhihu.com/p/20315482

LT, ET这件事怎么做到的呢?当一个socket句柄上有事件时,内核会把该句柄插入上面所说的准备就绪list链表,这时我们调用epoll_wait,会把准备就绪的socket拷贝到用户态内存,然后清空准备就绪list链表,最后,epoll_wait干了件事,就是检查这些socket,如果不是ET模式(就是LT模式的句柄了),并且这些socket上确实有未处理的事件时,又把该句柄放回到刚刚清空的准备就绪链表了。所以,非ET的句柄,只要它上面还有事件,epoll_wait每次都会返回这个句柄。(从上面这段,可以看出,LT还有个回放的过程,低效了)


epoll_create

1
2
int epoll_create(int size);
int epoll_create1(int flags);

epoll_ctl

1
 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_wait

epoll_wait 是阻塞的, 尽量设置超时时间

1
2
3
4
5
int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
                int maxevents, int timeout,
                const sigset_t *sigmask);

单线程完成对多客户端的连接和接受工作

非阻塞,一个线程就解决了 线程内存浪费和消耗时间片的问题。

演示

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 单线程多路复用器
 */
public class SocketMultiplexingSingleThreadv1 {


    private ServerSocketChannel server;

    //linux 多路复用器(select poll    epoll kqueue) nginx  event{}
    private Selector selector;
    private static final int PORT = 9000;

    public void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(PORT));

            //如果在epoll模型下,open--》  epoll_create -> fd3
            selector = Selector.open();  //  select  poll  *epoll  优先选择:epoll  但是可以 -D修正

            //server 约等于 listen状态的 fd4
        /*
        register
        如果:
        select,poll:jvm里开辟一个数组 fd4 放进去
        epoll:  epoll_ctl(fd3,ADD,fd4,EPOLLIN
         */

            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("服务器启动了。。。。。");


        try {
            while (true) { // 死循环,一直处于监听中
                // selector管理的所有fd
                Set<SelectionKey> keys = selector.keys();
                System.out.println("内核空间中文件描述符的个数: " + keys.size());

                //1,调用多路复用器(select,poll  or  epoll  (epoll_wait))
                /*
                select()是啥意思:
                1,select,poll  其实  内核的select(fd4)  poll(fd4)
                2,epoll:  其实 内核的 epoll_wait()
                *, 参数可以带时间:没有时间,0  :  阻塞,有时间设置一个超时
                selector.wakeup()  结果返回0
                 */

                while (selector.select(3000) > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();  //返回的有状态的fd集合
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    //so,管你啥多路复用器,你呀只能给我状态,我还得一个一个的去处理他们的R/W。同步好辛苦!!!!!!!!
                    //  NIO  自己对着每一个fd调用系统调用,浪费资源,那么你看,这里是不是调用了一次select方法,知道具体的那些可以R/W了?
                    //幕兰,是不是很省力?
                    //我前边可以强调过,socket:  listen   通信 R/W
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove(); //set  不移除会重复循环处理
                        if (key.isAcceptable()) {
                            //看代码的时候,这里是重点,如果要去接受一个新的连接
                            //语义上,accept接受连接且返回新连接的FD对吧?
                            //那新的FD怎么办?
                            //select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起
                            //epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);
                            //在当前线程,这个方法可能会阻塞  ,如果阻塞了十年,其他的IO早就没电了。。。
                            //所以,为什么提出了 IO THREADS
                            //redis  是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的
                            //tomcat 8,9  异步的处理方式  IO  和   处理上  解耦
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端  fd7
            client.configureBlocking(false);

            ByteBuffer buffer = ByteBuffer.allocate(8192);  //前边讲过了

            // 0.0  我类个去
            //你看,调用了register
            /*
            select,poll:jvm里开辟一个数组 fd7 放进去
            epoll:  epoll_ctl(fd3,ADD,fd7,EPOLLIN
             */
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客户端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
        service.start();
    }

}

照样使用strace,查看主线程跟踪文件

照样是三步曲

socket

https://gitee.com/lienhui68/picStore/raw/master/null/20200831015516.png

得到服务端文件描述符7

fcntl

https://gitee.com/lienhui68/picStore/raw/master/null/20200831022030.png

和程序中 server.configureBlocking(false); 是对应的。

bind

listen

https://gitee.com/lienhui68/picStore/raw/master/null/20200831015553.png

绑定端口并监听

epoll_create

https://gitee.com/lienhui68/picStore/raw/master/null/20200831015634.png

首先创建一个代表内核空间(数据结构是红黑树)的的文件描述符(selector = Selector.open())

epoll_ctl

https://gitee.com/lienhui68/picStore/raw/master/null/20200831020209.png

将服务端文件描述符7放入内核空间10

追踪还发现往内核空间加入了一个8

https://gitee.com/lienhui68/picStore/raw/master/null/20200831015709.png

注意,这里是由于线程间通信建立管道,所以使用新建的文件描述符8,可以通过如下方式查看

获取进程号 jps

https://gitee.com/lienhui68/picStore/raw/master/null/20200831015826.png

列出打开的文件 lsof

https://gitee.com/lienhui68/picStore/raw/master/null/20200831020016.png

可以看到fd8是个pipe

epoll_wait

轮询监听结果

https://gitee.com/lienhui68/picStore/raw/master/null/20200831020446.png

因为只有服务端一个fd,所以结果都是0

注意最后一行 阻塞,因为我们设置了等待5s, 5s之后会继续执行。

使用tail -f out.5572

增加一个客户端连接,内核收到中断就会将11添加到fd10中

https://gitee.com/lienhui68/picStore/raw/master/null/20200831021646.png

在这之前内核肯定会先accept这个连接创建11这个文件描述符

https://gitee.com/lienhui68/picStore/raw/master/null/20200831021323.png

调用wait方法时就会返回1表示有一个变动的fd

https://gitee.com/lienhui68/picStore/raw/master/null/20200831020934.png

{u32=11}是内核的返回结果

ngix、redis…的底层都是使用epoll

以redis为例

1
$ strace -ff -o redisout redis-server /usr/local/redis/bin/redis.conf

https://gitee.com/lienhui68/picStore/raw/master/null/20200831032211.png

使用tail命令查看

https://gitee.com/lienhui68/picStore/raw/master/null/20200831032318.png

可知redis设置了超时

redis单线程,在一个线程里还需要做持久化、lru等其他事情。

多线程多路复用

以上是单线程多路复用,一个线程同时负责了连接的建立以及连接之后的读取计算。这样会有一个问题

当wait返回的结果里连接数越来越多,连接之后的读取计算耗时也就越来越长,建立连接的响应也就越来越慢。

可以将连接之后的读取计算放在新的线程里去做,不要影响连接的建立。

思路:分boss和worker

监听几个端口(server端)对应几个boss线程,负责建立连接

几个cpu(每个cpu负责处理一部分客户端fd)对应几个worker线程,分别负责对一部分fd的读写计算。

演示程序:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 单线程多路复用器
 */
public class SocketMultiplexingMultiThread {


    private ServerSocketChannel server;

    private Selector selector1;
    private Selector selector2;
    private Selector selector3;
    private static final int PORT = 9000;

    public void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(PORT));

            selector1 = Selector.open();
            selector2 = Selector.open();
            selector3 = Selector.open();

            // server这个fd注册到其中的一个select上
            server.register(selector1, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SocketMultiplexingMultiThread service = new SocketMultiplexingMultiThread();
        service.initServer();

        NioThread t1 = new NioThread(service.selector1, 2); // 得到客户端连接
        // t1得到客户端连接后 将fd按照轮询方式注册到下面这两个selector
        NioThread t2 = new NioThread(service.selector2);
        NioThread t3 = new NioThread(service.selector3);

        t1.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t2.start();
        t3.start();

        System.out.println("服务器启动了。。。。。");

    }


}

class NioThread extends Thread {
    Selector selector;
    static int selectors = 0;

    int id = 0;

    volatile static BlockingQueue<SocketChannel>[] queues;

    static AtomicInteger idx = new AtomicInteger();

    public NioThread(Selector selector, int n) {
        this.selector = selector;
        this.selectors = n;

        queues = new LinkedBlockingQueue[selectors];
        for (int i = 0; i < n; i++) {
            queues[i] = new LinkedBlockingQueue<>();
        }
        System.out.println("Boss 启动");
    }

    public NioThread(Selector selector) {
        this.selector = selector;
        id = idx.getAndIncrement() % selectors;
        System.out.println("worker: " + id + "启动");
    }

    @Override
    public void run() {
        try {
            while (true) {

                while (selector.select(3000) > 0) {

                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectionKeys.iterator();

                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        // 只有第一个线程的selector可能会有连接的fd,第一个selector是基于服务端fd创建的,注册了listen
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        } else if (key.isReadable()) { // 其他两个线程里的selector 只会 可能有读写的fd
                            readHandler(key);
                        }
                    }
                }

                if (!queues[id].isEmpty()) {
                    ByteBuffer buffer = ByteBuffer.allocate(8192);  //前边讲过了
                    SocketChannel client = queues[id].take();
                    // 将client注册到当前线程的select上
                    client.register(selector, SelectionKey.OP_READ, buffer);
                    System.out.println("-------------------------------------------");
                    System.out.println("新客户端:" + client.getRemoteAddress() + "分配到:" + (id));
                    System.out.println("-------------------------------------------");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端  fd7
            client.configureBlocking(false);

            int num = idx.getAndIncrement() % selectors;

            queues[num].add(client);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}