NIO Multi-User Chat Room

Original: https://segmentfault.com/a/1190000021519312

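Requirements

This article implements a multi-user chat room with Java NIO. The chat room provides the following features.

Server

  1. Accept client connections
  2. Register a name for each newly connected client and reject duplicate names
  3. After a new user registers, broadcast to all clients that the user has joined
  4. Receive client messages and deliver them by unicast or broadcast

Client

  1. Connect to the server
  2. Register a user name
  3. Receive broadcast messages from the server
  4. Send chat messages, by unicast or broadcast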

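System Analysis

Class Design

The system consists of four classes:

  1. Message class: Message, which encodes and decodes messages
  2. Message enum: MessageType, which defines the message types
  3. Chat server: ChatServer
  4. Chat client: ChatClient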

https://gitee.com/lienhui68/picStore/raw/master/null/20200901071928.png

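Business Flow

  1. The server starts
  2. The client starts
  3. The client registers
    1. The server sends the client a prompt to register a user name; message type: REG_SERVER_SYN
    2. The client sends its user name to the server; message type: REG_CLIENT_ACK
    3. The server sends the client a registration confirmation; message type: REG_SERVER_ACK
    4. The server broadcasts to all clients; message type: BROADCAST_USER_LIST (the code below sends a join notice rather than a full user list)
  4. Chat messages are sent
    1. The client sends a chat message to the server; if toUser is specified it is a unicast, otherwise a broadcast; message type: CHAT_MSG_SEND
    2. The server receives the chat message and unicasts or broadcasts it; message type: CHAT_MSG_RECEIVE
    3. The client displays the message content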
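
The routing rule in step 4 can be sketched as a pure function. This is only an illustration, not the ChatServer code itself: the class name RoutingSketch and the method recipients are hypothetical, and the plain list of names stands in for the user names the server keeps on its SelectionKeys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RoutingSketch {
    // Returns the users who should receive a message sent by fromUser.
    // toUser == null means broadcast; otherwise only the named user receives it.
    // The sender is always excluded, as in the server's delivery loop.
    static List<String> recipients(List<String> onlineUsers, String fromUser, String toUser) {
        List<String> result = new ArrayList<>();
        for (String user : onlineUsers) {
            if (user.equals(fromUser)) {
                continue; // never echo a message back to its sender
            }
            if (toUser == null || toUser.equals(user)) {
                result.add(user);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> online = Arrays.asList("alice", "bob", "carol");
        System.out.println(recipients(online, "alice", null));  // broadcast: [bob, carol]
        System.out.println(recipients(online, "alice", "bob")); // unicast: [bob]
    }
}
```

Note that a unicast to a name that is not online produces an empty recipient list, so the message is silently dropped.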

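Message Design

Messages are plain strings whose fields are separated by a special delimiter, String MSG_SPLIT = "#@@#". There are two message formats:

  1. message_type#@@#message_content, i.e. command plus data
  2. message_type#@@#option#@@#message_content, where option carries additional information, such as toUser when the client sends a unicast chat message.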
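
The two formats can be illustrated with a small string-level sketch. It assumes nothing beyond the delimiter and the chat_send type string used later in the article; the class name MessageFormatSketch and its methods are hypothetical and simpler than the real Message class.

```java
import java.util.regex.Pattern;

final class MessageFormatSketch {
    static final String MSG_SPLIT = "#@@#";

    // Joins the type, the optional option field, and the content with the delimiter.
    static String encode(String type, String option, String content) {
        StringBuilder sb = new StringBuilder(type);
        if (option != null && !option.isEmpty()) {
            sb.append(MSG_SPLIT).append(option);
        }
        return sb.append(MSG_SPLIT).append(content).toString();
    }

    // Splits on the literal delimiter: 2 fields means format 1, 3 fields means format 2.
    static String[] decode(String raw) {
        return raw.split(Pattern.quote(MSG_SPLIT));
    }

    public static void main(String[] args) {
        System.out.println(encode("chat_send", null, "hello"));         // chat_send#@@#hello
        System.out.println(encode("chat_send", "bob", "hello"));        // chat_send#@@#bob#@@#hello
        System.out.println(decode("chat_send#@@#bob#@@#hello").length); // 3
    }
}
```

A limitation of this kind of format is that a message body containing the delimiter itself would be split into extra fields, so the delimiter has to be something users are unlikely to type.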

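Implementation Notes

  1. The server is single-threaded: one Selector handles all messages.

  2. After a client registers, its user name is stored on the server as the attachment of the corresponding SelectionKey.

  3. Selector.keys returns every key registered with the Selector. To get the list of client connections, filter out the ServerSocketChannel and any closed channels: selector.keys().stream().filter(item -> item.channel() instanceof SocketChannel && item.channel().isOpen()).collect(Collectors.toSet());

  4. When one end of a socket connection closes, the other end receives an OP_READ event, but socketChannel.read(byteBuffer) then returns -1 or throws an IOException. Both cases must be handled and the socketChannel closed.

    This implementation checks for a return value of -1 to detect that the client has disconnected.

  5. The client has to handle data sent by the server and read the user's input at the same time. With a single thread, the thread would block while waiting for input and could not receive server messages. The client therefore uses two threads: the main thread handles server messages, and a child thread reads and processes user input.

  6. The client goes through two phases

    1. It starts in the registration phase: messageType = MessageType.REG_CLIENT_ACK
    2. After receiving the server's registration-success message REG_SERVER_ACK, it enters the chat phase: messageType = MessageType.CHAT_MSG_SEND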
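
The two client phases in note 6 amount to a small state machine that decides how a line of user input is encoded. The sketch below is a simplified stand-in for the client, working on strings only: the class name ClientPhaseSketch, the Phase enum, and both method names are hypothetical, while the type strings and the toUser#message input convention are taken from the code later in the article.

```java
final class ClientPhaseSketch {
    enum Phase { REGISTERING, CHATTING }

    // volatile because, in a two-thread client, one thread updates the phase
    // while the input thread reads it
    private volatile Phase phase = Phase.REGISTERING;

    // Called with the type of each message received from the server.
    void onServerMessage(String type) {
        if ("reg_server_ack".equals(type)) {
            phase = Phase.CHATTING; // registration succeeded
        }
    }

    // Encodes one line of user input according to the current phase.
    String encodeInput(String line) {
        if (phase == Phase.REGISTERING) {
            return "reg_client_ack#@@#" + line; // the line is a user name
        }
        String[] parts = line.split("#", 2);    // "toUser#text" requests a unicast
        return parts.length == 2
                ? "chat_send#@@#" + parts[0] + "#@@#" + parts[1]
                : "chat_send#@@#" + line;
    }

    public static void main(String[] args) {
        ClientPhaseSketch client = new ClientPhaseSketch();
        System.out.println(client.encodeInput("alice"));  // reg_client_ack#@@#alice
        client.onServerMessage("reg_server_ack");
        System.out.println(client.encodeInput("bob#hi")); // chat_send#@@#bob#@@#hi
    }
}
```

A REG_SERVER_SYN message for a duplicate name leaves the phase unchanged, so the next input line is again sent as a registration attempt.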

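Regarding note 3 above: when collecting the registered clients through Selector.keys, be sure to filter out channels that are already closed. Otherwise, while handling a client going offline, the user list would still include the user who just left. Closing a channel only cancels its key; the Selector removes cancelled keys from its key set during the next selection operation.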
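
The lingering key can be observed in isolation. The following sketch is an assumption-laden stand-in rather than the server code: it registers the read end of a java.nio.channels.Pipe instead of a SocketChannel so that it needs no network, and the class name CancelledKeySketch and its methods are hypothetical. It also shows a user name being attached to the key, as in note 2.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class CancelledKeySketch {
    // Counts registered keys whose channel is still open, like the filter in note 3.
    static long openKeys(Selector selector) {
        return selector.keys().stream().filter(k -> k.channel().isOpen()).count();
    }

    // Returns {keys before select, open keys before select, keys after select}.
    static long[] demo() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
            key.attach("alice"); // the user name travels with the key

            pipe.source().close(); // cancels the key but does not remove it yet
            long before = selector.keys().size();
            long openBefore = openKeys(selector);

            selector.selectNow(); // a selection operation flushes cancelled keys
            long after = selector.keys().size();

            pipe.sink().close();
            return new long[] {before, openBefore, after};
        }
    }

    public static void main(String[] args) throws IOException {
        long[] r = demo();
        System.out.println("keys before select: " + r[0]
                + ", open before select: " + r[1]
                + ", keys after select: " + r[2]);
    }
}
```

Between the close and the next select the key set still contains the cancelled key, which is why the isOpen filter is needed when the user list is built inside the same event-handling pass.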

Detailed Design

The code is fairly simple; the design points are mostly covered above.

Message and MessageType

package com.eh.eden.nio.chat;

import lombok.Data;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

@Data
public class Message {
    public static final String MSG_SPLIT = "#@@#";
    public static final Charset CHARSET = StandardCharsets.UTF_8;

    private MessageType action;
    private String option;
    private String message;

    public Message(MessageType action, String option, String message) {
        this.action = action;
        this.option = option;
        this.message = message;
    }

    public Message(MessageType action, String message) {
        this.action = action;
        this.message = message;
    }

    public ByteBuffer encode() {
        StringBuilder builder = new StringBuilder(action.getAction());
        if (option != null && option.length() > 0) {
            builder.append(MSG_SPLIT);
            builder.append(option);
        }
        builder.append(MSG_SPLIT);
        builder.append(message);

        return CHARSET.encode(builder.toString());
    }

    public static Message decode(String message) {
        if (message == null || message.length() == 0)
            return null;
        String[] msgArr = message.split(MSG_SPLIT);
        MessageType messageType = msgArr.length > 1 ? MessageType.getActionType(msgArr[0]) : null;

        switch (msgArr.length) {
            case 2:
                return new Message(messageType, msgArr[1]);
            case 3:
                return new Message(messageType, msgArr[1], msgArr[2]);
            default:
                return null;
        }
    }

    public static ByteBuffer encodeRegSyn() {
        return encodeRegSyn(false);
    }

    public static ByteBuffer encodeRegSyn(boolean duplicate) {
        MessageType action = MessageType.REG_SERVER_SYN;
        String message = "Please input your name to register.";
        if (duplicate) {
            message = "This name is used, Please input another name.";
        }
        return new Message(action, message).encode();
    }

    public static ByteBuffer encodeSendMsg(String msg) {
        return encodeSendMsg(null, msg);
    }

    public static ByteBuffer encodeSendMsg(String toUser, String msg) {
        MessageType action = MessageType.CHAT_MSG_SEND;
        String option = toUser;
        String message = msg;
        return new Message(action, option, message).encode();
    }

    public static ByteBuffer encodeReceiveMsg(String fromUser, String msg) {
        MessageType action = MessageType.CHAT_MSG_RECEIVE;
        String option = fromUser;
        String message = msg;
        return new Message(action, option, message).encode();
    }

    public static ByteBuffer encodeRegClientAck(String username) {
        MessageType action = MessageType.REG_CLIENT_ACK;
        String message = username;
        return new Message(action, message).encode();
    }

    public static ByteBuffer encodeRegServerAck(String username) {
        MessageType action = MessageType.REG_SERVER_ACK;
        String message = username + ", Welcome to join the chat room.";
        return new Message(action, message).encode();
    }

    public static ByteBuffer encodePublishUserList(String message) {
        MessageType action = MessageType.BROADCAST_USER_LIST;
        return new Message(action, message).encode();
    }
}

enum MessageType {
    REG_SERVER_SYN("reg_server_syn"),
    CHAT_MSG_SEND("chat_send"),
    CHAT_MSG_RECEIVE("chat_receive"),
    UNKNOWN("unknown"),
    REG_SERVER_ACK("reg_server_ack"),
    REG_CLIENT_ACK("reg_client_ack"),
    BROADCAST_USER_LIST("broadcast_user_list");

    private String action;

    MessageType(String action) {
        this.action = action;
    }

    public String getAction() {
        return action;
    }

    public static MessageType getActionType(String action) {
        for (MessageType messageType : MessageType.values()) {
            if (messageType.getAction().equals(action)) {
                return messageType;
            }
        }
        return UNKNOWN;
    }
}
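
One detail worth spelling out is buffer positioning. Charset.encode returns a buffer that is already positioned for reading, which is why Message.encode can hand its result straight to a channel write, whereas a buffer that has just been filled must be flipped before it is decoded. The sketch below imitates the receive path without a channel; the class name BufferFlipSketch and the method receive are hypothetical, and put stands in for what a channel read would do.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BufferFlipSketch {
    // Simulates the receive path for a message of at most 1024 bytes:
    // bytes are written into the buffer, then the buffer is flipped so that
    // decode reads from the start up to the last written byte.
    static String receive(byte[] incoming) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put(incoming); // position advances past the written bytes
        buffer.flip();        // limit = old position, position = 0
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }

    public static void main(String[] args) {
        // encode() returns a buffer ready for reading: position 0, limit = byte length
        ByteBuffer encoded = StandardCharsets.UTF_8.encode("chat_send#@@#\u4f60\u597d");
        byte[] wire = new byte[encoded.remaining()];
        encoded.get(wire);
        System.out.println(receive(wire));
    }
}
```

Without the flip, decode would start at the current position and read the unused, zero-filled remainder of the buffer instead of the message.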

ChatServer

package com.eh.eden.nio.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Multi-user chat room server
 */
public class ChatServer {
    public static final int SERVER_PORT = 8080;

    Selector selector;
    ServerSocketChannel serverSocketChannel;
    boolean running = true;

    public void runServer() throws IOException {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();

            serverSocketChannel.bind(new InetSocketAddress(SERVER_PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("Server started.");

            while (running) {
                int eventCount = selector.select(100);
                if (eventCount == 0)
                    continue;
                Set<SelectionKey> set = selector.selectedKeys();
                Iterator<SelectionKey> keyIterable = set.iterator();
                while (keyIterable.hasNext()) {
                    SelectionKey key = keyIterable.next();
                    keyIterable.remove();
                    dealEvent(key);
                }
            }
        } finally {
            if (selector != null && selector.isOpen())
                selector.close();
            if (serverSocketChannel != null && serverSocketChannel.isOpen())
                serverSocketChannel.close();
        }
    }

    private void dealEvent(SelectionKey key) throws IOException {
        if (key.isAcceptable()) { // accept a new connection
            System.out.println("Accept client connection.");
            SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            // once connected, invite the client to register
            socketChannel.write(Message.encodeRegSyn());
        } else if (key.isReadable()) { // handle a client request, including leaving the chat room
            SocketChannel socketChannel = (SocketChannel) key.channel();
            try {
                System.out.println("Receive message from client.");
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                // read returns -1 once the client has closed the connection
                int ret = socketChannel.read(byteBuffer);
                if (ret == -1) {
                    throw new IOException("Client closed the connection");
                }
                byteBuffer.flip();
                String msg = Message.CHARSET.decode(byteBuffer).toString(); // convert to String
                dealMsg(msg, key); // process the message
            } catch (IOException e) { // the client disconnected; tell everyone else
                socketChannel.close();
                String username = (String) key.attachment();
                if (username == null || username.equals("")) { // no broadcast for unregistered users
                    return;
                }
                System.out.println(String.format("User %s disconnected", username));
                broadcast(String.format("%s disconnected", username));
            }
        }
    }

    private void dealMsg(String msg, SelectionKey key) throws IOException {
        System.out.println(String.format("Message info is: %s", msg));
        Message message = Message.decode(msg);
        if (message == null)
            return;

        SocketChannel currentChannel = (SocketChannel) key.channel();
        Set<SelectionKey> keySet = getConnectedChannel();
        switch (message.getAction()) {
            case REG_CLIENT_ACK: // the client submits a user name
                String username = message.getMessage(); // the requested user name
                // reject the name if another connection already uses it
                for (SelectionKey keyItem : keySet) {
                    String channelUser = (String) keyItem.attachment();
                    if (channelUser != null && channelUser.equals(username)) {
                        currentChannel.write(Message.encodeRegSyn(true));
                        return;
                    }
                }
                key.attach(username);
                // send the registration confirmation to the user
                currentChannel.write(Message.encodeRegServerAck(username));
                System.out.println(String.format("New user joined: %s", username));
                // announce the new user to the chat room
                broadcast("welcome " + username + " join us.");
                break;
            case CHAT_MSG_SEND: // the client sends a chat message
                String toUser = message.getOption();
                String msg2 = message.getMessage();
                String fromUser = (String) key.attachment();

                for (SelectionKey keyItem : keySet) {
                    if (keyItem == key) { // skip the sender
                        continue;
                    }
                    String channelUser = (String) keyItem.attachment();
                    SocketChannel channel = (SocketChannel) keyItem.channel();
                    if (toUser == null || toUser.equals(channelUser)) {
                        channel.write(Message.encodeReceiveMsg(fromUser, msg2));
                    }
                }
                break;
        }
    }

    public void broadcast(String message) throws IOException {
        Set<SelectionKey> keySet = getConnectedChannel();
        for (SelectionKey keyItem : keySet) {
            SocketChannel channel = (SocketChannel) keyItem.channel();
            channel.write(Message.encodePublishUserList(message));
        }
    }

    private Set<SelectionKey> getConnectedChannel() {
        /*
            A Selector only removes the keys of closed channels during its next select call
         */
        return selector.keys().stream()
                .filter(item -> item.channel() instanceof SocketChannel && item.channel().isOpen())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) throws IOException {
        new ChatServer().runServer();
    }
}
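
The disconnect handling in dealEvent relies on read returning -1 once the peer has closed its end. That behaviour can be checked without a network using a java.nio.channels.Pipe as a stand-in for the socket pair. This is a sketch under that assumption; the class name EndOfStreamSketch and the method demo are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

final class EndOfStreamSketch {
    // Writes one short message, closes the writing end, then reads twice.
    // Returns {result of the first read, result of the second read}.
    static int[] demo() throws IOException {
        Pipe pipe = Pipe.open();
        pipe.sink().write(StandardCharsets.UTF_8.encode("bye"));
        pipe.sink().close(); // the "client" goes away

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int first = pipe.source().read(buffer);  // the data that was still pending
        int second = pipe.source().read(buffer); // nothing left and the writer is gone
        pipe.source().close();
        return new int[] {first, second};
    }

    public static void main(String[] args) throws IOException {
        int[] r = demo();
        System.out.println("first read: " + r[0] + ", second read: " + r[1]);
    }
}
```

Data written before the close is still delivered first; only the read after it reports end of stream, which is the point at which the server closes the channel and announces the departure.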

ChatClient

package com.eh.eden.nio.chat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Multi-user chat room client
 */
public class ChatClient {
    Selector selector;
    SocketChannel socketChannel;
    boolean running = true;

    // written by the selector thread and read by the input thread, hence volatile
    volatile MessageType messageType = MessageType.REG_CLIENT_ACK;
    private final static String PROMPT_USERNAME = "Username:";
    private final static String PROMPT_INPUT = "Input the message:";

    public void runClient() throws IOException {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("127.0.0.1", ChatServer.SERVER_PORT));
            System.out.println("Client connecting to server.");

            socketChannel.register(selector, SelectionKey.OP_CONNECT);

            while (running) {
                int eventCount = selector.select(100);
                if (eventCount == 0)
                    continue;
                Set<SelectionKey> set = selector.selectedKeys();
                Iterator<SelectionKey> keyIterable = set.iterator();
                while (keyIterable.hasNext()) {
                    SelectionKey key = keyIterable.next();
                    keyIterable.remove();
                    dealEvent(key);
                }
            }
        } finally {
            if (selector != null && selector.isOpen())
                selector.close();

            // isOpen also covers a channel whose connection attempt never completed
            if (socketChannel != null && socketChannel.isOpen())
                socketChannel.close();
        }
    }

    private void dealEvent(SelectionKey key) throws IOException {
        // once connected, the main thread handles messages from the server
        if (key.isConnectable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            if (channel.isConnectionPending()) {
                channel.finishConnect();
            }
            channel.register(selector, SelectionKey.OP_READ);

            // once connected, start a new thread dedicated to sending messages to the server
            new Thread(() -> {
                try {
                    Thread.sleep(2000);
                    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                    while (running) {
                        String msg = reader.readLine();
                        if (msg == null || msg.length() == 0)
                            continue;

                        if (messageType == MessageType.REG_CLIENT_ACK) {
                            ByteBuffer bufferMsg = Message.encodeRegClientAck(msg);
                            channel.write(bufferMsg);
                        } else {
                            String[] msgArr = msg.split("#", 2);
                            ByteBuffer bufferMsg = Message.encodeSendMsg(msg);
                            if (msgArr.length == 2) {
                                bufferMsg = Message.encodeSendMsg(msgArr[0], msgArr[1]);
                            }

                            channel.write(bufferMsg);
                        }
                        printPrompt(PROMPT_INPUT);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {

                }
            }).start();
        } else if (key.isReadable()) {
            try {
                SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
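                // read returns -1 once the server has closed the connection
                if (channel.read(byteBuffer) == -1) {
                    throw new IOException("Server closed the connection");
                }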
                byteBuffer.flip();
                String msg = Message.CHARSET.decode(byteBuffer).toString();
                dealMsg(msg);
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("Server exit.");
                System.exit(0);
            }
        }
    }

    private void dealMsg(String msg) {
        Message message = Message.decode(msg);
        if (message == null)
            return;

        switch (message.getAction()) {
            case REG_SERVER_SYN: // registration invitation from the server
                printMsgAndPrompt(message.getMessage(), PROMPT_USERNAME);
                break;
            case REG_SERVER_ACK: // registration confirmation from the server
                messageType = MessageType.CHAT_MSG_SEND; // switch to the chat phase
                printMsgAndPrompt(message.getMessage(), PROMPT_INPUT);
                break;
            case CHAT_MSG_RECEIVE: // incoming chat message
                String info = "from " + message.getOption() + ": " + message.getMessage();
                printMsgAndPrompt(info, PROMPT_INPUT);
                break;
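            case BROADCAST_USER_LIST: // server broadcast (user joined or left); otherwise silently dropped
                printMsgAndPrompt(message.getMessage(),
                        messageType == MessageType.REG_CLIENT_ACK ? PROMPT_USERNAME : PROMPT_INPUT);
                break;
            default: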
        }
    }

    private void printPrompt(String msg) {
        System.out.print(msg);
    }

    private void printMsgAndPrompt(String msg, String prompt) {
        System.out.println();
        System.out.println(msg);
        System.out.print(prompt);
    }

    public static void main(String[] args) throws IOException {
        new ChatClient().runClient();
    }
}

Demo

https://gitee.com/lienhui68/picStore/raw/master/null/20200901073140.png