当前位置: 首页 > article >正文

Netty(10)协议设计与解析(IdleStateHandler:空闲检测器、心跳)

为什么需要协议?

TCP/IP 中消息传输基于流的方式,没有边界。

协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

协议举例

redis 协议

客户端代码

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.Charset;@Slf4j
public class TestRedis {/*向redis发送这样的一条命令:set name zhangsanredis将上面的一条命令看做数组,依次发送下面的内容1. 规定*号后面的就是数组元素的个数*3:元素的个数为32. 发送每个命令每个键值的长度,多个命令之间使用回车+换行进行分隔$3: set命令是三个字节set$4: key的长度是四个字节name$8: value的长度是八个字节zhangsan下面演示的客户端怎么发送:*/public static void main(String[] args) {/*13: 回车10: 换行*/final byte[] LINE = {13, 10};NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) {//因为使用的是英文和拼音,所以不考虑编码问题ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*3".getBytes());buf.writeBytes(LINE);//回车换行buf.writeBytes("$3".getBytes());//set命令的长度buf.writeBytes(LINE);//回车换行buf.writeBytes("set".getBytes());//set命令buf.writeBytes(LINE);//回车换行buf.writeBytes("$4".getBytes());//key的长度buf.writeBytes(LINE);buf.writeBytes("name".getBytes());buf.writeBytes(LINE);buf.writeBytes("$8".getBytes());//value的长度buf.writeBytes(LINE);buf.writeBytes("zhangsan".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}//接收redis返回到的结果@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(buf.toString(Charset.defaultCharset()));}});}});//连接本地的redis服务器ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}

上面的客户端启动后,会向本机的redis发送数据,最终可以在本机的redis读取到set的内容“zhangsan”

http协议

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;@Slf4j
public class TestHttp {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));/*http协议的编解码器:HttpServerCodec: 作为http协议的服务器端Codec: 即包含解密又包含编码Decode:解码EnCode:编码二者组合在一起便是Codec当http请求来了过后,便会从上往下经过一个个的handler到了HttpServerCodec时,便会对请求进行解码,解码的内容将会被分为下面的两部分1. HttpRequest:包含请求行的请求头2. HttpContent:代表请求体*///HttpServerCodec: 即是入站处理器,又是出站处理器ch.pipeline().addLast(new HttpServerCodec());//解码后,对解码后的内容进行处理/*SimpleChannelInboundHandler:入站处理器,只关心某一种类型的消息;可以对消息进行区分,进行选择处理SimpleChannelInboundHandler<HttpRequest>():只关心HttpRequest类型的消息;其他类型消息便会被跳过*/ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {//msg: 代表请求头和请求行// 获取请求行log.debug(msg.uri());// 给浏览器返回响应/*DefaultFullHttpResponse: 响应结果便会结果出站处理器HttpServerCodec;编码成为ByteBuf,符合http协议的响应protocolVersion:     http协议的版本HttpResponseStatus:  响应的状态码*/DefaultFullHttpResponse response =new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes = "<h1>Hello, world!</h1>".getBytes();//加入响应头,设置其响应的消息长度response.headers().setInt(CONTENT_LENGTH, bytes.length);//写入一些响应的内容给response对象response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});/*//不进行消息的区分,使用instanceof判断消息的类型ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("{}", msg.getClass());//判断msg是否是HttpRequest类型,是的话便去处理请求行,请求头if (msg instanceof HttpRequest) { // 请求行,请求头} else if (msg instanceof HttpContent) { //请求体//判断msg是否是HttpRequest类型,是的话便去处理请求体}}});*/}});//监听本机的8080端口,当使用浏览器访问localhost:8080时,当前这个方法便会被触发,进行上面的处理ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

当浏览器访问:localhost:8080这个链接后,浏览器页面将会显示写入的html内容

在控制台也会进行相应的输出

自定义协议

自定义协议要素

  • 魔数,用来在第一时间判定是否是无效数据包

    发送的数据,一般前几个就是魔术值
    通过魔术值,来判断这个消息是不是需要的,可以理解为对暗号

  • 版本号,可以支持协议的升级

  • 序列化算法消息正文到底采用哪种序列化反序列化方式,可以由此扩展

    例如:将消息正文格式为(**常见的序列化方式**为)json、protobuf(谷歌出品的)、hessian、jdk

  • 指令类型,是登录、注册、单聊、群聊… 跟业务相关

    消息是什么类型

  • 请求序号,为了双工通信,提供异步能力

    消息发送出去,接收的时候不一定是按照发送出去的顺序接收,所以需要一个序号去判断这次响应对应的是之前的那一次请求

  • 正文长度

    通过正文长度去判断需要读取的字节数是多少

  • 消息正文

实战

自定义消息的类型

所有消息的公共父类(Message)

@Data
public abstract class Message implements Serializable {/*** 根据消息类型字节,获得对应的消息 class* @param messageType 消息类型字节* @return 消息 class*/public static Class<? extends Message> getMessageClass(int messageType) {return messageClasses.get(messageType);}private int sequenceId;private int messageType;//获取消息类型,其他继承这个方法的重新该方法,如此,子类便可以通过调用这个方法知道是什么消息类型public abstract int getMessageType();public static final int LoginRequestMessage = 0;public static final int LoginResponseMessage = 1;public static final int ChatRequestMessage = 2;public static final int ChatResponseMessage = 3;public static final int GroupCreateRequestMessage = 4;public static final int GroupCreateResponseMessage = 5;public static final int GroupJoinRequestMessage = 6;public static final int GroupJoinResponseMessage = 7;public static final int GroupQuitRequestMessage = 8;public static final int GroupQuitResponseMessage = 9;public static final int GroupChatRequestMessage = 10;public static final int GroupChatResponseMessage = 11;public static final int GroupMembersRequestMessage = 12;public static final int GroupMembersResponseMessage = 13;public static final int PingMessage = 14;public static final int PongMessage = 15;/*** 请求类型 byte 值*/public static final int RPC_MESSAGE_TYPE_REQUEST = 101;/*** 响应类型 byte 值*/public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();static {messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}

各种消息类型的类

LoginRequestMessage
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {private String username;private String password;public LoginRequestMessage() {}public LoginRequestMessage(String username, String password) {this.username = username;this.password = password;}@Overridepublic int getMessageType() {return LoginRequestMessage;}
}
LoginResponseMessage
@Data
@ToString(callSuper = true)
public class LoginResponseMessage extends AbstractResponseMessage {public LoginResponseMessage(boolean success, String reason) {super(success, reason);}@Overridepublic int getMessageType() {return LoginResponseMessage;}
}
ChatRequestMessage
@Data
@ToString(callSuper = true)
public class ChatRequestMessage extends Message {private String content;private String to;private String from;public ChatRequestMessage() {}public ChatRequestMessage(String from, String to, String content) {this.from = from;this.to = to;this.content = content;}@Overridepublic int getMessageType() {return ChatRequestMessage;}
}
GroupCreateRequestMessage
@Data
@ToString(callSuper = true)
public class GroupCreateRequestMessage extends Message {private String groupName;private Set<String> members;public GroupCreateRequestMessage(String groupName, Set<String> members) {this.groupName = groupName;this.members = members;}@Overridepublic int getMessageType() {return GroupCreateRequestMessage;}
}
GroupCreateRequestMessage
@Data
@ToString(callSuper = true)
public class GroupCreateRequestMessage extends Message {private String groupName;private Set<String> members;public GroupCreateRequestMessage(String groupName, Set<String> members) {this.groupName = groupName;this.members = members;}@Overridepublic int getMessageType() {return GroupCreateRequestMessage;}
}
GroupMembersRequestMessage
@Data
@ToString(callSuper = true)
public class GroupMembersRequestMessage extends Message {private String groupName;public GroupMembersRequestMessage(String groupName) {this.groupName = groupName;}@Overridepublic int getMessageType() {return GroupMembersRequestMessage;}
}
GroupJoinRequestMessage
@Data
@ToString(callSuper = true)
public class GroupJoinRequestMessage extends Message {private String groupName;private String username;public GroupJoinRequestMessage(String username, String groupName) {this.groupName = groupName;this.username = username;}@Overridepublic int getMessageType() {return GroupJoinRequestMessage;}
}
GroupQuitRequestMessage
@Data
@ToString(callSuper = true)
public class GroupQuitRequestMessage extends Message {private String groupName;private String username;public GroupQuitRequestMessage(String username, String groupName) {this.groupName = groupName;this.username = username;}@Overridepublic int getMessageType() {return GroupQuitRequestMessage;}
}
PingMessage
public class PingMessage extends Message {@Overridepublic int getMessageType() {return PingMessage;}
}
PongMessage
public class PongMessage extends Message {@Overridepublic int getMessageType() {return PongMessage;}
}

自定义消息的编解码器

@Slf4j
//加了Sharable就说明以后在使用这个类进行实例化时,可以将其抽取出来共享, 但是不能继承ByteToMessageCodec这个方法
//@ChannelHandler.Sharable
public class MessageCodec extends ByteToMessageCodec<Message> {@Overridepublic void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 1. 4 字节的魔数:类似与一种暗号out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式, 约定:jdk: 0 , json: 1out.writeByte(0);//jdk的序列化方式// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节的请求序号out.writeInt(msg.getSequenceId());// 无意义,对齐填充, 满足2的n次方out.writeByte(0xff);// 6. 获取内容的字节数组: 序列化为二进制的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容, 在写入内容前面的长度部分是固定的:15+1out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//获取魔数值:四个字节int magicNum = in.readInt();//获取版本:一字节byte version = in.readByte();//获取序列化算法:一字节byte serializerType = in.readByte();//获取消息类型:一字节byte messageType = in.readByte();//消息序号:四字节int sequenceId = in.readInt();//填充,对齐的一字节in.readByte();//长度:四字节int length = in.readInt();//分配缓冲区byte[] bytes = new byte[length];//通过长度去读取多少个字节in.readBytes(bytes, 0, length);//内容
//        if (messageType == 0){//如果是什么类型,就进行反序列化
//            //。。。。
//        }//将内容进行反序列化ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));//将其转换为MessageMessage message = (Message) ois.readObject();//将内容进行打印log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);//将netty解码后的结果存入对应的参数里面去,不存入的话接下来的handler将拿不到结果out.add(message);}
}

测试

编码

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;public class TestMessageCodec {public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel(//打印日志new LoggingHandler(),new MessageCodec());// encodeLoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");channel.writeInbound(message);}
}

在这里插入图片描述

解码

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;public class TestMessageCodec {public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(),new MessageCodec());// encodeLoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");// decodeByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MessageCodec().encode(null, message, buf);//入站channel.writeInbound(buf );}
}

在这里插入图片描述

黏包/半包问题解决

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;public class TestMessageCodec {public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(),//解决黏包/半包问题//最大长度1024,长度字段偏移量12,长度字段长度四字节,长度字段不需要调整0,不需要去除前面的部分0new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),new MessageCodec());// encodeLoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");// decodeByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MessageCodec().encode(null, message, buf);//将buf进行切片为两部分ByteBuf s1 = buf.slice(0, 100);ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);//writeInbound会在ByteBuf写完后,调用release方法,这时其引用记便变为0,//这时候ByteBuf占用的内存都被释放掉了s1.retain(); // 引用计数+1 = 2//模拟半包channel.writeInbound(s1); // release = 引用计数 - 1 = 1channel.writeInbound(s2);}
}

@Sharable什么情况下才能使用

  • 当 handler 不保存状态时,就可以安全地在多线程下被共享

  • 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制, 如果使用了便会报下面的错
    在这里插入图片描述

  • 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类

正确的编解码代码

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;@Slf4j
@ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overridepublic void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);//out.writeByte(Config.getSerializerAlgorithm().ordinal());// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream ();oos.wirteObject(msg);byte[] bytes = bos.toByteArray();//byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();//序列化算法byte serializerAlgorithm = in.readByte(); // 0 或 1byte messageType = in.readByte(); // 0,1,2...int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);// 确定具体消息类型byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectOutputStream oos = new ObjectOutputStream (new ByteArrayOutputStream(bytes));Message message = (Message)ois.readObject();/*// 找到反序列化算法Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];// 确定具体消息类型Class<? extends Message> messageClass = Message.getMessageClass(messageType);Message message = algorithm.deserialize(messageClass, bytes);*/
//        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
//        log.debug("{}", message);out.add(message);}}
config
public abstract class Config {static Properties properties;static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);} catch (IOException e) {throw new ExceptionInInitializerError(e);}}public static int getServerPort() {String value = properties.getProperty("server.port");if(value == null) {return 8080;} else {return Integer.parseInt(value);}}public static Serializer.Algorithm getSerializerAlgorithm() {String value = properties.getProperty("serializer.algorithm");if(value == null) {return Serializer.Algorithm.Java;} else {return Serializer.Algorithm.valueOf(value);}}
}

application.properties

serializer.algorithm=Json
cn.abc.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

聊天室业务说明

在这里插入图片描述

Service

/*** 用户管理接口*/
public interface UserService {/*** 登录* @param username 用户名* @param password 密码* @return 登录成功返回 true, 否则返回 false*/boolean login(String username, String password);
}

方便起见,这里直接使用map去存储用户信息

public class UserServiceMemoryImpl implements UserService {private Map<String, String> allUserMap = new ConcurrentHashMap<>();{allUserMap.put("zhangsan", "123");allUserMap.put("lisi", "123");allUserMap.put("wangwu", "123");allUserMap.put("zhaoliu", "123");allUserMap.put("qianqi", "123");}@Overridepublic boolean login(String username, String password) {String pass = allUserMap.get(username);if (pass == null) {return false;}return pass.equals(password);}
}

session

/*** 会话管理接口*/
public interface Session {/*** 绑定会话* @param channel 哪个 channel 要绑定会话* @param username 会话绑定用户*/void bind(Channel channel, String username);/*** 解绑会话* @param channel 哪个 channel 要解绑会话*/void unbind(Channel channel);/*** 获取属性* @param channel 哪个 channel* @param name 属性名* @return 属性值*/Object getAttribute(Channel channel, String name);/*** 设置属性* @param channel 哪个 channel* @param name 属性名* @param value 属性值*/void setAttribute(Channel channel, String name, Object value);/*** 根据用户名获取 channel* @param username 用户名* @return channel*/Channel getChannel(String username);
}

这里使用两个map集合去记录 用户名 跟 channel 的对应关系

public class SessionMemoryImpl implements Session {private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();@Overridepublic void bind(Channel channel, String username) {usernameChannelMap.put(username, channel);channelUsernameMap.put(channel, username);channelAttributesMap.put(channel, new ConcurrentHashMap<>());}@Overridepublic void unbind(Channel channel) {String username = channelUsernameMap.remove(channel);usernameChannelMap.remove(username);channelAttributesMap.remove(channel);}@Overridepublic Object getAttribute(Channel channel, String name) {return channelAttributesMap.get(channel).get(name);}@Overridepublic void setAttribute(Channel channel, String name, Object value) {channelAttributesMap.get(channel).put(name, value);}@Overridepublic Channel getChannel(String username) {return usernameChannelMap.get(username);}@Overridepublic String toString() {return usernameChannelMap.toString();}
}

/*** 聊天组会话管理接口*/
public interface GroupSession {/*** 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null* @param name 组名* @param members 成员* @return 成功时返回组对象, 失败返回 null*/Group createGroup(String name, Set<String> members);/*** 加入聊天组* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group joinMember(String name, String member);/*** 移除组成员* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeMember(String name, String member);/*** 移除聊天组* @param name 组名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeGroup(String name);/*** 获取组成员* @param name 组名* @return 成员集合, 如果群不存在或没有成员会返回 empty set*/Set<String> getMembers(String name);/*** 获取组成员的 channel 集合, 只有在线的 channel 才会返回* @param name 组名* @return 成员 channel 集合*/List<Channel> getMembersChannel(String name);
}

SessionFactory

public abstract class SessionFactory {private static Session session = new SessionMemoryImpl();public static Session getSession() {return session;}
}

GroupSession

/*** 聊天组会话管理接口*/
public interface GroupSession {/*** 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null* @param name 组名* @param members 成员* @return 成功时返回组对象, 失败返回 null*/Group createGroup(String name, Set<String> members);/*** 加入聊天组* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group joinMember(String name, String member);/*** 移除组成员* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeMember(String name, String member);/*** 移除聊天组* @param name 组名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeGroup(String name);/*** 获取组成员* @param name 组名* @return 成员集合, 如果群不存在或没有成员会返回 empty set*/Set<String> getMembers(String name);/*** 获取组成员的 channel 集合, 只有在线的 channel 才会返回* @param name 组名* @return 成员 channel 集合*/List<Channel> getMembersChannel(String name);
}

GroupSessionMemoryImpl

实现GroupSession里面的方法

public class GroupSessionMemoryImpl implements GroupSession {private final Map<String, Group> groupMap = new ConcurrentHashMap<>();@Overridepublic Group createGroup(String name, Set<String> members) {Group group = new Group(name, members);return groupMap.putIfAbsent(name, group);}@Overridepublic Group joinMember(String name, String member) {return groupMap.computeIfPresent(name, (key, value) -> {value.getMembers().add(member);return value;});}@Overridepublic Group removeMember(String name, String member) {return groupMap.computeIfPresent(name, (key, value) -> {value.getMembers().remove(member);return value;});}@Overridepublic Group removeGroup(String name) {return groupMap.remove(name);}@Overridepublic Set<String> getMembers(String name) {return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();}@Overridepublic List<Channel> getMembersChannel(String name) {return getMembers(name).stream().map(member -> SessionFactory.getSession().getChannel(member)).filter(Objects::nonNull).collect(Collectors.toList());}
}

ProcotolFrameDecoder

处理黏包和半包的代码一般是不会进行变化的,所以将这种处理的方法抽取出来,在这里提供一种无参构造

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {public ProcotolFrameDecoder() {this(1024, 12, 4, 0, 0);}public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);}
}

过程代码

连接建立后,就向服务器发送一个登录的请求(发一个登录消息),服务器去验证这个消息是否正确

通过多个客户端,连接同一个服务器端,对二者的客户端进行发送消息,进行通信

客户端代码

@Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//使两个线程之间进行通信CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);//登录的状态AtomicBoolean LOGIN = new AtomicBoolean(false);AtomicBoolean EXIT = new AtomicBoolean(false);Scanner scanner = new Scanner(System.in);try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(MESSAGE_CODEC);// 用来判断是不是 读空闲时间过长,或 写空闲时间过长// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));// ChannelDuplexHandler 可以同时作为入站和出站处理器ch.pipeline().addLast(new ChannelDuplexHandler() {// 用来触发特殊事件@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{IdleStateEvent event = (IdleStateEvent) evt;// 触发了写空闲事件if (event.state() == IdleState.WRITER_IDLE) {
//                                log.debug("3s 没有写数据了,发送一个心跳包");ctx.writeAndFlush(new PingMessage());}}});ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {// 接收响应消息@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("msg: {}", msg);if ((msg instanceof LoginResponseMessage)) {LoginResponseMessage response = (LoginResponseMessage) msg;if (response.isSuccess()) {// 如果登录成功, 设置登录状态为trueLOGIN.set(true);}// 使记数减1:唤醒 system in 线程WAIT_FOR_LOGIN.countDown();}}// 在连接建立后触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 负责接收用户在控制台的输入,负责向服务器发送各种消息/*创建一个新线程,不创建新线程的话,其是在Nio的线程里面执行,会阻塞*/new Thread(() -> {System.out.println("请输入用户名:");String username = scanner.nextLine();if(EXIT.get()){return;}System.out.println("请输入密码:");String password = scanner.nextLine();if(EXIT.get()){return;}// 构造消息对象LoginRequestMessage message = new LoginRequestMessage(username, password);System.out.println(message);// 使用ctx对象,发送消息ctx.writeAndFlush(message);System.out.println("等待后续操作...");try {//阻塞线程,直到记数减为0(当channelRead里调用WAIT_FOR_LOGIN.countDown()时,该线程将被唤醒)WAIT_FOR_LOGIN.await();} catch (InterruptedException e) {e.printStackTrace();}// 如果登录失败if (!LOGIN.get()) {//关闭channelctx.channel().close();return;}//如果登录没有失败,则向下运行while (true) {System.out.println("==================================");System.out.println("send [username] [content]");System.out.println("gsend [group name] [content]");System.out.println("gcreate [group name] [m1,m2,m3...]");System.out.println("gmembers [group name]");System.out.println("gjoin [group name]");System.out.println("gquit [group name]");System.out.println("quit");System.out.println("==================================");String command = null;try {command = scanner.nextLine();} catch (Exception e) {break;}if(EXIT.get()){return;}String[] s = command.split(" ");switch (s[0]){case "send":ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));break;case "gsend":ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));break;case "gcreate"://组里面的成员Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));set.add(username); // 加入自己ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));break;case "gmembers":ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));break;case "gjoin":ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));break;case "gquit":ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));break;case "quit":ctx.channel().close();return;}}}, "system in").start();}// 在连接断开时触发@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("连接已经断开,按任意键退出..");EXIT.set(true);}// 在出现异常时触发@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.debug("连接已经断开,按任意键退出..{}", cause.getMessage());EXIT.set(true);}});}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}

服务器端代码

@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();GroupMembersRequestMessageHandler GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHandler();GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();QuitHandler QUIT_HANDLER = new QuitHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);// 用来判断是不是 读空闲时间过长,或 写空闲时间过长// 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));// ChannelDuplexHandler 可以同时作为入站和出站处理器ch.pipeline().addLast(new ChannelDuplexHandler() {// 用来触发特殊事件@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{IdleStateEvent event = (IdleStateEvent) evt;// 触发了读空闲事件if (event.state() == IdleState.READER_IDLE) {log.debug("已经 5s 没有读到数据了");ctx.channel().close();}}});ch.pipeline().addLast(LOGIN_HANDLER);ch.pipeline().addLast(CHAT_HANDLER);ch.pipeline().addLast(GROUP_CREATE_HANDLER);ch.pipeline().addLast(GROUP_JOIN_HANDLER);ch.pipeline().addLast(GROUP_MEMBERS_HANDLER);ch.pipeline().addLast(GROUP_QUIT_HANDLER);ch.pipeline().addLast(GROUP_CHAT_HANDLER);ch.pipeline().addLast(QUIT_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}}

handler

将服务器端的处理handler抽取处理,避免服务器端里面的代码过多

LoginRequestMessageHandler

登录的信息处理


@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {String username = msg.getUsername();String password = msg.getPassword();boolean login = UserServiceFactory.getUserService().login(username, password);LoginResponseMessage message;if (login) {SessionFactory.getSession().bind(ctx.channel(), username);message = new LoginResponseMessage(true, "登录成功");} else {message = new LoginResponseMessage(false, "用户名或密码不正确");}ctx.writeAndFlush(message);}
}

ChatRequestMessageHandler

聊天的信息处理


@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {String to = msg.getTo();Channel channel = SessionFactory.getSession().getChannel(to);// 在线if (channel != null) {channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));}// 不在线else {ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或者不在线"));}}
}

GroupCreateRequestMessageHandler

创建群


@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {String groupName = msg.getGroupName();//成员Set<String> members = msg.getMembers();// 群管理器GroupSession groupSession = GroupSessionFactory.getGroupSession();Group group = groupSession.createGroup(groupName, members);//创建成功if (group == null) {// 发送成功消息给群的创建者ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));// 发送拉群消息:给每一个群员发送消息//得到该群里面的所有在线人员List<Channel> channels = groupSession.getMembersChannel(groupName);//遍历channel集合,对这些channel发送消息for (Channel channel : channels) {channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));}} else {//给创建者发送失败消息ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "已经存在"));}}
}

GroupJoinRequestMessageHandler

@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {Group group = GroupSessionFactory.getGroupSession().joinMember(msg.getGroupName(), msg.getUsername());if (group != null) {ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群加入成功"));} else {ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));}}
}

GroupMembersRequestMessageHandler

@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {Set<String> members = GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName());ctx.writeAndFlush(new GroupMembersResponseMessage(members));}
}

GroupQuitRequestMessageHandler

@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {Group group = GroupSessionFactory.getGroupSession().removeMember(msg.getGroupName(), msg.getUsername());if (group != null) {ctx.writeAndFlush(new GroupJoinResponseMessage(true, "已退出群" + msg.getGroupName()));} else {ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));}}
}

GroupChatRequestMessageHandler

群组里面发送信息


@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {List<Channel> channels = GroupSessionFactory.getGroupSession().getMembersChannel(msg.getGroupName());//遍历所有群成员的channel,对这些channel发送消息for (Channel channel : channels) {//发消息channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));}}
}

QuitHandler

处理退出事件

  1. 异常退出
  2. 正常退出
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {// 当连接断开时触发 inactive 事件@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {SessionFactory.getSession().unbind(ctx.channel());log.debug("{} 已经断开", ctx.channel());}// 当出现异常时触发@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {SessionFactory.getSession().unbind(ctx.channel());log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());}
}

空闲检测: 连接假死(IdleStateHandler

原因

  • 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
  • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
  • 应用程序线程阻塞,无法进行数据读写

问题

  • 假死的连接占用的资源不能自动释放
  • 向假死的连接发送数据,得到的反馈是发送超时

服务器端解决

  • 怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
/*参数1:检查读的空闲时间超过对应的秒数(默认是秒)参数2:检查写的空闲时间超过对应的秒数参数3:读写都空闲的时间超过对应的秒数参数4:时间的单位
*/
/*
下面这个设置的意思是:5秒钟没有收到客户端发过来的数据(这时候就认为读的空闲时间太长)不关心其他的空闲5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件*/
new IdleStateHandler(5, 0, 0);
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {// 用来触发特殊事件: 如读空闲事件@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{//获取 IdleState 事件IdleStateEvent event = (IdleStateEvent) evt;// 触发了读空闲事件if (event.state() == IdleState.READER_IDLE) {log.debug("已经 5s 没有读到数据了");ctx.channel().close();}}
});

心跳包

客户端定时心跳

  • 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器

// 用来判断是不是 读空闲时间过长,或 写空闲时间过长
// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {// 用来触发特殊事件@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{IdleStateEvent event = (IdleStateEvent) evt;// 触发了写空闲事件if (event.state() == IdleState.WRITER_IDLE) {//                                log.debug("3s 没有写数据了,发送一个心跳包");ctx.writeAndFlush(new PingMessage());}}
});
http://www.lryc.cn/news/2418274.html

相关文章:

  • class反编译
  • web前端面试题之JavaScript篇
  • 2022年最佳的9种逆向工程工具[持续更新]
  • 共享WiFi贴项目怎么推广?wifi贴详细地推攻略分享!
  • 致敬!向中外9名杰出女数学家
  • 震惊!680万用户信息泄露,国内某人脸识别公司数据库“裸奔”
  • MadPad:个性化的音乐创作应用
  • KUDU同步数据到Hive报错
  • 基于java的叮当书城系统毕业设计(源代码+数据库+部署文档+部署视频)
  • 最新版qq空间刷留言小秘书 V1.20 正式版
  • 服务器无法在此时接收控制信息,如何修复win10专业版wifi错误1061服务无法在此时接受控制信息...
  • PASCAL VOC 2012 数据集解析
  • UWB技术的工作原理
  • 电商系统的简单架构
  • cisco2600路由器破解密码和重装IOS
  • 看这里NetWork location failed because baidu location service can not decrypt the request query,so加载不到
  • Oracle11g安装详细步骤(图文教程)
  • C语言与Java的区别
  • php商城开发人人分销团队级差分红升级规则订制
  • 开通博客通告
  • 进程间通信IPC——管道(1) 两个进程间通信
  • myeclipse 9.0
  • swf反编译工具
  • python主要应用于哪些方面,python一般用来做什么
  • Linux运维工程师常见基础面试题
  • 电子商务网站后台管理系统的设计与实现
  • js中void 0是什么意思,javascript:void()
  • 负载均衡与反向代理
  • UG软件安装教程
  • 深度学习(十七)——SSD, YOLOv2