当前位置: 首页 > news >正文

Java IO流(五)Netty实战[TCP|Http|心跳检测|Websocket]

Netty入门代码示例(基于TCP服务)

Server端

package com.bierce.io.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
public class NettyServer {public static void main(String[] args) throws InterruptedException {//创建BossGroup和WorkerGroup线程池组,均属于自旋状态EventLoopGroup bossGroup = new NioEventLoopGroup(); //负责连接请求处理EventLoopGroup workerGroup = new NioEventLoopGroup(); //进行业务处理try {//创建服务器端启动,通过链式编程配置相关参数ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //设置为服务端通道.option(ChannelOption.SO_BACKLOG,128) //设置线程队列等待连接的个数.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态.childHandler(new ChannelInitializer<SocketChannel>() { //匿名创建通道初始对象@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyServerHandler()); //为workerGroup下的NioEventLoop对应管道pipeline设置自定义处理器}});System.out.println("Server is start Successful !!!");ChannelFuture cf = bootstrap.bind(6668).sync(); //绑定指定端口并同步处理cf.channel().closeFuture().sync(); //监听关闭通道方法}finally { //关闭线程池资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取客户端发送的数据//ctx:上下文对象,包含管道pipeline,通道等信息//msg:即客户端发送的数据ByteBuf buf = (ByteBuf) msg; //ByteBuf是Netty提供的缓冲区,性能更高System.out.println("客户端发送过来的msg = " + buf.toString((CharsetUtil.UTF_8)));System.out.println("客户端地址 = " + ctx.channel().remoteAddress());}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //读取客户端信息完成后进行的业务处理
//        super.channelReadComplete(ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Client!",CharsetUtil.UTF_8)); //将数据写到缓存并刷新}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close(); //处理异常需要关闭通道}
}

Client端

package com.bierce.io.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
public class NettyClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); //客户端事件循环组try {Bootstrap bootstrap = new Bootstrap(); //客户端启动对象bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) //客户端通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyClientHandler()); //添加自定义处理器}});System.out.println("Client Start Successful!!!");ChannelFuture sync = bootstrap.connect("127.0.0.1", 6668).sync();sync.channel().closeFuture().sync(); //监听关闭通道}finally {eventLoopGroup.shutdownGracefully();}}
}
class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception { //通道就绪会触发该方法ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server!", CharsetUtil.UTF_8)); //将数据写到缓存并刷新}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取服务端返回信息ByteBuf buf = (ByteBuf) msg; //ByteBuf是Netty提供的缓冲区,性能更高System.out.println("服务端发送过来的msg = " + buf.toString((CharsetUtil.UTF_8)));System.out.println("服务端地址 = " + ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close(); //处理异常需要关闭通道}
}

运行结果

Server
Client

Netty入门代码示例(基于HTTP服务)

package com.bierce.io.netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;import java.net.URI;public class TestServer {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {//创建服务器端启动,通过链式编程配置相关参数ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //设置为服务端通道.childHandler(new TestServerInitializer()); //设置为自定义的初始化System.out.println("Server is start Successful !!!");ChannelFuture cf = bootstrap.bind(9999).sync(); //绑定指定端口并同步处理cf.channel().closeFuture().sync(); //监听关闭通道方法}finally { //关闭线程池资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
class TestServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//HttpServerCodec是Netty提供的处理Http的编-解码器 使用io.netty:netty-all:4.1.20Final版本,其他版本不支持会报错pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());//增加自定义的handlerpipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());}
}
class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {//读取客户端数据@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {if (httpObject instanceof HttpRequest){System.out.println("httpObject Type = " + httpObject.getClass());System.out.println("Client Address = " + channelHandlerContext.channel().remoteAddress());//对特定资源进行过滤HttpRequest httpRequest = (HttpRequest)httpObject;URI uri = new URI(httpRequest.getUri());if ("/favicon.ico".equals(uri.getPath())){System.out.println("favicon.ico资源不做响应");return;}//回复浏览器信息(http协议)ByteBuf content = Unpooled.copiedBuffer("Hello, I'm Server", CharsetUtil.UTF_8);FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");response.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());channelHandlerContext.writeAndFlush(response);}}
}

运行结果

Netty心跳检测机制

package com.bierce.io.netty.heartbeat;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;
public class MyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//IdleStateHandler:Netty提供的处理空闲状态的处理器//readerIdleTime:多长时间没有读操作,会发送心跳检测包检测是否连接//writerIdleTime:多长时间没有写操作,会发送心跳检测包检测是否连接//allIdleTime:多长时间没有读写操作,会发送心跳检测包检测是否连接pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));//IdleStateHandler触发后,将传递给下一个Handler的userEventTriggered方法去处理//通过自定义的Handler对空闲状态进一步处理pipeline.addLast(new MyServerHandler());}});ChannelFuture sync = bootstrap.bind(9999).sync().channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
class MyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){IdleStateEvent IdleStateEvent = (IdleStateEvent) evt;String eventType = null;switch (IdleStateEvent.state()){case READER_IDLE:eventType = "读空闲";break;case WRITER_IDLE:eventType = "写空闲";break;case ALL_IDLE:eventType = "读写空闲";break;}System.out.println(ctx.channel().remoteAddress() + "-已超时,超时类型为: " + eventType );System.out.println("Server will deal with it instantly...");//发生空闲则关闭当前通道ctx.close();}}
}
class NettyClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); //客户端事件循环组try {Bootstrap bootstrap = new Bootstrap(); //客户端启动对象bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) //客户端通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyClientHandler()); //添加自定义处理器}});System.out.println("Client Start Successful!!!");ChannelFuture sync = bootstrap.connect("127.0.0.1", 9999).sync();sync.channel().closeFuture().sync(); //监听关闭通道}finally {eventLoopGroup.shutdownGracefully();}}
}

 注意: 服务端在第一次空闲事件触发后就会关闭通道,因此每个连接只会打印一种超时类型。示例中 IdleStateHandler 的 readerIdleTime、writerIdleTime、allIdleTime 分别为 3、5、7 秒,最先触发的是读空闲;需要调整这三个参数才会显示其他对应的超时信息。

Netty入门代码示例(基于WebSocket协议)

服务端

package com.bierce.websocket;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;import java.time.LocalDateTime;public class MyWebsocketServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//基于Http协议,所以需要http解码编码pipeline.addLast(new HttpServerCodec());pipeline.addLast(new ChunkedWriteHandler()); //处理块方式的写操作//http传输过程数据量非常大时会分段,而HttpObjectAggregator可以将多个分段聚合pipeline.addLast(new HttpObjectAggregator(8192));//webSocket采用帧方式传输数据//WebSocketServerProtocolHandler作用是将http协议升级为ws协议,且保持长连接pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));pipeline.addLast(new MyWebsocketServerHandler());}});ChannelFuture sync = bootstrap.bind(9999).sync().channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
class MyWebsocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {System.out.println("Server receive the Info: " + msg.text());ctx.channel().writeAndFlush(new TextWebSocketFrame("Server time " + LocalDateTime.now() + " --- " + msg.text()));}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("有客户端连接成功 --" + ctx.channel().id().asLongText()); //asLongText唯一值}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("有客户端已经离开 --" + ctx.channel().id().asLongText()); //asLongText唯一值}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("异常Info:" + cause.getMessage());ctx.close();}
}

客户端(浏览器)

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Websocket</title>
</head>
<body>
<script>
    // Kept global: send() below checks it through window.socket.
    var socket;

    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:9999/hello");

        // Counterpart of channelRead0 on the server: append each server message.
        socket.onmessage = function (ev) {
            var output = document.getElementById("responseText");
            output.value += "\n" + ev.data;
        };

        // Connection opened: replace the output with the success notice.
        socket.onopen = function (ev) {
            var output = document.getElementById("responseText");
            output.value = "开启连接成功!";
        };

        // Connection closed: append the close notice.
        socket.onclose = function (ev) {
            var output = document.getElementById("responseText");
            output.value += "\n" + "连接关闭成功!";
        };
    }

    // Sends a message to the server if the socket exists and is open.
    function send(msg) {
        if (!window.socket) { // no socket was created
            return;
        }
        if (socket.readyState != WebSocket.OPEN) {
            alert("socket未连接");
            return;
        }
        socket.send(msg);
    }
</script>
<form onsubmit="return false">
    <textarea name="message" style="height:300px;width:300px"></textarea>
    <input type="button" value="Send" onclick="send(this.form.message.value)">
    <textarea id="responseText" style="height:300px;width:300px"></textarea>
    <input type="button" value="Clear" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>

效果图

http://www.lryc.cn/news/137503.html

相关文章:

  • C#基础进阶
  • Java:ArrayList集合、LinkedList(链表)集合的底层原理及应用场景
  • 【Python】json文件的读取
  • 专用杂凑函数的消息鉴别码算法学习记录
  • Golang使用消息队列(RabbitMQ)
  • Apache Spark远程代码执行漏洞(CVE-2023-32007)漏洞复现
  • 春秋云镜 :CVE-2020-21650(MyuCMS后台rce)
  • 测试框架pytest教程(7)实现 xunit 风格的setup
  • 用队列实现栈
  • Anolis 8.6 下 Redis 7.2.0 集群搭建和配置
  • 综合能源系统(8)——综合能源系统支撑技术
  • MySQL5.7数据目录结构
  • Python Opencv实践 - 图像直方图均衡化
  • GAN:对抗生成网络,前向传播和后巷传播的区别
  • 压力变送器的功能与应用
  • 排序算法:选择排序
  • Windows运行Spark所需的Hadoop安装
  • KusionStack使用文档
  • ONLYOFFICE 文档如何与 Alfresco 进行集成
  • PostgreSQL下载路径与安装步骤
  • 如何在PHP中编写条件语句
  • LLM架构自注意力机制Transformers architecture Attention is all you need
  • 计算机网络 QA
  • 安果天气预报 产品介绍
  • net start Mysql 启动服务时 ,显示“Mysql服务正在启动 Mysql服务无法启动 服务没有报告任何错误
  • DAY24
  • Redis过期数据的删除策略
  • 如何使用CSS实现一个拖拽排序效果?
  • leetcode 118.杨辉三角
  • 微服务框架之SpringBoot面试题汇总