当前位置: 首页 > news >正文

Netty实现单通道并发读写,即多路复用

🧑 博主简介:CSDN博客专家历代文学网(PC端可以访问:https://literature.sinhy.com/#/?__c=1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,精通Java编程高并发设计Springboot和微服务,熟悉LinuxESXI虚拟化以及云原生Docker和K8s,热衷于探索科技的边界,并将理论知识转化为实际应用。保持对新技术的好奇心,乐于分享所学,希望通过我的实践经历和见解,启发他人的创新思维。在这里,我希望能与志同道合的朋友交流探讨,共同进步,一起在技术的世界里不断学习成长。
技术合作请加本人wx(注明来自csdn):foreast_sea

在这里插入图片描述


在这里插入图片描述

Netty实现单通道并发读写,即多路复用

引言:Netty共享通道连接池——突破传统连接模型的性能革命

在传统网络编程中,TCP连接常被视为单线程独占资源,这种设计在高并发场景下面临着严峻的性能瓶颈:每个连接只能串行处理请求,导致资源利用率低下,连接数量激增带来巨大开销。Netty共享通道连接池应运而生,它颠覆性地实现了单TCP连接的多线程并行读写,将连接复用提升到全新维度。

下面我们将实现一个高性能的连接池,支持多个线程共享同一个通道(每个通道最大共享线程数可配置),并确保高并发获取和释放连接的效率。

一、共享连接池实现:支持多线程共享同一通道

核心实现

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.*;import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;public class MultiThreadSharedChannelPool {// 配置参数private final int maxConnections;private final int maxThreadsPerChannel;private final long acquireTimeoutMillis;// 核心数据结构private final Queue<SharedChannel> availableChannels = new ConcurrentLinkedQueue<>();private final ConcurrentHashMap<Channel, SharedChannel> allChannels = new ConcurrentHashMap<>();private final AtomicInteger totalConnections = new AtomicInteger(0);private final EventLoopGroup eventLoopGroup;private final Bootstrap bootstrap;// 等待队列管理private final Queue<Promise<SharedChannel>> pendingAcquires = new ConcurrentLinkedQueue<>();public MultiThreadSharedChannelPool(Bootstrap bootstrap, int maxConnections, int maxThreadsPerChannel,long acquireTimeoutMillis) {this.bootstrap = bootstrap;this.eventLoopGroup = bootstrap.config().group();this.maxConnections = maxConnections;this.maxThreadsPerChannel = maxThreadsPerChannel;this.acquireTimeoutMillis = acquireTimeoutMillis;}/*** 获取共享通道(异步)*/public Future<SharedChannel> acquire() {Promise<SharedChannel> promise = eventLoopGroup.next().newPromise();acquireInternal(promise);return promise;}private void acquireInternal(Promise<SharedChannel> promise) {// 尝试从可用通道获取SharedChannel channel = tryAcquireAvailableChannel();if (channel != null) {promise.setSuccess(channel);return;}// 尝试创建新连接if (tryCreateNewConnection()) {// 创建成功后重新尝试获取eventLoopGroup.schedule(() -> acquireInternal(promise), 10, TimeUnit.MILLISECONDS);return;}// 加入等待队列if (acquireTimeoutMillis > 0) {scheduleAcquireTimeout(promise);}pendingAcquires.offer(promise);}private SharedChannel tryAcquireAvailableChannel() {for (SharedChannel channel : availableChannels) {if (channel.tryAcquire()) {// 如果通道已满,从可用队列移除if (!channel.isAvailable()) {availableChannels.remove(channel);}return channel;}}return null;}private boolean tryCreateNewConnection() {if (totalConnections.get() >= maxConnections) {return false;}if (!totalConnections.compareAndSet(totalConnections.get(), totalConnections.get() + 1)) {return false;}// 异步创建连接bootstrap.connect().addListener((ChannelFuture future) -> {if (future.isSuccess()) {Channel ch = future.channel();SharedChannel sharedChannel = new SharedChannel(ch, maxThreadsPerChannel);allChannels.put(ch, sharedChannel);// 新连接立即可用availableChannels.offer(sharedChannel);processPendingAcquires();// 添加关闭监听ch.closeFuture().addListener(f -> {allChannels.remove(ch);totalConnections.decrementAndGet();});} else {totalConnections.decrementAndGet();}});return true;}private void processPendingAcquires() {while (!pendingAcquires.isEmpty()) {Promise<SharedChannel> promise = pendingAcquires.poll();if (promise == null || promise.isDone()) continue;SharedChannel channel = tryAcquireAvailableChannel();if (channel != null) {promise.setSuccess(channel);} else {pendingAcquires.offer(promise);break;}}}private void scheduleAcquireTimeout(Promise<SharedChannel> promise) {eventLoopGroup.schedule(() -> {if (!promise.isDone() && pendingAcquires.remove(promise)) {promise.tryFailure(new TimeoutException("Acquire timeout"));}}, acquireTimeoutMillis, TimeUnit.MILLISECONDS);}/*** 释放共享通道*/public void release(SharedChannel channel) {channel.release();if (channel.isAvailable()) {// 如果变为可用状态,加入可用队列availableChannels.offer(channel);// 唤醒等待请求processPendingAcquires();}}/*** 共享通道包装类*/public static class SharedChannel {private final Channel physicalChannel;private final AtomicInteger usageCount = new AtomicInteger(0);private final int maxThreads;public SharedChannel(Channel physicalChannel, int maxThreads) {this.physicalChannel = physicalChannel;this.maxThreads = maxThreads;}public Channel getChannel() {return physicalChannel;}public boolean tryAcquire() {while (true) {int current = usageCount.get();if (current >= maxThreads) return false;if (usageCount.compareAndSet(current, current + 1)) {return true;}}}public void release() {usageCount.decrementAndGet();}public boolean isAvailable() {return usageCount.get() < maxThreads && physicalChannel.isActive();}public int getUsageCount() {return usageCount.get();}}// 统计信息方法public int getAvailableChannels() {return availableChannels.size();}public int getActiveChannels() {return allChannels.size() - availableChannels.size();}public int getTotalConnections() {return totalConnections.get();}public int getPendingAcquires() {return pendingAcquires.size();}
}

使用示例

// 1. 创建Netty引导
Bootstrap bootstrap = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) {// 初始化管道}});// 2. 创建连接池
MultiThreadSharedChannelPool pool = new MultiThreadSharedChannelPool(bootstrap,20,     // 最大物理连接数8,      // 每个连接最大共享线程数5000    // 获取超时时间(毫秒)
);// 3. 获取共享通道
pool.acquire().addListener((Future<MultiThreadSharedChannelPool.SharedChannel> future) -> {if (future.isSuccess()) {MultiThreadSharedChannelPool.SharedChannel sharedChannel = future.getNow();try {Channel channel = sharedChannel.getChannel();// 4. 使用通道channel.writeAndFlush(request).addListener(writeFuture -> {// 处理响应...});} finally {// 5. 释放通道pool.release(sharedChannel);}} else {// 处理获取失败}
});// 6. 关闭连接池
Runtime.getRuntime().addShutdownHook(new Thread(() -> {// 实际应用中需要更优雅的关闭bootstrap.config().group().shutdownGracefully();
}));

二、关键性能设计

1. 高性能无锁队列

// 使用并发性能最好的队列
private final Queue<SharedChannel> availableChannels = new ConcurrentLinkedQueue<>();// 使用CAS操作更新使用计数
public boolean tryAcquire() {while (true) {int current = usageCount.get();if (current >= maxThreads) return false;if (usageCount.compareAndSet(current, current + 1)) {return true;}}
}

2. 智能连接创建策略

private boolean tryCreateNewConnection() {// 双重检查避免过度创建if (totalConnections.get() >= maxConnections) return false;// CAS保证原子性if (!totalConnections.compareAndSet(current, current + 1)) {return false;}// 异步创建连接bootstrap.connect().addListener(future -> {if (future.isSuccess()) {// 添加新通道} else {// 回滚计数totalConnections.decrementAndGet();}});return true;
}

3. 高效等待队列处理

private void processPendingAcquires() {while (!pendingAcquires.isEmpty()) {Promise<SharedChannel> promise = pendingAcquires.poll();if (promise == null || promise.isDone()) continue;SharedChannel channel = tryAcquireAvailableChannel();if (channel != null) {promise.setSuccess(channel);} else {// 放回队列并退出循环pendingAcquires.offer(promise);break;}}
}

4. 连接预热机制

public void warmup(int connections) {for (int i = 0; i < Math.min(connections, maxConnections); i++) {tryCreateNewConnection();}
}

三、高级功能扩展

1. 连接健康检查

public void startHealthCheck(long interval, TimeUnit unit) {eventLoopGroup.scheduleAtFixedRate(() -> {for (SharedChannel sc : allChannels.values()) {if (!sc.getChannel().isActive() && sc.getUsageCount() == 0) {// 关闭无效连接sc.getChannel().close();}}}, interval, interval, unit);
}

2. 负载监控

public void startMonitoring() {eventLoopGroup.scheduleAtFixedRate(() -> {System.out.println("=== 连接池状态 ===");System.out.println("总连接数: " + totalConnections.get());System.out.println("可用通道: " + availableChannels.size());System.out.println("等待请求: " + pendingAcquires.size());// 打印每个通道的使用情况allChannels.forEach((ch, sc) -> {System.out.printf("通道 %s: 使用数=%d/%d%n", ch.id(), sc.getUsageCount(), sc.maxThreads);});}, 5, 5, TimeUnit.SECONDS);
}

3. 动态配置

public void updateConfig(int newMaxConnections, int newMaxThreadsPerChannel) {// 注意:需要线程安全地更新this.maxConnections = newMaxConnections;// 更新现有通道的最大线程数allChannels.values().forEach(sc -> sc.maxThreads = newMaxThreadsPerChannel);
}

四、粘包半包问题解决方案

1. 为什么需要 LengthFieldBasedFrameDecoder

在 TCP 网络通信中,数据是以字节流形式传输的,没有明确的消息边界。这会导致两个核心问题:

  1. 粘包问题:多个小数据包被合并成一个大数据包

    发送端: [包1][包2][包3]
    接收端: [包1包2包3] (合并接收)
    
  2. 拆包问题:大数据包被拆分成多个小数据包

    发送端: [大数据包]
    接收端: [部分1][部分2] (分次接收)
    

LengthFieldBasedFrameDecoder 正是 Netty 为解决这些问题提供的核心解码器,它通过长度字段来标识消息边界。

2. 工作流程

ChannelDecoderHandler接收原始字节流读取长度字段值 (N)计算帧长度 = N + 调整值等待直到收到完整帧 (N字节)转发完整数据帧 (剥离指定头部)ChannelDecoderHandler

3. 核心参数详解

3.1 构造函数
public LengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset,int lengthFieldLength,int lengthAdjustment,int initialBytesToStrip
)
参数名类型说明示例值
maxFrameLengthint最大帧长度(防攻击)1024 * 1024 (1MB)
lengthFieldOffsetint长度字段偏移量0
lengthFieldLengthint长度字段字节数4
lengthAdjustmentint长度调整值0
initialBytesToStripint初始剥离字节数4
3.2 参数关系公式
完整帧长度 = lengthFieldOffset + lengthFieldLength + (长度字段值) + lengthAdjustment

4. 典型使用场景

场景1:简单长度前缀协议
[长度字段(4字节)][消息体]
new LengthFieldBasedFrameDecoder(1048576, // 1MB最大帧0,       // 长度字段在开头4,       // 长度字段占4字节0,       // 无调整4        // 剥离长度字段
)
场景2:包含固定头部的协议
[魔数(2字节)][版本(1字节)][长度(4字节)][消息体]
new LengthFieldBasedFrameDecoder(1048576,2 + 1,   // 跳过魔数和版本 (3字节)4,       // 长度字段4字节0,       // 无调整2 + 1 + 4 // 剥离魔数+版本+长度字段 (7字节)
)
场景3:长度包含自身的情况
[长度(4字节)][消息体]  // 长度字段值 = 4 + 消息体长度
new LengthFieldBasedFrameDecoder(1048576,0,4,-4,      // 调整:长度字段包含自身,需减去44        // 剥离长度字段
)
场景4:复杂调整场景
[头标识(2)][长度(4)][版本(1)][消息体][校验(2)]
new LengthFieldBasedFrameDecoder(1048576,2,       // 跳过头标识4,       // 长度字段4字节1 + 2,   // 调整:长度字段值 + 版本长度 + 校验长度2 + 4    // 剥离头标识+长度字段
)

5. 完整代码示例

5.1 服务端配置
public class ServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 配置LengthFieldBasedFrameDecoderpipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024,   // 最大帧1MB0,             // 长度字段偏移04,             // 长度字段4字节0,             // 长度调整值4              // 剥离前4字节(长度字段)));// 添加自定义解码器pipeline.addLast(new MessageDecoder());// 添加业务处理器pipeline.addLast(new BusinessHandler());}
}
5.2 自定义消息解码器
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {// 此时msg已经是完整帧(已剥离长度字段)int type = msg.readByte();  // 读取消息类型byte[] payload = new byte[msg.readableBytes()];msg.readBytes(payload);CustomMessage message = new CustomMessage(type, payload);out.add(message);}
}
5.3 自定义消息类
public class CustomMessage {private final int type;private final byte[] payload;public CustomMessage(int type, byte[] payload) {this.type = type;this.payload = payload;}// 编码方法(用于客户端)public ByteBuf encode() {ByteBuf buf = Unpooled.buffer();buf.writeByte(type);buf.writeBytes(payload);// 添加长度前缀ByteBuf finalBuf = Unpooled.buffer();finalBuf.writeInt(buf.readableBytes());finalBuf.writeBytes(buf);return finalBuf;}
}

6.高级配置技巧

6.1 字节序控制
new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN,  // 大端序1048576,0, 4, 0, 4
)
6.2 快速失败模式
new LengthFieldBasedFrameDecoder(1048576,0, 4, 0, 4
) {@Overrideprotected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {long frameLength = super.getUnadjustedFrameLength(buf, offset, length, order);if (frameLength < 0) {throw new CorruptedFrameException("负长度: " + frameLength);}return frameLength;}
};
6.3结合其他解码器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 转换为字符串
pipeline.addLast(new JsonDecoder()); // JSON解析

7. 常见问题解决方案

问题1:长度字段值包含哪些部分?

解决方案:使用 lengthAdjustment 参数精确控制:

  • 若长度字段仅包含消息体:lengthAdjustment = 0
  • 若长度字段包含自身和消息体:lengthAdjustment = -长度字段字节数
  • 若长度字段包含其他头部:lengthAdjustment = 额外部分的长度
问题2:如何处理变长头部?
// 示例:头部包含变长字段
pipeline.addLast(new LengthFieldBasedFrameDecoder(1048576,0, 4,-4,      // 调整长度4        // 剥离长度字段
));pipeline.addLast(new HeaderDecoder()); // 自定义头部解码器public class HeaderDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {int headerLength = in.readByte(); // 读取头部长度byte[] header = new byte[headerLength];in.readBytes(header);// 剩余部分为消息体out.add(new CustomMessage(header, in.retain()));}
}
问题3:超大消息处理
// 分块处理超大消息
pipeline.addLast(new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, // 10MB0, 4, 0, 4
));pipeline.addLast(new ChunkedMessageHandler());public class ChunkedMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {while (msg.readableBytes() > 0) {int chunkSize = Math.min(1024 * 64, msg.readableBytes());ByteBuf chunk = msg.readRetainedSlice(chunkSize);processChunk(chunk);}}
}

8. 性能优化

8.1 使用池化ByteBuf
// 在引导中配置
Bootstrap b = new Bootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
8.2 避免内存复制
// 在解码器中直接使用ByteBuf
public class DirectDecoder extends MessageToMessageDecoder<ByteBuf> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {// 避免复制:直接使用ByteBufout.add(msg.retain());}
}
8.3 精确控制最大长度
// 根据业务需求设置合理的最大帧长度
new LengthFieldBasedFrameDecoder(getConfig().getMaxFrameSize(), // 从配置读取0, 4, 0, 4
)

通过合理配置 LengthFieldBasedFrameDecoder,您可以高效解决TCP粘包/拆包问题,构建稳定可靠的网络通信系统。

总结

自实现的 MultiThreadSharedChannelPool 提供了:

  1. 真正的多线程共享:每个物理连接可被多个线程同时使用
  2. 智能连接分配:优先使用未饱和的连接
  3. 高效并发控制:无锁队列+CAS原子操作
  4. 完整生命周期管理:创建、使用、回收、销毁
http://www.lryc.cn/news/594681.html

相关文章:

  • Spring MVC 核心工作流程
  • 二、SpringBoot-REST开发
  • OSS文件上传(三):断点续传
  • CentOS 系统上部署一个简单的 Web 应用程序
  • Git上传与下载GitHub仓库
  • 计算机网络:概述层---计算机网络的性能指标
  • FastMCP全篇教程以及解决400 Bad Request和session termination的问题
  • 网络服务(第三次作业)
  • 果园里的温柔之手:Deepoc具身智能如何重塑采摘机器人的“生命感知”
  • GoLand安装指南
  • QT6 源,七章对话框与多窗体(5) 文件对话框 QFileDialog 篇二:源码带注释
  • Android 默认图库播放视频没有自动循环功能,如何添加2
  • 文远知行推出与联想共研的100%车规级HPC 3.0计算平台
  • SpringDoc 基本使用指南
  • Boost库智能指针boost::shared_ptr详解和常用场景使用错误示例以及解决方法
  • 如何防止QQ浏览器录屏,盗录视频资源?
  • Pytorch02:深度学习基础示例——猫狗识别
  • MySQL(05) mysql锁,MVCC、Innodb行锁
  • 网络协议与层次对应表
  • Spring Boot 集成 RabbitMQ:普通队列、延迟队列与死信队列全解析
  • 我的网页聊天室设计
  • Python100个库分享第38个—lxml(爬虫篇)
  • sky-take-out项目中Redis的使用
  • 【Linux】Prometheus 监控 Kafka 集群
  • 基于大数据的旅游推荐系统 Python+Django+Hive+Vue.js
  • 关于 URL 中 “+“ 号变成空格的问题
  • 机器学习对词法分析、句法分析、浅层语义分析的积极影响
  • 人工智能真的能编程吗?研究勾勒出自主软件工程的障碍
  • [Python] -项目实战10- 用 Python 自动化批量重命名文件
  • 识别并计算滑块距离