当前位置: 首页 > news >正文

Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)

单线程Reactor

package org.example.utils.echo.single;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;public class EchoServerReactor implements Runnable{Selector selector;ServerSocketChannel serverSocketChannel;EchoServerReactor() throws IOException {//Reactor初始化selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();InetSocketAddress address =new InetSocketAddress("localhost",8848);//非阻塞serverSocketChannel.configureBlocking(false);//分步处理,第一步,接收accept事件SelectionKey sk =serverSocketChannel.register(selector,0,new AcceptorHandler());// SelectionKey.OP_ACCEPTserverSocketChannel.socket().bind(address);System.out.println("服务端已经开始监听:"+address);sk.interestOps(SelectionKey.OP_ACCEPT);}@Overridepublic void run() {try {while (!Thread.interrupted()){selector.select();Set<SelectionKey>  selected=selector.selectedKeys();Iterator<SelectionKey> it=selected.iterator();while (it.hasNext()){SelectionKey sk=it.next();dispatch(sk);}selected.clear();}} catch (IOException e) {throw new RuntimeException(e);}}private void dispatch(SelectionKey sk) {Runnable handler=(Runnable) sk.attachment();if (handler!=null){handler.run();}}class AcceptorHandler implements Runnable{@Overridepublic void run() {try {SocketChannel channel=serverSocketChannel.accept();if (channel!=null)new EchoHandler(selector,channel);} catch (IOException e) {throw new RuntimeException(e);}}}public static void main(String[] args) throws IOException {new Thread(new EchoServerReactor()).start();}
}
package org.example.utils.echo.single;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;public class EchoHandler implements Runnable{final SocketChannel channel;final SelectionKey sk;final ByteBuffer byteBuffer=ByteBuffer.allocate(1024);static final int RECIEVING=0,SENDING=1;int state=RECIEVING;EchoHandler(Selector selector,SocketChannel c) throws IOException {channel=c;c.configureBlocking(false);sk=channel.register(selector,0);sk.attach(this);sk.interestOps(SelectionKey.OP_READ);selector.wakeup();}@Overridepublic void run() {try {if (state==SENDING){channel.write(byteBuffer);byteBuffer.clear();sk.interestOps(SelectionKey.OP_READ);state=RECIEVING;}else if (state==RECIEVING){int length=0;while ((length=channel.read(byteBuffer))>0){System.out.println(new String(byteBuffer.array(),0,length));}byteBuffer.flip();sk.interestOps(SelectionKey.OP_WRITE);state=SENDING;}} catch (IOException e) {throw new RuntimeException(e);}}
}

结果:

原理无非就是:

多线程,无非就是搞多个Reactor   ,   一个专门接受accept  ,  一个专门dispatch ,  再搞一个多线程池处理handle

这里面最主要的就是

handle类,sk.attach(this);把对象传回reactor

参考文献:

java高并发核心编程. 卷1,NIO、Netty、Redis、ZooKeeper  (尼恩)

http://www.lryc.cn/news/254944.html

相关文章:

  • NoSQL大数据存储技术测试题(参考答案)
  • Python查看文件列表
  • INA219电流感应芯片_程序代码
  • FlinkSql-Temporal Joins-Lookup Join
  • STM32之定时器
  • Canvas鼠标画线
  • Docker 安装部署 Sentinel Dashboard
  • 第21章网络通信
  • 一、运行时数据区域
  • OCR原理解析
  • 使用com组件编辑word
  • 国产Euler(欧拉)系统安装docker
  • Linux 进程控制
  • [ Linux Audio 篇 ] 音频开发入门基础知识
  • 关于高校电子邮件系统开通双因素认证的经验分享-以清华大学为例
  • 「Swift」类淘宝商品瀑布流展示
  • 道可云会展元宇宙平台全新升级,打造3D沉浸式展会新模式
  • Ant Design Pro初始化报错
  • 第16届中国R会议暨2023X-AGI大会开幕,和鲸科技分享ModelOps在数据科学平台中的实践与应用
  • ❀My学习Linux命令小记录(12)❀
  • MySQL学习day05
  • JAVA面试题7
  • 好用免费的AI换脸5个工具
  • 【Linux】公网远程访问AMH服务器管理面板
  • 随笔-这都是命吗
  • 优化网站性能,从容谈CDN加速的部署与运维
  • JavaScript-事件
  • linux的磁盘管理
  • qt-C++笔记之主线程中使用异步逻辑来处理ROS事件循环和Qt事件循环解决相互阻塞的问题
  • 【Docker】从零开始:18.使用Dockerfile构造自己的KingbaseES数据库镜像