当前位置: 首页 > news >正文

JAVA异步的TCP 通讯-客户端

一、客户端代码示例

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class AdvancedAsyncTCPClient {private static final String SERVER_HOST = "localhost";private static final int SERVER_PORT = 8888;private static final int BUFFER_SIZE = 1024;private final AsynchronousSocketChannel clientChannel;private final ExecutorService threadPool;public AdvancedAsyncTCPClient() throws IOException {// 创建异步套接字通道clientChannel = AsynchronousSocketChannel.open();// 创建一个固定大小的线程池,用于处理业务逻辑threadPool = Executors.newFixedThreadPool(5);}public void connect() {// 异步连接到服务器clientChannel.connect(new InetSocketAddress(SERVER_HOST, SERVER_PORT), null, new CompletionHandler<Void, Void>() {@Overridepublic void completed(Void result, Void attachment) {System.out.println("Connected to server: " + SERVER_HOST + ":" + SERVER_PORT);// 连接成功后开始读取服务器数据startReading();// 发送初始消息sendMessage("Hello, server!");}@Overridepublic void failed(Throwable exc, Void attachment) {System.err.println("Failed to connect to server: " + exc.getMessage());closeChannel();}});}private void startReading() {ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);// 异步读取服务器数据clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer bytesRead, ByteBuffer buffer) {if (bytesRead > 0) {buffer.flip();byte[] data = new byte[buffer.remaining()];buffer.get(data);String message = new String(data);System.out.println("Received message from server: " + message);// 继续读取服务器数据buffer.clear();clientChannel.read(buffer, buffer, this);} else if (bytesRead == -1) {// 服务器关闭连接System.out.println("Server closed the connection");closeChannel();}}@Overridepublic void failed(Throwable exc, ByteBuffer buffer) {System.err.println("Failed to read data from server: " + exc.getMessage());closeChannel();}});}public void sendMessage(String message) {ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());// 异步发送消息到服务器clientChannel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer bytesWritten, ByteBuffer buffer) {if (buffer.hasRemaining()) {// 如果还有数据未发送完,继续发送clientChannel.write(buffer, buffer, this);} else {System.out.println("Message sent to server: " + message);}}@Overridepublic void failed(Throwable exc, ByteBuffer buffer) {System.err.println("Failed to send message to server: " + exc.getMessage());closeChannel();}});}private void closeChannel() {try {System.out.println("Closing client connection");clientChannel.close();threadPool.shutdown();} catch (IOException e) {System.err.println("Error closing client channel: " + e.getMessage());}}public static void main(String[] args) {try {AdvancedAsyncTCPClient client = new AdvancedAsyncTCPClient();client.connect();} catch (IOException e) {System.err.println("Error creating client: " + e.getMessage());}}
}

二、代码分析

AdvancedAsyncTCPClient 类

  1. 构造函数:创建 AsynchronousSocketChannel 并初始化一个固定大小的线程池

  2. connect() 方法:异步连接到服务器,连接成功后开始读取服务器数据并发送初始消息。

  3. startReading() 方法:异步读取服务器发送的数据,使用 CompletionHandler 处理读取结果。

  4. sendMessage() 方法:异步发送消息到服务器,处理可能的未发送完的数据。

  5. closeChannel() 方法:关闭客户端通道并关闭线程池,处理关闭过程中可能出现的异常。

CompletionHandler

  1. 在 connect()startReading() 和 sendMessage() 方法中使用 CompletionHandler 来处理异步操作的完成结果。
  2. completed() 方法处理操作成功的情况,failed() 方法处理操作失败的情况。

线程池

        1.使用 Executors.newFixedThreadPool(5) 创建一个固定大小的线程池,用于处理业务逻辑,避免阻塞 I/O 操作。

三、优点

  • 异步 I/O:利用 Java NIO 2 的异步 I/O 特性,提高了客户端的并发处理能力。
  • 线程池:使用线程池处理业务逻辑,减少了线程创建和销毁的开销。
  • 异常处理:对各种异常情况进行了处理,增强了代码的健壮性。
  • 资源管理:在关闭客户端时,正确关闭客户端通道和线程池,避免资源泄漏。

四、注意事项

  • 该示例假设服务器运行在 localhost 的 8888 端口,你可以根据实际情况修改 SERVER_HOST 和 SERVER_PORT
  • 此代码只是一个基础示例,实际应用中可能需要根据具体需求进行扩展,如处理更复杂的消息格式、实现重连机制等。

http://www.lryc.cn/news/532371.html

相关文章:

  • MySQL的存储引擎对比(InnoDB和MyISAM)
  • 【2025-02-06】简单算法:相向双指针 盛最多水的容器 接雨水
  • 2.6-组合博弈入门
  • 【教学】推送docker仓库
  • 【大数据技术】本机PyCharm远程连接虚拟机Python
  • 3060显卡掉帧是为什么?3060掉帧卡顿解决方法
  • Kubernetes集群通过Filebeat收集日志
  • SQLAlchemy-2.0中模型定义和alembic的数据库迁移工具
  • [含文档+PPT+源码等]精品基于Python实现的django个性化健康餐计划订制系统
  • Python3中异常处理:try/except语句
  • [ Spring] Integrate Spring Boot Dubbo with Nacos 2025
  • 【3分钟极速部署】在本地快速部署deepseek
  • 【QT笔记】使用QScrollArea实现多行文本样式显示
  • 大模型中提到的超参数是什么
  • 【Uniapp-Vue3】z-paging插件组件实现触底和下拉加载数据
  • UE虚幻引擎No Google Play Store Key:No OBB found报错如何处理
  • OKHttp拦截器解析
  • STM32标准库移植RT-Thread nano
  • c++11总结26——std::regex
  • langchain教程-12.Agent/工具定义/Agent调用工具/Agentic RAG
  • leetcode_双指针 125.验证回文串
  • ML.NET库学习001:基于PCA的信用卡异常检查之样本处理与训练
  • 【华为OD机考】华为OD笔试真题解析(1)--AI处理器组合
  • edu小程序挖掘严重支付逻辑漏洞
  • 力扣 279. 完全平方数
  • 鸿蒙生态潮起:开发者的逐浪之旅
  • Diskgenius系统迁移之后无法使用USB启动
  • Kafka 可靠性探究—副本刨析
  • 我的博文天地测试报告
  • EtherCAT主站IGH-- 35 -- IGH之pdo_list.h/c文件解析