JAVA基础-使用BIO / NIO实现聊天室功能
Scoket&ServerSocket
客户端数据处理器
package org.example.code_case.javaIo.聊天室实现.Socket实现;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Set;public class ClientHandler implements Runnable {private Socket clientSocket;private Set<ClientHandler> clients;private PrintWriter out;private BufferedReader in;private String clientName;public ClientHandler(Socket accept, Set<ClientHandler> clients) {this.clientSocket = accept;this.clients = clients;}@Overridepublic void run() {try {out = new PrintWriter(clientSocket.getOutputStream(),true);in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));System.out.println("客户端连接成功");clientName = in.readLine();System.out.println(clientName + "上线了");//广播新用户加入broadcastMessage("[系统]" + clientName + "加入聊天室");//读取客户端消息String msg;while ((msg = in.readLine()) != null) {System.out.println("收到的消息:" + msg);broadcastMessage("[" + clientName + "]:" + msg);}} catch (IOException e) {throw new RuntimeException(e);}finally {try {clients.remove(this);if(clientName != null){broadcastMessage("[系统]" + clientName + "离开聊天室");}if (out != null) out.close();if (in != null) in.close();if (clientSocket != null) clientSocket.close();} catch (IOException e) {throw new RuntimeException(e);}}}private void broadcastMessage(String msg) {for (ClientHandler client : clients) {if (client != this) {client.out.println(msg);}}}
}
服务端实现
package org.example.code_case.javaIo.聊天室实现.Socket实现;import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ChatServer {private static final int PATH = 8888;private static final Set<ClientHandler> CLIENTS = ConcurrentHashMap.newKeySet();private static final ExecutorService POOL = Executors.newCachedThreadPool();public static void main(String[] args) {System.out.println("聊天室服务器启动......");try (ServerSocket serverSocket = new ServerSocket(PATH);){while (true) {Socket accept = serverSocket.accept();System.out.println("新客户端连接:"+accept.getInetAddress().getHostAddress());ClientHandler clientHandler = new ClientHandler(accept,CLIENTS);CLIENTS.add(clientHandler);POOL.execute(clientHandler);}} catch (IOException e) {throw new RuntimeException(e);}}}
客户端实现
package org.example.code_case.javaIo.聊天室实现.Socket实现;import java.io.*;
import java.net.Socket;
import java.util.Scanner;public class ChatClient {public static void main(String[] args) {try (Socket socket = new Socket("127.0.0.1", 8888);InputStream in = socket.getInputStream();OutputStream out = socket.getOutputStream();BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));Scanner scanner = new Scanner(System.in);PrintWriter printWriter = new PrintWriter(out, true);) {System.out.println("已连接到聊天室.....");System.out.println("请输入用户名:");String name = scanner.nextLine();System.out.println("用户名:" + name);printWriter.println(name);Thread thread = new Thread(() -> {try {String msg;while ((msg = bufferedReader.readLine()) != null) {System.out.println(msg);}} catch (IOException e) {System.out.println("聊天室关闭");}});thread.start();//发送消息System.out.println("您可以开始输入消息了(输入'exit'退出):");while (true) {String msg = scanner.nextLine();if ("exit".equalsIgnoreCase(msg)) {break;}printWriter.println(msg);}} catch (IOException e) {throw new RuntimeException(e);}}
}
SocketChannel&ServerSocketChannel
服务端实现
package org.example.code_case.javaIo.聊天室实现.SockerChannel实现;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;public class ChatServer {private static final int PORT = 9999;private ByteBuffer readBuffer = ByteBuffer.allocate(1024);private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);private Selector selector;public void start() {try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();) {//绑定端口serverSocketChannel.bind(new InetSocketAddress(PORT));//设置非阻塞模式serverSocketChannel.configureBlocking(false);//注册选择器selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服务器启动,监听端口:"+PORT);while (true) {//等待就绪事件发生int select = selector.select();if (select == 0) {continue;}//获取就绪事件Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();try {if (key.isAcceptable()) {handlerAccept(key);} else if (key.isReadable()) {handlerRead(key);} else if (key.isWritable()) {handlerWrit(key);}}catch (IOException e) {key.cancel();if (key.channel() != null) {key.channel().close();}System.out.println("客户端断开连接");}}}} catch (IOException e) {throw new RuntimeException(e);}}private void handlerWrit(SelectionKey key) {}private void handlerAccept(SelectionKey key) throws IOException {ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel clientChannel = channel.accept();clientChannel.configureBlocking(false);clientChannel.register(key.selector(), SelectionKey.OP_READ);System.out.println("客户端连接成功,客户端IP地址:"+clientChannel.getRemoteAddress());//发送欢迎消息String message = "欢迎连接到聊天室!请求输入您的用户名:";writeBuffer.clear();writeBuffer.put(message.getBytes());writeBuffer.flip();clientChannel.write(writeBuffer);}private void handlerRead(SelectionKey key) throws IOException {SocketChannel channel = (SocketChannel) key.channel();readBuffer.clear();int count = channel.read(readBuffer);if (count == -1) {System.out.println("客户端断开连接:" + channel.getRemoteAddress());key.cancel();channel.close();return;}//处理读数据readBuffer.flip();byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);String message = new String(bytes, "UTF-8");String clientName = (String) key.attachment();if(clientName == null){//第一次收到消息,作用用户名clientName = message.trim();key.attach(clientName);System.out.println("客户端"+clientName+"上线");//广播新用户加入broadcastMessage("[系统] " + clientName + " 加入了聊天室", channel);}else{//广播聊天消息broadcastMessage("[" + clientName + "] " + message, channel);}}private void broadcastMessage(String msg, SocketChannel exceptChannel) throws IOException {for (SelectionKey key : selector.keys()) {Channel channel = key.channel();if(channel instanceof SocketChannel clientChannel && channel != exceptChannel){writeBuffer.clear();writeBuffer.put(msg.getBytes());writeBuffer.flip();clientChannel.write(writeBuffer);}}}public static void main(String[] args) {new ChatServer().start();}
}
客户端实现
package org.example.code_case.javaIo.聊天室实现.SockerChannel实现;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;public class ChatClient {private Selector selector;private SocketChannel socketChannel;private ByteBuffer readBuffer = ByteBuffer.allocate(1024);private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);public void start() throws IOException {selector = Selector.open();socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);//连接服务器socketChannel.connect(new InetSocketAddress("localhost", 9999));//注册客户端通道,监听连接时间socketChannel.register(selector, SelectionKey.OP_CONNECT);System.out.println("尝试连接到服务器.....");Thread thread = new Thread(this::readUserInput);thread.setDaemon(true);thread.start();//轮询选择器while (true) {int select = selector.select();if (select == 0) continue;Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();try {// 处理事件if (key.isConnectable()) {// 处理连接事件handleConnect(key);} else if (key.isReadable()) {// 处理读事件handleRead(key);}} catch (IOException e) {// 处理异常key.cancel();if (key.channel() != null) {key.channel().close();}System.err.println("处理服务器响应时出错: " + e.getMessage());return;}}}}// 处理连接事件private void handleConnect(SelectionKey key) throws IOException {SocketChannel clientChannel = (SocketChannel) key.channel();// 完成连接过程if (clientChannel.isConnectionPending()) {clientChannel.finishConnect();}System.out.println("已连接到服务器");// 注册客户端通道,监听读事件clientChannel.register(selector, SelectionKey.OP_READ);}// 处理读事件private void handleRead(SelectionKey key) throws IOException {SocketChannel clientChannel = (SocketChannel) key.channel();readBuffer.clear();int bytesRead = clientChannel.read(readBuffer);if (bytesRead == -1) {// 服务器关闭连接System.out.println("与服务器断开连接");key.cancel();clientChannel.close();return;}// 处理读取的数据readBuffer.flip();byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);String message = new String(bytes, "UTF-8");System.out.println(message);}// 读取用户输入并发送到服务器private void readUserInput() {Scanner scanner = new Scanner(System.in);while (true) {String userInput = scanner.nextLine();if ("exit".equalsIgnoreCase(userInput)) {try {System.out.println("退出聊天室");socketChannel.close();System.exit(0);} catch (IOException e) {System.err.println("关闭连接时出错: " + e.getMessage());}break;}try {// 发送消息到服务器writeBuffer.clear();writeBuffer.put(userInput.getBytes());writeBuffer.flip();socketChannel.write(writeBuffer);} catch (IOException e) {System.err.println("发送消息时出错: " + e.getMessage());break;}}}public static void main(String[] args) throws IOException {new ChatClient().start();}
}
尝试连接到服务器.....
已连接到服务器
欢迎连接到聊天室!请求输入您的用户名:
[系统] 用户1 加入了聊天室
用户2
你是谁
[用户1] 我是用户2
1
[用户1] 2
3
[用户1] 4
尝试连接到服务器.....
已连接到服务器
欢迎连接到聊天室!请求输入您的用户名:
用户1
[系统] 用户2 加入了聊天室
[用户2] 你是谁
我是用户2
[用户2] 1
2
[用户2] 3
4
总结
- 使用BIO同步阻塞模型实现的聊天室:服务端需要为每个客户端单独开辟线程处理数据,否则就会就会造成服务端阻塞
- 优点:代码实现简单、可读性强易理解
- 缺点:线程资源浪费、内存占用高、并发能力有限、运行效率低、拓展性差
- 使用AIO同步非阻塞模型实现的聊天室:服务端通过Selector来监控客户端连接,事件就绪时,通知程序执行相应的逻辑,避免阻塞。然后通过IO多路复用,实现单个线程可以管理多个连接
- 优点:高并发的处理能力、事件驱动模型、资源利用率高
- 缺点:代码实现复杂、可读性差不易理解