八股——IM项目
文章目录
- 1、 如何对用户的登录状态进行管理?
- 2、 服务进行水平扩展,如何正确找到用户的 websocket 长链接?
- 3、 整个消息是如何在系统中流转的?
- 4、 群聊消息是如何实现的?
- 5、 如何检测 TCP 连接已经断开
- 6 、如何保证消息不丢失?
- 思:ACK消息是什么?
- 7、项目中没有实现一个客户断线重连的功能,在项目中要怎么基于Netty实现这个短线重连功能?
- 8、我们用户登录后会去批量拉取离线消息,假设这个消息很多,用户获取消息的时间可能要几秒钟,这时候有其他用户持续给这个用户发送消息。要怎么保证用户实际拉取的信息要早于消息拉取期间收到的消息?
- 9、拉取离线消息和发送实时消息应该是一个并行的操作吧,那要怎么维持一个先处理离线消息再处理实时消息呢,是每一次实时消息都要判断一下当前 sequenceId 是否大于index才能进行推送,不然就阻塞吗。或者可不可以在用户登录拉取离线消息时,为用户创建一个消息队列,拉取的消息和实时通讯的消息都经过消息队列进行最终推送实现有序性。这样感觉也会增加一些维护的成本?
- 10、为每个用户维护一个全局自增的sequenceID要怎么实现呢,目前使用的是雪花算法来做分布式的全局ID,但是在分布式情况下也会产生两边服务器时间不一致,或者是时钟回拨时造成消息ID并不是趋势递增的情况。如果要为每个用户单独维护sequenceID的话(假设存放在用户表里),是不是每一次离线消息存储时都要去在比如说用户表里显性的更新最大的sequenceID?
- 思:分布式系统与多线程的区别?
- 思: 行级锁在分布式系统中的应用?
- 11、如果数据量太大了给消息队列塞满了怎么办?
- 12、群聊怎么实现?(自叙述)
1、 如何对用户的登录状态进行管理?
这里登录状态的管理分为两部分,一方面需要在 redis 中存放用户的登录信息(用户->其所链接的 Netty 服务器的信息),第二方面是需要在长链接模块中维护用户ID与 ws 的对应关系。
2、 服务进行水平扩展,如何正确找到用户的 websocket 长链接?
在用户进行登录时,会返回给用户一个token,用户携带token访问网关并通过鉴权即可与长链接模块建立长链接。当然在此网关会进行负载均衡选择一台长链接服务器与用户建立长链接。当用户与长链接服务完成三次握手权限校验等一系列动作后,将会把用户 ID 与长链接服务器的 IP 作为一对映射关系放入到redis 中,由 redis 完成分布式的用户 ID 和 长链接服务器的映射管理,这样当用户 A给用户 B发消息时,消息中转模块将会通过用户 B的 ID拿到与之建立长链接的服务器,将消息转发过去。
在长链接服务中,同时也维护了用户 ID 与其管道的映射关系,这样就可以当消息转发到长链接服务中时,可以通过用户 ID 拿到与其对应的管道,将消息发送给用户。
3、 整个消息是如何在系统中流转的?
4、 群聊消息是如何实现的?
群聊的转发消息的流程与单聊转发消息流程略显不同,单聊时消息中转模块会访问redis拿到与用户连接的长链接服务器,再将消息精准的转发过去;而群聊消息是消息中转将这条消息转发给全部的长链接服务器。这样做的原因如下:
- 单聊消息的接受者只有一个,系统仅仅访问一次redis即可拿到对应的长链接服务器地址将消息转发过去。
- 群聊消息的接受者往往是多个的,有时候甚至是几十个、几百个,这样如何按照以上规则进行消息转发,那么转发一次消息给群组中所有的人将会访问几十次甚至几百的redis,一方面会对redis造成压力,另一方面会对网络I0的开销产生压力。我们注意到,一般我们的长链接服务器不会很多,在中小型系统中一般在3-5台服务器之间,那么如果我们将群聊消息转发给全部的长链接服务器,也仅仅进行3-5次网络传输,远远小于每次查询redis的开销。
5、 如何检测 TCP 连接已经断开
TCP 链接数量是计算机中一个重要的资源和指标,及时释放掉没有用的 TCP 链接是一个必要选择,防止 TCP 连接泄漏。在检测 TCP 连接断开方面我们有以下几种手段。
- 在操作系统层面,我们必须开启keep-alive机制,在操作系统层面进行一次检测。
- TCP Keepalive机制 是一种用于检测空闲连接是否存活的传输层保活策略。它通过定期发送探测报文,确认对端是否仍在线,避免因网络中断或对端崩溃导致连接长期占用资源。
- 在应用层代码层面,在我们的系统中加入了心跳机制,当管道长期处于空闲(管道中没有消息传输)时,netty2将会自动帮我们触发读空闲超时,我们会在代码层面对相应的管道进行关闭,释放掉 TCP 连接资源,代码如下:
6 、如何保证消息不丢失?
在保证消息不丢失方面,有以下几种方案和策略:
- 在消息转发模块收到一条消息时,会先将其放入到kafka中,再进行消息的转发,这样如果放入到kafka失败了,那么证明消息持久化存储就失败了,直接返回报错。
- 在kafka层面上保证消息的不丢失,具体详情查看kafka如何保证消息不丢失。
- 当用户收到一个消息时,他如何保证其前面所有的消息都已经收到了呢? 这里我们可以让客户端在聊天室发送消息时,将上一条消息的 ID 也带上,这样当接受者收到一条消息时,他就可以对比起上一条收到的消息ID与本条消息中携带的上一条消息ID是否相同,如果不同则可以去离线消息模块中查询其未获得消息,保证消息的可靠性。另一方面,也可以每个聊天室维护一个全局递增的ID,这样在发送每条消息的时候就不用带上上一条消息的ID了。
- 当用户收到一条消息时,需要向服务端发送 ACK 信息,告诉服务端其已经收到消息,同时服务端应该在本地维护一个队列,存储尚未收到 ACK 的消息,并且可以设计的像 TCP 那样,如果没有收到 ACK 消息,对消息进行超时重传。
回忆:Kafka 如何保证消息的的不丢失?
-
- 生产者Producer,使用带回调通知的send(msg,callback)方法,并且设置acks = all 。它的消息投递要采用同步的方式。Producer要保证消息到达服务器,就需要使用到消息确认机制,也就是说,必须要确保消息投递到服务端,并且得到投递成功的响应,确认服务器已接收,才会继续往下执行。
-
- 设置broker中的配置项unclean.leader.election.enable = false,保证所有副本同步。同时,Producer将消息投递到服务器的时候,我们需要将消息持久化,也就是说会同步到磁盘。注意,同步到硬盘的过程中,会有同步刷盘和异步刷盘。如果选择的是同步刷盘,那是一定会保证消息不丢失的。就算刷盘失败,也可以即时补偿。但如果选择的是异步刷盘的话,这个时候,消息有一定概率会丢失。
-
- 消费者Consume。设置enable.auto.commit为false。在Kafka中,消息消费完成之后,它不会立即删除,而是使用定时清除策略,也就是说,我们消费者要确保消费成功之后,手动ACK提交。如果消费失败的情况下,我们要不断地进行重试。所以,消费端不要设置自动提交,一定设置为手动提交才能保证消息不丢失。
思:ACK消息是什么?
ACK 机制:
- 定义:ACK(Acknowledgement)是确认机制,接收方收到消息后向发送方发送 ACK,告知消息已被成功接收。
- 作用:确保消息可靠性。如果发送方未收到 ACK(超时),则对消息进行超时重传。
- 实现:ACK 包含消息 ID 和确认状态。接收方处理完消息后发送 ACK 给发送方。
7、项目中没有实现一个客户断线重连的功能,在项目中要怎么基于Netty实现这个短线重连功能?
在基于 Netty 的 IM 项目中实现客户端断线重连功能,可以通过** Ping-Pong 心跳机制 + 指数退避重连策略来实现,客户端定时(如每30秒)发送 Ping 消息,服务端收到后回复 Pong 响应:若客户端连续3次未收到Pong(约 90 秒超时),则先关闭当前连接,随后触发指数退避重连(首次1秒,之后2秒、4秒、8秒…逐步延长,避免重连风暴),同时需注意在连接失效时取消心跳任务防止内存泄漏。这里的指数退避重连**是指间隔多久客户端触发重连机制,可以避免加重服务器压力、适应网络恢复的渐进性和节省客户端资源。
8、我们用户登录后会去批量拉取离线消息,假设这个消息很多,用户获取消息的时间可能要几秒钟,这时候有其他用户持续给这个用户发送消息。要怎么保证用户实际拉取的信息要早于消息拉取期间收到的消息?
在服务端设计中,通过为每个用户维护一个全局自增的消息版本号(sequenceId)来严格保证消息顺序性。所有消息(包括离线消息和实时消息)均按 sequenceId 有序存储。当客户端发起离线消息拉取请求时,服务端首先记录当前最大 sequenceId 为 Index,随后返回所有 sequenceId ≤Index 的历史消息;在此之后产生的新消息,其 sequenceId 都会严格大于 Index。对于实时消息处理,在拉取过程中收到的新消息出于 sequenceId 必然大于Index,客户端会先处理完离线消息(sequenceId<Index 的部分),再按顺序处理新到达的消息(sequenceId>Index 的部分),从而确保消息时序的严格正确性。
9、拉取离线消息和发送实时消息应该是一个并行的操作吧,那要怎么维持一个先处理离线消息再处理实时消息呢,是每一次实时消息都要判断一下当前 sequenceId 是否大于index才能进行推送,不然就阻塞吗。或者可不可以在用户登录拉取离线消息时,为用户创建一个消息队列,拉取的消息和实时通讯的消息都经过消息队列进行最终推送实现有序性。这样感觉也会增加一些维护的成本?
在拉取离线消息期间,实时消息到达时,服务端/客户端可以暂存这些消息(sequenceId>index),等到离线消息拉取完成后,再推送暂存的实时消息。
服务端实现: 客户端发起离线消息拉取请求时,服务端记录当前用户的最大 sequenced 为 Index。拉取期间到达的实时消息(sequenceId>Index)被暂存到内存或持久化队列。离线消息拉取完成后,服务端按 sequenceId 顺序推送暂存的实时消息。主要的点在于暂存队列的维护,需要在用户连接上时为每个连接分配一个临时队列,离线消息拉取完成后触发队列消费,消费完要清除队列。
客户端实现: 服务端不做暂存,而是将所有消息(包括实时消息)立即推送到客户端。客户端本地维护两个缓冲区,离线消息区:只处理 sequenceId <= Index的消息。实时消息区:缓存 sequenceId>Index 的消息,待离线消息处理完毕后再合并处理。
10、为每个用户维护一个全局自增的sequenceID要怎么实现呢,目前使用的是雪花算法来做分布式的全局ID,但是在分布式情况下也会产生两边服务器时间不一致,或者是时钟回拨时造成消息ID并不是趋势递增的情况。如果要为每个用户单独维护sequenceID的话(假设存放在用户表里),是不是每一次离线消息存储时都要去在比如说用户表里显性的更新最大的sequenceID?
这个为每个用户维护一个全局自增的 sequenceID 的话可以再创一个表了,暂且我们叫做 user_message 表吧,每个用户创建的时候初始化创建一行数据,user_message 放的就两个字段,userId,sequenceId(这个用MySQL 自带的自增就好了),分布式情况下可以加一个行级锁。或者我们不用MySQL,直接用 Redis,维护一个key 为 useId,然后 value 为 sequenceId 的集合就好了,这样搜索更快,还能用** Lua 脚本**来保证原子性,而且在分布式情况下也适用。当然 Redis 需要做数据持久化,这时候我们可以用 Redis + MySQL,先在 Redis 上面记录然后定时同步到 MySQL,或者说你先存储 MySQL,然后通过 Canal(可以做到数据最终一致性)同步到 Redis,然后用户拿这个 sequenceID 先去 Redis 里面拿,如果没有再去 MySQL 里面拿。
思:分布式系统与多线程的区别?
一、分布式系统
(一)定义
分布式系统 是由多台计算机组成的系统,这些计算机通过网络连接在一起,协同完成一个任务。每台计算机(称为节点)都有自己的处理器、内存和存储设备,它们通过消息传递的方式进行通信和协作。
(二)特点
- 独立性:每个节点都是独立的计算机,有自己的硬件和操作系统。
- 通信:节点之间通过网络通信,消息传递是分布式系统的核心机制。
- 容错性:分布式系统通常设计为高可用性,即使部分节点故障,系统仍然可以正常运行。
- 可扩展性:可以通过增加更多的节点来扩展系统的处理能力。
(三)应用场景
- 云计算:如 AWS、Azure 等云服务提供商,通过分布式系统提供计算资源。
- 大数据处理:如 Hadoop、Spark 等框架,通过分布式计算处理海量数据。
- 分布式数据库:如 Cassandra、Redis 等,通过分布式存储和计算提高性能和可靠性。
二、多线程
(一)定义
多线程 是指在一个程序中同时运行多个线程。线程是程序执行的最小单位,多个线程可以共享同一进程的资源(如内存和文件句柄)。
(二)特点
- 共享资源:多个线程共享同一进程的内存空间和资源。
- 并发执行:多个线程可以同时运行,提高程序的执行效率。
- 轻量级:线程的创建和切换比进程更轻量级,开销更小。
- 同步机制:需要使用同步机制(如互斥锁、信号量)来避免线程之间的冲突。
(三)应用场景
- 多任务处理:如浏览器同时加载多个网页、播放视频和下载文件。
- 服务器端编程:如 Web 服务器同时处理多个客户端请求。
- 并发计算:如多线程计算矩阵乘法、图像处理等。
三、分布式系统与多线程的区别
(一)硬件资源
- 分布式系统:由多台独立的计算机组成,每台计算机有自己的硬件资源。
- 多线程:在同一台计算机上运行,共享同一进程的资源。
(二)通信机制
- 分布式系统:节点之间通过网络通信,消息传递是主要的通信方式。
- 多线程:线程之间通过共享内存通信,通常使用同步机制(如互斥锁、信号量)来协调。
(三)容错性
- 分布式系统:设计为高可用性,部分节点故障不会影响整个系统的运行。
- 多线程:如果一个线程崩溃,通常会影响整个进程,需要通过异常处理来避免。
(四)可扩展性
- 分布式系统:通过增加更多的节点来扩展系统的处理能力。
- 多线程:通过增加更多的线程来提高程序的并发性,但受限于单台计算机的资源。
(五)复杂性
- 分布式系统:设计和实现更复杂,需要考虑网络通信、容错机制、数据一致性等问题。
- 多线程:相对简单,主要关注线程同步和资源共享。
四、总结
- 分布式系统:由多台计算机组成,通过网络通信协同工作,具有高可用性和可扩展性。
- 多线程:在同一台计算机上运行多个线程,共享资源,提高程序的并发性和效率。
- 区别:分布式系统涉及多台计算机和网络通信,而多线程在同一台计算机上运行,共享资源。
思: 行级锁在分布式系统中的应用?
一、什么是行级锁?
(一)定义
行级锁 是一种细粒度的锁机制,锁定的是表中的具体数据行。相比于表级锁,行级锁允许多个事务在同一个表中并发执行不同行上的操作,而不会互相阻塞。
(二)特点
- 细粒度:只锁定具体的行,而不是整个表,适用于高并发场景。
- 高并发性:多个事务可以同时锁定不同的行,减少锁竞争。
- 开销较大:相比于表级锁,行级锁的管理成本更高,因为需要维护更多的锁信息。
二、为什么在分布式系统中需要行级锁?
(一)分布式系统的特性
- 分布式系统 由多台计算机组成,每台计算机通过网络通信协同工作。
- 在分布式系统中,多个节点可能同时访问和更新数据库中的数据,容易出现并发冲突。
(二)行级锁的作用
- 防止并发冲突:在分布式系统中,多个节点可能同时对同一行数据进行更新。行级锁可以确保在同一时刻只有一个节点可以更新特定的行,从而避免数据不一致的问题。
- 提高并发性能:相比于表级锁,行级锁只锁定需要操作的行,允许其他事务并发操作表中的其他行,从而提高系统的并发性能。
三、如何实现行级锁?
(一)在 MySQL 中实现行级锁
-
使用
SELECT ... FOR UPDATE
- 在事务中,使用
SELECT ... FOR UPDATE
语句可以锁定特定行。 - 示例:
START TRANSACTION; SELECT * FROM user_message WHERE userId = 1 FOR UPDATE; UPDATE user_message SET sequenceId = sequenceId + 1 WHERE userId = 1; COMMIT;
- 解释:
SELECT ... FOR UPDATE
会锁定查询到的行,直到事务提交或回滚。
- 在事务中,使用
-
使用
UPDATE
语句- 直接使用
UPDATE
语句也可以锁定行。 - 示例:
START TRANSACTION; UPDATE user_message SET sequenceId = sequenceId + 1 WHERE userId = 1; COMMIT;
- 解释:
UPDATE
语句会自动锁定被更新的行。
- 直接使用
(二)在 Redis 中实现行级锁
-
使用 Lua 脚本
- Redis 支持 Lua 脚本,可以将多个操作封装在一个 Lua 脚本中,确保操作的原子性。
- 示例 Lua 脚本:
local userId = KEYS[1] local sequenceId = redis.call('INCR', userId .. ':sequenceId') return sequenceId
- 在 C/C++ 中调用 Lua 脚本:
redisCommand(context, "EVAL \"local userId = KEYS[1] local sequenceId = redis.call('INCR', userId .. ':sequenceId') return sequenceId\" 1 %s", userId);
-
使用 Redis 的事务机制
- Redis 的事务机制可以确保多个命令的原子性执行。
- 示例:
MULTI INCR userId:sequenceId EXEC
四、总结
- 行级锁 是一种细粒度的锁机制,锁定的是表中的具体数据行。
- 分布式系统 中需要行级锁来防止并发冲突,提高并发性能。
- MySQL 中可以通过
SELECT ... FOR UPDATE
或UPDATE
语句实现行级锁。 - Redis 中可以通过 Lua 脚本或事务机制实现行级锁。
11、如果数据量太大了给消息队列塞满了怎么办?
- 原因:消息队列出现消息堆积,通常是因为消息的消费速度远小于生产速度。
- 消费者:
- 增加消费者实例:扩展消费者组的实例数量,确保消费者数量与分区数匹配
- 异步批处理:消费者采用批量拉取消息(
max.poll.records
)并异步处理,提升吞吐量。 - 多线程:将消息拉取与业务处理分离,使用线程池异步处理消息。例如,主线程负责从
Kafka
拉取消息,放入阻塞队列,子线程池并行处理消息。 - 流控机制:动态调整 Kafka 消费速率,流控机制的核心思想是根据队列的剩余容量动态调整消费速率,确保生产和消费的平衡
- 离线消息异步落库:将离线消息直接存入数据库,绕过 Kafka,减少队列压力
- Broker:
- 临时 Topic 应急:若积压严重,可新建临时 Topic,增加分区数,将积压消息转发至临时 Topic,同时启动更多消费者快速处理,处理完成后恢复原架构。
- 分区动态分配:监控分区负载情况,通过
Kafka
的kafka-reassign-partitions
工具均衡分区分布,避免部分分区过载。
- 生产者:
- 限流:在生产者端设置限流机制,避免消息生产速度过快(令牌桶算法)
- 批量发送:将多条消息批量发送,减少网络请求次数,可以通过设置
batch.size
参数来控制批量消息的大小。例如,设置batch.size = 1024 (1KB)
是,当消息累计到达1KB
时,生产者将这批消息一次性发送出去。 - 业务降级:当系统过载时,优先确保关键消息(如文字消息)发送,非关键消息(如图片,表情包,文件)延迟发送或暂存
- 容灾与监控:
- 消息持久化:引入本地缓存(如
Redis
)或外部存储,将积压消息暂存并异步处理,避免丢失 - 重试:设置死信队列(DLQ),捕获处理失败的消息,定时重试或人工介入
- 熔断:监控消费者延迟,超过阈值时触发告警,并暂停非核心业务的消息生产
- 消息持久化:引入本地缓存(如