Netty-NIO
文章目录
- 一、NIO-Selector
- 1.处理accept
- 2.cancel
- 3.处理read
- 4.处理客户端断开
- 5. 处理消息的边界
- 6. 写入内容过多的问题
- 7. 处理可写事件
一、NIO-Selector
1.处理accept
//1.创建selector,管理多个channel
Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//2.建立selector和channel的联系(注册)
//SelectionKey就是将来事件发生后,通过它可以知道事件和哪个channel的事件
//四个事件:
//accept 会在有连接请求时触发
//connect 是客户端,连接建立后触发
//read 可读事件
//write 可写事件
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){//3.select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行selector.select();//4.处理事件,selectedKeys内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys.iterator();while(iter.next()){SelectionKey key = iter.next();ServerSocketChannel channel = (ServerSocketChannel)key.channel();SocketChannel sc = channel.accept();}
}
2.cancel
//1.创建selector,管理多个channel
Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//2.建立selector和channel的联系(注册)
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){//3.select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行//select在事件未处理时,它不会阻塞,事件发生后要么处理,要么取消,不能置之不理selector.select();//4.处理事件,selectedKeys内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys.iterator();while(iter.next()){SelectionKey key = iter.next();key.cancel();}
}
3.处理read
用完key必须要remove
//1.创建selector,管理多个channel
Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//2.建立selector和channel的联系(注册)
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){//3.select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行selector.select();//4.处理事件,selectedKeys内部包含了所有发生的事件//selector会在发生事件后,向集合中加入key,但不会删除Iterator<SelectionKey> iter = selector.selectedKeys.iterator();while(iter.next()){SelectionKey key = iter.next();//处理key时,要从selectedKeys集合中删除,否则下次处理就会有问题iter.remove();//5.区分事件类型if(key.isAcceptable()){ //如果是acceptServerSocketChannel channel = (ServerSocketChannel)key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);SelectionKey sckey = sc.register(selector, 0, null);scKey.interestOps(SelectionKey.OP_READ);}elseif(key.isReadable()){//拿到触发事件的channelServerSocketChannel channel = (ServerSocketChannel)key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);channel.read(buffer);buffer.flip();debugRead(buffer);}}
}
4.处理客户端断开
//1.创建selector,管理多个channel
Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//2.建立selector和channel的联系(注册)
SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){ //3.select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行 selector.select(); //4.处理事件,selectedKeys内部包含了所有发生的事件 //selector会在发生事件后,向集合中加入key,但不会删除 Iterator<SelectionKey> iter = selector.selectedKeys.iterator(); while(iter.next()){ SelectionKey key = iter.next(); //处理key时,要从selectedKeys集合中删除,否则下次处理就会有问题 iter.remove(); //5.区分事件类型 if(key.isAcceptable()){ //如果是accept ServerSocketChannel channel = (ServerSocketChannel)key.channel(); SocketChannel sc = channel.accept();sc.configureBlocking(false); SelectionKey sckey = sc.register(selector, 0, null); scKey.interestOps(SelectionKey.OP_READ); }elseif(key.isReadable()){ try{ //拿到触发事件的channel ServerSocketChannel channel = (ServerSocketChannel)key.channel(); ByteBuffer buffer = ByteBuffer.allocate(16); int read = channel.read(buffer);//如果是正常断开,read的方法的返回值是-1 if(read == -1){ key.cancel(); }else{ buffer.flip(); debugRead(buffer); } }catch(IOException e){ e.printStackTrace();//因为客户端断开了,因此需要将key取消(从selector 的keys集合中真正删除key) key.cancel();}}}
}
5. 处理消息的边界
- 固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
- 按分隔符拆分,缺点是效率低
- TLV格式,Type类型,Length长度,Value数据,可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,影响server吞吐量
- Http1.1是TLV格式
- Http2.0是LTV格式
private static void split(ByteBuffer source){source.flip();for(int i = 0; i < source.limit(); i++){//找到一条完整消息if(source.get(i) == '\n'){int length = i + 1 -source.position();//把这条完整消息存入新的ByteBufferByteBuffer target = ByteBuffer.allocate(length);//从source读,向target写for(int j = 0; j < length; j++){target.put(source.get());}debugAll(target);}}source.compact();
}public static void main(){//1.创建selector,管理多个channelSelector selector = Selector.open(); ByteBuffer buffer = ByteBuffer.allocate(16); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false);//2.建立selector和channel的联系(注册)SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); while(true){ //3.select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行 selector.select(); //4.处理事件,selectedKeys内部包含了所有发生的事件 //selector会在发生事件后,向集合中加入key,但不会删除 Iterator<SelectionKey> iter = selector.selectedKeys.iterator(); while(iter.next()){ SelectionKey key = iter.next(); //处理key时,要从selectedKeys集合中删除,否则下次处理就会有问题 iter.remove(); //5.区分事件类型 if(key.isAcceptable()){ //如果是accept ServerSocketChannel channel = (ServerSocketChannel)key.channel(); SocketChannel sc = channel.accept();sc.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(16); //attachment附件//将一个byteBuffer作为附件关联到selectionKey上SelectionKey sckey = sc.register(selector, 0, buffer); scKey.interestOps(SelectionKey.OP_READ); }elseif(key.isReadable()){ try{ //拿到触发事件的channel ServerSocketChannel channel = (ServerSocketChannel)key.channel(); //获取selectionKey上关联的附件ByteBuffer buffer = (ByteBuffer)key.attatchment();int read = channel.read(buffer);//如果是正常断开,read的方法的返回值是-1 if(read == -1){ key.cancel(); }else{ split(buffer);if(buffer.position() == buffer.limit()){//扩容ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);buffer.flip();newBuffer.put(buffer);//复制key.attach(newbuffer);//替换掉key上原有的buffer}} }catch(IOException e){ e.printStackTrace();//因为客户端断开了,因此需要将key取消(从selector 的keys集合中真正删除key) key.cancel();}}}}
}
6. 写入内容过多的问题
//服务器
public static void main(){ServerSocketChannel ssc = ServerSocketChannrl.open();ssc.configureBlocking(false);Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while(trye){selector.select();Iterator<SelectionKey> iter = selector.selectedKeys.iterator();while(iter.hasNext()){SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);//1.向客户端发送大量数据StringBuilder sb = new StringBuilder();for(int i = 0; i < 3000000; i++){sb.append("a");}BytrBuffer buffer = Charset.defaultCharset().encode(sb.toString());//不符合非阻塞模式while(buffer.hasRemaining()){//2.返回值代表实际写入的字节数//不能一次性写完//write == 0 缓冲区满,写不了int write = sc.write(buffer);System.out.println(write):}}}}
}//客户端
public static void main(){SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8080));//3.接收数据int count = 0;while(true){ByteBuffer buffer = ByteBuffer.allocate(1024*1024);count += sc.read(buffer);System.out.println(count);buffer.clear();}
}
7. 处理可写事件
//服务器
public static void main(){ServerSocketChannel ssc = ServerSocketChannrl.open();ssc.configureBlocking(false);Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while(trye){selector.select();Iterator<SelectionKey> iter = selector.selectedKeys.iterator();while(iter.hasNext()){SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey sckey = sc.register(selector, 0, null);sckey.interestOps(SelectionKey.OP_READ);//1.向客户端发送大量数据StringBuilder sb = new StringBuilder();for(int i = 0; i < 3000000; i++){sb.append("a");}BytrBuffer buffer = Charset.defaultCharset().encode(sb.toString());//2.返回值代表实际写入的字节数//不能一次性写完//先写一次int write = sc.write(buffer);System.out.println(write)://3.判断是否有剩余内容while(buffer.hasRemaining()){//4.关注可写事件sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);//sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);//5.把未写完的数据挂到sckey上sckey.attach(buffer);}}elseif(key.isWritable())[ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel sc = (SocketChannel)key.channel();int write = sc.write(buffer);System.out.println(write)://6.清理操作,内存释放if(!buffer.haeRemaining()){key.attach(null);//需要清除bufferkey.interestOps(key.interestOps() - SelectionKey.OP_WRITE);//不需关注可写事件}}}}
}