当前位置: 首页 > news > 正文

NIO的callback调用方式

1.消费者

/**
 * Blocking NIO client for the callback-style division demo.
 *
 * <p>The client connects to 127.0.0.1:8000 and alternates between printing
 * whatever the server sent and answering with a random positive integer:
 * receive, send a number below 1000, receive, pause, send a number below 10,
 * receive, then disconnect.
 */
public class CallbackClient {

    /** Capacity, in bytes, of the send and receive buffers allocated by {@link #main}. */
    private static final int BUFFER_SIZE = 32;

    /**
     * Charset used for both directions. Fully qualified so this class needs no
     * additional import.
     */
    private static final java.nio.charset.Charset CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Runs one complete exchange with the server.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // try-with-resources closes the channel even when an exchange fails part-way.
        try (SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));
            ByteBuffer writeBuffer = ByteBuffer.allocate(BUFFER_SIZE);
            ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

            getMessage(readBuffer, socketChannel);
            sendRandomInt(writeBuffer, socketChannel, 1000);
            getMessage(readBuffer, socketChannel);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the flag and stop: blocking channel I/O on an interrupted
                // thread would only fail with ClosedByInterruptException.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            }
            sendRandomInt(writeBuffer, socketChannel, 10);
            getMessage(readBuffer, socketChannel);
        } catch (IOException e) {
            // Previously swallowed; a failed connect or close is now reported.
            e.printStackTrace();
        }
    }

    /**
     * Picks a random integer in {@code [1, bound)}, prints it, and sends its
     * decimal text over the channel. A draw of 0 is replaced by 1 so the value
     * sent is never zero.
     *
     * @param writeBuffer   scratch buffer; cleared and refilled on every call
     * @param socketChannel connected channel to write to
     * @param bound         exclusive upper bound for the random draw; must be positive
     */
    public static void sendRandomInt(ByteBuffer writeBuffer, SocketChannel socketChannel, int bound) {
        Random r = new Random();
        int d = r.nextInt(bound);
        if (d == 0) {
            d = 1;
        }
        System.out.println(d);

        writeBuffer.clear();
        writeBuffer.put(String.valueOf(d).getBytes(CHARSET));
        writeBuffer.flip();
        try {
            // Loop so that a short write cannot truncate the number.
            while (writeBuffer.hasRemaining()) {
                socketChannel.write(writeBuffer);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Performs a single read from the channel and prints exactly the bytes that
     * arrived as one line of text. If the peer has closed the connection an
     * empty line is printed.
     *
     * @param readBuffer    scratch buffer; cleared before the read
     * @param socketChannel connected channel to read from
     */
    public static void getMessage(ByteBuffer readBuffer, SocketChannel socketChannel) {
        readBuffer.clear();
        try {
            socketChannel.read(readBuffer);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        readBuffer.flip();
        // Size the array from what was actually received: a fixed 16-byte array
        // overflowed on longer replies and padded shorter ones with NUL characters.
        byte[] buf = new byte[readBuffer.remaining()];
        readBuffer.get(buf);
        System.out.println(new String(buf, CHARSET));
    }
}

2.服务提供者

package com.example.demo.callback;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;public class NioServer {public static void main(String[] args) throws IOException {// 打开服务器套接字通道ServerSocketChannel serverSocket = ServerSocketChannel.open();serverSocket.configureBlocking(false);serverSocket.socket().bind(new InetSocketAddress(8000));// 打开多路复用器Selector selector = Selector.open();// 注册服务器通道到多路复用器上,并监听接入事件serverSocket.register(selector, SelectionKey.OP_ACCEPT);final ByteBuffer buffer = ByteBuffer.allocate(1024);while (true) {// 非阻塞地等待注册的通道事件selector.select();// 获取发生事件的selectionKey集合Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();// 遍历所有发生事件的selectionKeywhile (it.hasNext()) {SelectionKey key = it.next();it.remove();// 处理接入请求if (key.isAcceptable()) {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel socketChannel = ssc.accept();socketChannel.configureBlocking(false);SelectionKey newKey = socketChannel.register(selector, SelectionKey.OP_WRITE, ByteBuffer.allocate(1024));//添加后可使用处理方法2处理CommonClient client = new CommonClient(socketChannel, newKey);newKey.attach(client);}// 处理读事件if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();buffer.clear();while (socketChannel.read(buffer) > 0) {buffer.flip();String receivedMessage = StandardCharsets.UTF_8.decode(buffer).toString();handleReceivedMessage(socketChannel, receivedMessage);buffer.clear();}//处理方法2CommonClient client = (CommonClient) key.attachment();client.onRead();}// 处理写事件if (key.isWritable()) {//处理方法1可以仿照方法2的格式写//处理方法2CommonClient client = (CommonClient) key.attachment();client.onWrite();}}}}// 回调函数,处理接收到的数据private static void handleReceivedMessage(SocketChannel socketChannel, String message) throws IOException {System.out.println("Received message: " + message);// 回复客户端socketChannel.write(ByteBuffer.wrap("Server received the message".getBytes(StandardCharsets.UTF_8)));}
}
public class CommonClient {private SocketChannel clientSocket;private ByteBuffer recvBuffer;private SelectionKey key;private Callback callback;private String msg;public CommonClient(SocketChannel clientSocket, SelectionKey key) {this.clientSocket = clientSocket;this.key = key;recvBuffer = ByteBuffer.allocate(8);try {this.clientSocket.configureBlocking(false);key.interestOps(SelectionKey.OP_WRITE);} catch (IOException e) {}}public void close() {try {clientSocket.close();key.cancel();}catch (IOException e){};}// an rpc to notify client to send a numberpublic void sendMessage(String msg, Callback cback)  {this.callback = cback;try {try {recvBuffer.clear();recvBuffer.put(msg.getBytes());recvBuffer.flip();clientSocket.write(recvBuffer);key.interestOps(SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}catch (Exception e) {}}// when key is writable, resume the fiber to continue// to write.public void onWrite() {sendMessage("divident", new Callback() {@Overridepublic void onSucceed(int data) {int a = data;sendMessage("divisor", new Callback() {@Overridepublic void onSucceed(int data) {int b = data;sendMessage(String.valueOf(a / b), null);}});}});}public void onRead() {int res = 0;try {try {recvBuffer.clear();// read may fail even SelectionKey is readable// when read fails, the fiber should suspend, waiting for next// time the key is ready.int n = clientSocket.read(recvBuffer);while (n == 0) {n = clientSocket.read(recvBuffer);}if (n == -1) {close();return;}System.out.println("received " + n + " bytes from client");} catch (IOException e) {e.printStackTrace();}recvBuffer.flip();res = getInt(recvBuffer);// when read ends, we are no longer interested in reading,// but in writing.key.interestOps(SelectionKey.OP_WRITE);} catch (Exception e) {}this.callback.onSucceed(res);}public int getInt(ByteBuffer buf) {int r = 0;while (buf.hasRemaining()) {r *= 10;r += buf.get() - '0';}return r;}}
    /**
     * Continuation that receives the integer result of a request once it is available.
     */
    public interface Callback {

        /**
         * Called with the integer result.
         *
         * @param data the value being delivered
         */
        void onSucceed(int data);
    }
http://www.lryc.cn/news/454329.html

相关文章:

  • 百度文心智能体平台开发萌猫科研加油喵
  • Hive数仓操作(十六)
  • 第十二届蓝桥杯嵌入式省赛程序设计题解析(基于HAL库)(第一套)
  • MongoDB入门:安装及环境变量配置
  • 利用 notepad++ 初步净化 HaE Linkfinder 规则所提取的内容(仅留下接口行)
  • RCE(remote command/code execute)远程命令注入
  • ​一篇关于密码学的概念性文章
  • 什么是汽车中的SDK?
  • 利用CRITIC客观权重赋权法进行数值评分计算——算法过程
  • 一个月学会Java 第4天 运算符和数据转换
  • Stream流的终结方法(一)
  • GO网络编程(二):客户端与服务端通信【重要】
  • 快速熟悉Nginx
  • VikParuchuri/marker 学习简单总结
  • 【AI知识点】词嵌入(Word Embedding)
  • Python从入门到高手5.1节-Python简单数据类型
  • Hbase要点简记
  • RabbitMQ的各类工作模式介绍
  • 李宏毅深度学习-图神经网络GNN
  • Redis篇(缓存机制 - 分布式缓存)(持续更新迭代)
  • python交互式命令时如何清除
  • Token,Cookie,Session,JWT详解
  • opencv-rust 系列: 1, 安装及运行自带示例和测试程序
  • Linux系统编程(一):Linux平台上静态库和动态库的制作与使用
  • Nginx的基础讲解之重写conf文件
  • RIFE: Real-Time Intermediate Flow Estimation for Video Frame Interpolation
  • rabbitMq-----broker服务器
  • MAC备忘录空白解决方案
  • cnn突破七(四层bpnet网络公式与卷积核bpnet公式相关)
  • PHP中的PEAR是什么