
A First Look at the TCP Protocol in Java

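Contents

  • I. Requirements overview
  • II. Design choices
  • III. Code structure
  • IV. Source code
  • V. Local debugging
    • 1. Server log
    • 2. Client log
    • 3. Reconnection log
  • VI. Deploying and running on a server
    • 1. Download the source
    • 2. Build the images
    • 3. Run the containers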

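I. Requirements overview

I recently worked on a data collection system. Its overall data flow is shown below:

Client 1 --+
Client 2 --+--> Data center
Client 3 --+
Client 4 --+

In addition, the data center must be able to send a command to a specific client; the client executes it and then reports the result back to the data center.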

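II. Design choices

Requirements considered:

  1. There are multiple clients; over a given period their number is manageable and fairly stable.
  2. Clients initiate the connection to the server and reconnect after a disconnect.
  3. Client and server communicate in both directions.

TCP is a good fit as the protocol between the clients and the data center: the data center server acts as the tcp-server and opens a port for each tcp-client to connect to.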
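
The two-way requirement can be tried in isolation. The following is a minimal sketch, not code from the tcp-show project: the class name DuplexSketch and the method exchange are made up for illustration. It assumes a loopback connection on an ephemeral port, sends one report from the client to the server, and has the server answer with one command over the same socket.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical demo class; not part of the tcp-show project. */
final class DuplexSketch
{
    /**
     * Returns {what the server received, what the client received}.
     */
    static String[] exchange(String report, String command)
    {
        // Port 0 asks the OS for any free port (an assumption made for the demo)
        try (ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress()))
        {
            String[] received = new String[2];
            Thread server = new Thread(() -> {
                try (Socket peer = serverSocket.accept();
                    DataInputStream in = new DataInputStream(peer.getInputStream());
                    DataOutputStream out = new DataOutputStream(peer.getOutputStream()))
                {
                    received[0] = in.readUTF(); // client -> server
                    out.writeUTF(command); // server -> client, same socket
                    out.flush();
                }
                catch (IOException e)
                {
                    throw new UncheckedIOException(e);
                }
            });
            server.start();
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), serverSocket.getLocalPort());
                DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                DataInputStream in = new DataInputStream(socket.getInputStream()))
            {
                out.writeUTF(report);
                out.flush();
                received[1] = in.readUTF();
            }
            server.join(); // makes the server thread's write to received[0] visible
            return received;
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args)
    {
        String[] received = exchange("temperature=21", "reboot");
        System.out.println("server got: " + received[0]);
        System.out.println("client got: " + received[1]);
    }
}
```

Both directions share one connection, so the data center only needs to expose a single port, and the clients need none.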

III. Code structure

[Image: project directory structure]

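IV. Source code

https://gitcode.com/00fly/tcp-show

Alternatively, restore the original project from the backup file below.

For how to restore it, see the article "神奇代码恢复工具" (Magic Code Recovery Tool).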

//goto docker\docker-compose.yml
version: '3.7'
services:
  tcp-server:
    image: registry.cn-shanghai.aliyuncs.com/00fly/tcp-show-server:0.0.1
    container_name: tcp-server
    deploy:
      resources:
        limits:
          cpus: '1.0'
          memory: 64M
        reservations:
          cpus: '0.05'
          memory: 64M
    ports:
    - 8000:8000
    restart: on-failure
    logging:
      driver: json-file
      options:
        max-size: '5m'
        max-file: '1'
  tcp-client:
    image: registry.cn-shanghai.aliyuncs.com/00fly/tcp-show-client:0.0.1
    container_name: tcp-client
    depends_on:
    - tcp-server
    deploy:
      resources:
        limits:
          cpus: '1.0'
          memory: 64M
        reservations:
          cpus: '0.05'
          memory: 64M
    restart: on-failure
    environment:
    #- TCP_SERVER=192.168.15.202
    - TCP_SERVER=tcp-server
    logging:
      driver: json-file
      options:
        max-size: '5m'
        max-file: '1'
//goto docker\restart-server.sh
#!/bin/bash
docker-compose down tcp-server
sleep 10
docker-compose up -d tcp-server
docker logs -f tcp-server
//goto docker\restart.sh
#!/bin/bash
docker-compose down && docker-compose up -d
sleep 2
docker logs -f tcp-server
//goto docker\stop.sh
#!/bin/bash
docker-compose down
//goto Dockerfile
# Base image
#FROM openjdk:8-jre-alpine
FROM adoptopenjdk/openjdk8-openj9:alpine-slim

RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
    echo 'Asia/Shanghai' >/etc/timezone

# Copy the release package
COPY target/*.jar  /app.jar

# Startup command
ENTRYPOINT ["java", "-Djava.security.egd=file:/dev/./urandom", "-Xshareclasses", "-Xquickstart", "-jar", "/app.jar"]
//goto pom-client.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.fly</groupId>
  <artifactId>tcp-show</artifactId>
  <version>0.0.1</version>
  <name>tcp-show</name>
  <packaging>jar</packaging>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <docker.hub>registry.cn-shanghai.aliyuncs.com</docker.hub>
    <java.version>1.8</java.version>
    <skipTests>true</skipTests>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.12.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.10</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.5</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.12</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>${project.artifactId}-client-${project.version}</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.10.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <!-- Option 1: runnable jar with dependencies -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.5.0</version>
        <configuration>
          <!-- Whether to append the assemblyId -->
          <appendAssemblyId>false</appendAssemblyId>
          <archive>
            <manifest>
              <mainClass>com.fly.protocol.tcp.run.StartClient</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <!-- Bundle all external dependency JARs into the generated JAR -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <!-- Execution configuration -->
            <id>make-assembly</id>
            <phase>package</phase>
            <!-- Bind to the package phase -->
            <goals>
              <goal>single</goal>
              <!-- Run only once -->
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- Add the docker-maven plugin -->
      <plugin>
        <groupId>io.fabric8</groupId>
        <artifactId>docker-maven-plugin</artifactId>
        <version>0.40.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>build</goal>
              <!--<goal>push</goal>-->
              <!--<goal>remove</goal>-->
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- Connect to a Linux server with Docker installed to build the image -->
          <!-- <dockerHost>http://192.168.182.10:2375</dockerHost> -->
          <!-- Registry that the Docker image is pushed to -->
          <pushRegistry>${docker.hub}</pushRegistry>
          <images>
            <image>
              <name>${docker.hub}/00fly/${project.artifactId}-client:${project.version}</name>
              <build>
                <dockerFileDir>${project.basedir}</dockerFileDir>
              </build>
            </image>
          </images>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
//goto pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.fly</groupId>
  <artifactId>tcp-show</artifactId>
  <version>0.0.1</version>
  <name>tcp-show</name>
  <packaging>jar</packaging>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <docker.hub>registry.cn-shanghai.aliyuncs.com</docker.hub>
    <java.version>1.8</java.version>
    <skipTests>true</skipTests>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.12.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.10</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.5</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.12</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>${project.artifactId}-server-${project.version}</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.10.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <!-- Option 1: runnable jar with dependencies -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.5.0</version>
        <configuration>
          <!-- Whether to append the assemblyId -->
          <appendAssemblyId>false</appendAssemblyId>
          <archive>
            <manifest>
              <mainClass>com.fly.protocol.tcp.run.StartServer</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <!-- Bundle all external dependency JARs into the generated JAR -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <!-- Execution configuration -->
            <id>make-assembly</id>
            <phase>package</phase>
            <!-- Bind to the package phase -->
            <goals>
              <goal>single</goal>
              <!-- Run only once -->
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- Add the docker-maven plugin -->
      <plugin>
        <groupId>io.fabric8</groupId>
        <artifactId>docker-maven-plugin</artifactId>
        <version>0.40.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>build</goal>
              <!--<goal>push</goal>-->
              <!--<goal>remove</goal>-->
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- Connect to a Linux server with Docker installed to build the image -->
          <!-- <dockerHost>http://192.168.182.10:2375</dockerHost> -->
          <!-- Registry that the Docker image is pushed to -->
          <pushRegistry>${docker.hub}</pushRegistry>
          <images>
            <image>
              <name>${docker.hub}/00fly/${project.artifactId}-server:${project.version}</name>
              <build>
                <dockerFileDir>${project.basedir}</dockerFileDir>
              </build>
            </image>
          </images>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
//goto src\main\java\com\fly\protocol\tcp\bio\TcpClient.java
package com.fly.protocol.tcp.bio;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TcpClient implements Runnable
{
    private String ip;
    private int port;
    private Socket socket;
    private DataOutputStream dataOutputStream;
    private String clientName;
    // volatile: set by the scheduler thread, read by the receiver thread
    private volatile boolean isClientCoreRun = false;
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private ExecutorService executor = Executors.newFixedThreadPool(2);

    public TcpClient(String clientName)
    {
        super();
        this.clientName = clientName;
    }

    /**
     * @param ip server IP
     * @param port server port
     * @return true if the connection was established
     */
    public boolean connectServer(String ip, int port)
    {
        try
        {
            this.ip = ip;
            this.port = port;
            socket = new Socket(InetAddress.getByName(ip), port);
            log.info("****** TcpClient will connect to Server {}:{}", ip, port);
            scheduler.scheduleAtFixedRate(this::checkConnection, 0, 10, TimeUnit.SECONDS);
            isClientCoreRun = true;
            dataOutputStream = new DataOutputStream(socket.getOutputStream());
            dataOutputStream.writeUTF(clientName);
            dataOutputStream.flush();
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
            isClientCoreRun = false;
        }
        return isClientCoreRun;
    }

    /**
     * Check the TCP connection
     */
    private void checkConnection()
    {
        if (socket == null || socket.isClosed())
        {
            log.error("Connection lost, attempting to reconnect");
            reconnect();
        }
    }

    private void reconnect()
    {
        try
        {
            socket = new Socket(InetAddress.getByName(ip), port);
            log.info("****** TcpClient will connect to Server {}:{}", ip, port);
            isClientCoreRun = true;
            executor.execute(new ReceiveMsg());
            dataOutputStream = new DataOutputStream(socket.getOutputStream());
            dataOutputStream.writeUTF(clientName);
            dataOutputStream.flush();
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
            isClientCoreRun = false;
        }
    }

    /**
     * Send a message
     */
    public void sendMsg(String msg)
    {
        try
        {
            dataOutputStream.writeUTF(msg);
            dataOutputStream.flush();
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
            closeClientConnect();
        }
    }

    /**
     * Disconnect the client from the server
     */
    public void closeClientConnect()
    {
        if (dataOutputStream != null)
        {
            try
            {
                dataOutputStream.close();
                isClientCoreRun = false;
                if (socket != null)
                {
                    socket.close();
                }
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
            }
        }
    }

    @Override
    public void run()
    {
        executor.execute(new ReceiveMsg());
        // Send data periodically
        scheduler.scheduleAtFixedRate(() -> {
            sendMsg(RandomStringUtils.randomAlphanumeric(10));
        }, RandomUtils.nextInt(1, 10), 10, TimeUnit.SECONDS);
    }

    class ReceiveMsg implements Runnable
    {
        private DataInputStream dataInputStream;

        public ReceiveMsg()
        {
            try
            {
                // Data input stream
                dataInputStream = new DataInputStream(socket.getInputStream());
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
            }
        }

        @Override
        public void run()
        {
            try
            {
                // When the server stops, readUTF fails and this receiver thread ends
                while (isClientCoreRun)
                {
                    String msg = dataInputStream.readUTF();
                    log.info("{} get msg: {}", clientName, msg);
                }
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
                // Close the socket so that checkConnection() sees it as closed and reconnects
                closeClientConnect();
            }
        }
    }
}
//goto src\main\java\com\fly\protocol\tcp\bio\TcpServer.java
package com.fly.protocol.tcp.bio;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TcpServer implements Runnable
{
    private ServerSocket serverSocket;
    private volatile boolean isServerCoreRun = false;
    // ConcurrentHashMap: the accept thread, every client thread and the sender all touch this map
    private Map<String, NewClient> allClient = new ConcurrentHashMap<>();

    public boolean startServer(String ip, int port)
    {
        try
        {
            serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(ip, port));
            isServerCoreRun = true;
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
            isServerCoreRun = false;
        }
        return isServerCoreRun;
    }

    /**
     * Shut down the server
     *
     * #1 Disconnect every client and clear the client container. #2 Close the server socket
     */
    public void closeServer()
    {
        try
        {
            isServerCoreRun = false;
            for (Map.Entry<String, NewClient> all : this.allClient.entrySet())
            {
                all.getValue().isNewClientRun = false;
                all.getValue().socket.close();
            }
            allClient.clear();
            serverSocket.close();
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
        }
    }

    /**
     * Send a message to a client
     */
    public void sendMsg(String clientName, String msg)
    {
        if (allClient.containsKey(clientName))
        {
            allClient.get(clientName).sendMsg(msg);
        }
    }

    @Override
    public void run()
    {
        try
        {
            log.info("TcpServer will start");
            while (isServerCoreRun)
            {
                // Block until a client connects
                Socket socket = serverSocket.accept();
                String clientName = new DataInputStream(socket.getInputStream()).readUTF();
                String clientIP = socket.getInetAddress().getHostAddress();
                int clientPort = socket.getPort();
                String clientConnectDateTime = DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");
                NewClient newClient = new NewClient(socket, clientName, clientIP, clientPort, clientConnectDateTime);
                allClient.put(clientName, newClient);
                log.info("**** add new client ===> {}", allClient.keySet());
                new Thread(newClient).start();
            }
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
        }
    }

    class NewClient implements Runnable
    {
        // Client socket
        private Socket socket;
        // Data input stream
        private DataInputStream dataInputStream;
        // Data output stream
        private DataOutputStream dataOutputStream;
        // Whether this client is running (receiving and sending messages)
        private volatile boolean isNewClientRun = true;
        // Client name
        private String clientName;
        // Client IP address
        private String clientIP;

        public NewClient()
        {
        }

        // Constructor that initializes the fields
        public NewClient(Socket socket, String clientName, String clientIP, int clientPort, String clientConnectDateTime)
        {
            this.socket = socket;
            this.clientName = clientName;
            this.clientIP = clientIP;
            try
            {
                // Create the client's data input and output streams
                dataInputStream = new DataInputStream(socket.getInputStream());
                dataOutputStream = new DataOutputStream(socket.getOutputStream());
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
                closeCurrentClient();
            }
        }

        @Override
        public void run()
        {
            try
            {
                // Messages are only exchanged while the client is running
                while (this.isNewClientRun)
                {
                    // Read the message sent by the client
                    String msg = dataInputStream.readUTF();
                    if (StringUtils.isNotBlank(msg))
                    {
                        log.info("clientName: {}, clientIP: {}, send msg ===> {}", clientName, clientIP, msg);
                    }
                    // Send data back to the client
                    int index = 0;
                    for (String key : allClient.keySet())
                    {
                        index++;
                        if (StringUtils.equals(key, clientName))
                        {
                            allClient.get(key).sendMsg("from server: " + msg + StringUtils.repeat("-----", index));
                        }
                    }
                }
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
                closeCurrentClient();
            }
        }

        /**
         * Disconnect the current client and release its resources
         */
        public void closeCurrentClient()
        {
            try
            {
                // Mark the client as stopped
                isNewClientRun = false;
                // Close the data output stream
                if (dataOutputStream != null)
                {
                    dataOutputStream.close();
                }
                // Close the data input stream
                if (dataInputStream != null)
                {
                    dataInputStream.close();
                }
                // Close the client socket
                if (socket != null)
                {
                    socket.close();
                }
                // Remove this client from the client container
                allClient.remove(clientName);
                log.info("**** remove client ===> {}", allClient.keySet());
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
            }
        }

        /**
         * Send a message
         */
        public void sendMsg(String msg)
        {
            try
            {
                // Write the message
                dataOutputStream.writeUTF(msg);
                // Flush the output buffer
                dataOutputStream.flush();
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
                closeCurrentClient();
            }
        }
    }
}
//goto src\main\java\com\fly\protocol\tcp\run\StartClient.java
package com.fly.protocol.tcp.run;

import java.util.stream.IntStream;

import org.apache.commons.lang3.StringUtils;

import com.fly.protocol.tcp.bio.TcpClient;

public class StartClient
{
    public static void main(String[] args)
    {
        // Under Docker, prefer the environment value set in docker-compose
        String serverIp = StringUtils.defaultIfBlank(System.getenv().get("TCP_SERVER"), "127.0.0.1");
        IntStream.rangeClosed(1, 3).forEach(i -> {
            TcpClient client = new TcpClient("CLIENT_" + i);
            if (client.connectServer(serverIp, 8000))
            {
                new Thread(client).start();
            }
        });
    }
}
//goto src\main\java\com\fly\protocol\tcp\run\StartServer.java
package com.fly.protocol.tcp.run;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;

import com.fly.protocol.tcp.bio.TcpServer;

public class StartServer
{
    public static void main(String[] args)
    {
        TcpServer server = new TcpServer();
        if (server.startServer("0.0.0.0", 8000))
        {
            Executors.newScheduledThreadPool(2).scheduleAtFixedRate(() -> {
                int index = RandomUtils.nextInt(1, 4);
                server.sendMsg("CLIENT_" + index, "random: " + RandomStringUtils.randomAlphanumeric(10));
            }, 10, 60, TimeUnit.SECONDS);
            new Thread(server).start();
        }
    }
}
//goto src\main\resources\log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="off" monitorInterval="0">
  <appenders>
    <console name="Console" target="system_out">
      <patternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n" />
    </console>
  </appenders>
  <loggers>
    <root level="INFO">
      <appender-ref ref="Console" />
    </root>
  </loggers>
</configuration>
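
A note on the wire format used by TcpClient and TcpServer: both sides exchange messages with writeUTF and readUTF. DataOutputStream.writeUTF writes a 2-byte length followed by the string in modified UTF-8, so each call produces one self-delimiting frame, limited to 65535 encoded bytes. The sketch below illustrates this with an in-memory buffer; the class UtfFrameSketch and its methods are hypothetical names, not part of the project.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.io.UncheckedIOException;

/** Hypothetical demo class; not part of the tcp-show project. */
final class UtfFrameSketch
{
    /** Returns the exact bytes that writeUTF(msg) would put on the wire. */
    static byte[] frame(String msg)
    {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer))
        {
            out.writeUTF(msg);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Returns false when the encoded message exceeds the 65535-byte limit of writeUTF. */
    static boolean fitsInOneFrame(String msg)
    {
        try
        {
            new DataOutputStream(new ByteArrayOutputStream()).writeUTF(msg);
            return true;
        }
        catch (UTFDataFormatException e)
        {
            return false;
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        byte[] bytes = frame("hi");
        // Two length bytes (0, 2) followed by 'h' and 'i'
        System.out.println("frame length for \"hi\": " + bytes.length);
        System.out.println("length prefix: " + bytes[0] + ", " + bytes[1]);
    }
}
```

Payloads that may grow beyond that limit would need to be split by the sender or carried with a different length prefix.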

V. Local debugging

Start StartServer first, then StartClient.

1. Server log

[Image: server log]

2. Client log

[Image: client log]

3. Reconnection log

[Image: reconnection log]
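
One detail helps when reading a reconnection log: Socket.isClosed() only reports whether close() was called on the local socket, so a server shutdown is first noticed when a read fails. The sketch below demonstrates this on a loopback connection. The class DisconnectSketch is a hypothetical name used for illustration; it is not part of the project, whose ReceiveMsg class reacts to the failed read by calling closeClientConnect().

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical demo class; not part of the tcp-show project. */
final class DisconnectSketch
{
    /**
     * Returns {isClosed() right after the peer went away,
     *          whether readUTF() hit end of stream,
     *          isClosed() after the local close()}.
     */
    static boolean[] observe()
    {
        try (ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
            Socket socket = new Socket(InetAddress.getLoopbackAddress(), serverSocket.getLocalPort()))
        {
            // Simulate the server going away: accept the connection and close it at once
            serverSocket.accept().close();

            boolean closedBeforeRead = socket.isClosed();
            boolean sawEndOfStream = false;
            try
            {
                new DataInputStream(socket.getInputStream()).readUTF();
            }
            catch (EOFException e)
            {
                // The read is what reveals the lost connection
                sawEndOfStream = true;
            }
            // Only a local close() flips isClosed(), which a periodic check can then act on
            socket.close();
            return new boolean[] {closedBeforeRead, sawEndOfStream, socket.isClosed()};
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        boolean[] result = observe();
        System.out.println("isClosed() before read: " + result[0]);
        System.out.println("read hit end of stream: " + result[1]);
        System.out.println("isClosed() after close(): " + result[2]);
    }
}
```

This is why a periodic isClosed() check works only when the receiving side closes its own socket after a read error.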

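VI. Deploying and running on a server

1. Download the source

On a server with the JDK, Maven, and Docker installed, download the source:

git clone https://gitcode.com/00fly/tcp-show.git

2. Build the images

# build the server
mvn clean package
# build the client
mvn clean package -f pom-client.xml

3. Run the containers

Upload the docker directory to the server, then run:

sh restart.sh
sh restart-server.sh
docker logs -f tcp-server
docker logs -f tcp-client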
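
If the client log shows connection errors, it can help to confirm from another machine that port 8000 (published in docker-compose.yml) accepts connections. The following is a minimal sketch under assumptions: the class PortProbe is a hypothetical helper, not part of the project, and the 2-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper; not part of the tcp-show project. */
final class PortProbe
{
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis)
    {
        try (Socket socket = new Socket())
        {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        }
        catch (IOException e)
        {
            // Refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args)
    {
        // Pass the server address as the first argument; defaults to the local machine
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println(host + ":8000 reachable = " + isReachable(host, 8000, 2000));
    }
}
```

A successful probe opens and closes a connection without sending a client name, so the server may log an error for that connection.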

If you have any questions or suggestions, feel free to ask and discuss them with me. Let's improve together. Thanks!

-over-
