当前位置: 首页 > news >正文

【zookeeper核心源码解析】第四课:客户端与服务端读写的io核心流程

系列文章目录

【zookeeper核心源码解析】第一课:zk启动类核心流程序列图
【zookeeper核心源码解析】第二课:俯瞰QuorumPeer启动核心流程,实现选举关键流程
【zookeeper核心源码解析】第三课:leader与follower何时开始同步,如何同步数据
【zookeeper核心源码解析】第四课:客户端与服务端读写的io核心流程

【zookeeper核心源码解析】第四课:客户端与服务端读写的io核心流程

  • 系列文章目录
  • 1. 先看服务端初始化与连接构建的准备
  • 2. 客户端代码


1. 先看服务端初始化与连接构建的准备

在第一节中,介绍到NIOServerCnxnFactory的初始化,该类其实就是专门为客户端读写数据准备的服务端。主要构建连接与数据读写。

c class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable

在run方法中构建连接与io读写,具体代码如下:

public void run() {while (!ss.socket().isClosed()) {try {selector.select(1000);Set<SelectionKey> selected;synchronized (this) {selected = selector.selectedKeys();}ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);Collections.shuffle(selectedList);for (SelectionKey k : selectedList) {if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();InetAddress ia = sc.socket().getInetAddress();int cnxncount = getClientCnxnCount(ia);if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){LOG.warn("Too many connections from " + ia+ " - max is " + maxClientCnxns );sc.close();} else {LOG.info("Accepted socket connection from "+ sc.socket().getRemoteSocketAddress());sc.configureBlocking(false);SelectionKey sk = sc.register(selector,SelectionKey.OP_READ);NIOServerCnxn cnxn = createConnection(sc, sk);sk.attach(cnxn);addCnxn(cnxn);}} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {NIOServerCnxn c = (NIOServerCnxn) k.attachment();c.doIO(k);} else {if (LOG.isDebugEnabled()) {LOG.debug("Unexpected ops in select "+ k.readyOps());}}}selected.clear();} catch (RuntimeException e) {LOG.warn("Ignoring unexpected runtime exception", e);} catch (Exception e) {LOG.warn("Ignoring exception", e);}}closeAll();LOG.info("NIOServerCnxn factory exited run method");}

2. 客户端代码

ClientCnxn 类是客户端的入口代码。

/*** This class manages the socket i/o for the client. ClientCnxn maintains a list* of available servers to connect to and "transparently" switches servers it is* connected to as needed.**/

里面的EventThread专本对数据进行异步读写。感兴趣可以从run()方法进去看

       @Overridepublic void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}} catch (InterruptedException e) {LOG.error("Event thread exiting due to interruption", e);}LOG.info("EventThread shut down");}
http://www.lryc.cn/news/512167.html

相关文章:

  • 强化学习蘑菇书笔记
  • 《机器学习》——线性回归模型
  • Linux(Centos 7.6)网卡信息没有了问题处理
  • WEB攻防-通用漏洞-文件上传-js验证-MIME验证-user.ini-语言特征
  • mybatis-plus代码生成器
  • 12.24-12.28Mysql锁阅读笔记
  • 支持最新 mysql9的workbench8.0.39 中文汉化教程来了
  • golang连接jenkins构建build
  • SCAU高程进阶题(自用)
  • 基于STM32F103控制L298N驱动两相四线步进电机
  • libreoffice在Windows和Linux环境的安装和结合Springboot使用教程
  • 前端开发 -- 自动回复机器人【附完整源码】
  • vue+echarts实现疫情折线图
  • 服务器nfs文件共享
  • 基于Vue+SSM+SpringCloudAlibaba的科目课程管理系统
  • vue3配置caddy作为静态服务器,在浏览器地址栏刷新出现404
  • 深入理解委托:C# 编程中的强大工具
  • 【Java 数据结构】合并两个有序链表
  • 基于微信小程序的校园访客登记系统
  • uniapp 判断多选、选中取消选中的逻辑处理
  • php8.0版本更新了哪些内容
  • Browser Use:AI智能体自动化操作浏览器的开源工具
  • Android笔记(四十):ViewPager2嵌套RecyclerView滑动冲突进一步解决
  • POS系统即销售点系统 文档与数据库设计
  • 安全合规遇 AI 强援:深度驱动行业发展新引擎 | 倍孜网络CEO聂子尧出席ICT深度观察报告会!
  • 算法进阶:贪心算法
  • C++ 设计模式:工厂方法(Factory Method)
  • 手机联系人 查询 添加操作
  • 【LeetCode】2506、统计相似字符串对的数目
  • 金仓数据库对象访问权限的管理