当前位置: 首页 > news >正文

Java面试题027:一文深入了解数据库Redis(3)

Java面试题025:一文深入了解数据库Redis(1)

Java面试题026:一文深入了解数据库Redis(2)

        本节我们整理一下Redis高可用和消息队列使用场景的重点原理,让大家在面试或者实际工作中遇到这类问题时能够知道怎么入手,如何选择合理的方案,至于怎么去搭建和具体的操作步骤不是我们本节的内容。

1、主从结构

        Redis虽然读取写入的速度都特别快,但是也会产生读压力特别大的情况。为了分担读压力,Redis支持主从复制,保证主数据库的数据内容和从数据库的内容完全一致。

        Redis 主从架构是一种数据复制机制,用于提高数据库的可用性和扩展性。在这种架构中,数据可以从一个主节点(master)复制到一个或多个从节点(slave)。主节点负责处理写操作,而从节点则主要用于处理读操作,实现读写分离。

        根据拓扑复杂性可以分为三种:一主一从、一主多从、 树状主从结构。

开启方式

master:

[root@master src]# vim /etc/redis/6379.conf
#70行 修改监听地址为20.0.0.10 master地址
bind 20.0.0.10
#137行 开启守护进程
daemonize yes
#172行 修改日志文件目录
logfile /var/log/redis_6379.log
#264行 修改工作目录
dir /var/lib/redis/6379
#700行 开启AOF持久化功能
appendonly yes

slave1:

[root@slave1 src]# vim /etc/redis/6379.conf
#70行 修改监听地址为20.0.0.11 slave1地址
bind 20.0.0.11
#137行 开启守护进程
daemonize yes
#172行 修改日志文件目录
logfile /var/log/redis_6379.log
#264行 修改工作目录
dir /var/lib/redis/6379
#700行 开启AOF持久化功能
appendonly yes
#287 修改IP和端口 指向master
replicaof 20.0.0.10 6379

slave2:

[root@slave1 src]# vim /etc/redis/6379.conf
#70行 修改监听地址为20.0.0.12 slave2地址
bind 20.0.0.12
#137行 开启守护进程
daemonize yes
#172行 修改日志文件目录
logfile /var/log/redis_6379.log
#264行 修改工作目录
dir /var/lib/redis/6379
#700行 开启AOF持久化功能
appendonly yes
#287 修改IP和端口 指向master
replicaof 20.0.0.10 6379

复制原理

        先启动主节点,再启动从节点,从节点启动后,会向主数据库发送SYNC命令。同时主数据库收到SYNC命令后会开始在后台保存快照(即RDB持久化,在主从复制时,会无条件触发RDB),并将保存快照期间接收到的命令缓存起来,当快照完成后,redis会将快照文件和所有缓存命令发送给数据库。从数据库接收到快照文件和缓存命令后,会载入快照文件和执行命令,也就是说redis是通过RDB持久化文件和redis缓存命令来时间主从复制。----初始化复制。

        后续每当主数据库接到写命令时,就会将命令同步给从数据库,保证主从数据一致性。

        主从数据库断线重连后,主数据库只需要将断线期间执行的命令传送给从数据库。

       

复制方式

         主节点除了备份RDB文件之外还会维护者一个环形积压队列,以及环形队列的写索引和从节点同步的全局offset,环形队列用于存储最新的操作数据。
        每个redis实例会拥有一个唯一的运行id,当实例重启后,就会自动生成一个新的id。 从数据库会存储主数据库的运行id。
        主节点在复制同步阶段,主数据库每将一个命令传递给从数据库时,都会将命令存放到积压队列,并记录当前积压队列中存放命令的偏移量。

        从数据库接收到主数据库传来的命令时,会记录下偏移量。

(1)全量复制:一般发生在Slave初始化阶段

 在2.8之后,主从复制不再发送SYNC命令,取而代之的是PSYNC,格式为:“PSYNC ID offset”。

当主节点接到请求后,会判断请求是否满足以下两个条件,当满足时,不进行全量复制:

  • 从节点传递的run id和master的run id一致。
  • 主节点在环形队列上可以找到对应offset的值。

1. 发送psync命令进行数据同步,由于是第一次进行复制,从节点没有复制偏移量和主节点的运行ID,所以发送psync-1。

2. 主节点根据psync-1解析出当前为全量复制,回复+FULLRESYNC响应。

3. 从节点接收主节点的响应数据保存运行ID和偏移量offset

4. 主节点执行bgsave保存RDB文件到本地

5. 主节点发送RDB文件给从节点,从节点把接收的RDB文件保存在本地直接作为从节点数据文件

6. 对于从节点开始接收RDB快照到接收完成期间,主节点仍然响应读写命令,因此主节点会把这期间写命令数据保存在复制客户端缓冲区内,当从节点加载完RDB文件后,主节点再把缓冲区内的数据发送给从节点,保证主从之间数据一致性。

7. 从节点接收完主节点传送来的全部数据后会清空自身旧数据

8. 从节点清空数据后开始加载RDB文件

9. 从节点成功加载完RDB后,如果当前节点开启了AOF持久化功能, 它会立刻做bgrewriteaof操作,为了保证全量复制后AOF持久化文件立刻可用。

(2) 增量复制:

        增量复制主要是Redis针对全量复制的过高开销做出的一种优化措施, Slave初始化后开始正常工作时主服务器发生的写操作同步到从服务器的过程。

        增量复制的过程主要是主服务器每执行一个写命令就会向从服务器发送相同的写命令,从服务器接收并执行收到的写命令。

主从结构存在问题

  • 一旦主节点出现故障,需要手动将一个从节点晋升为主节点,同时需要修改应用方的主节点地址,还需要命令其他从节点去复制新的主节点,整个过程都需要人工干预。
  • 主节点的写能力受到单机的限制。
  • 主节点的存储能力受到单机的限制。

2、哨兵结构

        主从结构的手动重启和恢复都相对麻烦,这时候就需要哨兵登场了。

        哨兵的作用就是监控redis节点的运行状态,监控主数据库和从数据库是否能够正常运行,主数据库出现故障时自动将从数据库转换为主数据库。

当使用多个哨兵时,哨兵不仅会监控主数据库和从数据库,哨兵之间也会相互监控。

开启方式

[root@master ~]# vi redis-5.0.7/sentinel.conf
17行/protected-mode no                                  #关闭保护模式
26行/daemonize yes                                      #指定sentinel为后台启动
36行/logfile "/var/log/sentinel.log"                    #指定日志存放路径
65行/dir "/var/lib/redis/6379"                          #指定数据库存放路径
84行/sentinel monitor mymaster 20.0.0.10 6379 2         #至少几个哨兵检测到主服务器故障了,才会进行故障迁移,全部指向masterIP
113行/sentinel down-after-milliseconds mymaster 30000    #判定服务器down掉的时间周期,默认30000毫秒(30秒)sentinel auth-pass mymaster 123456
146行/sentinel failover-timeout mymaster 180000         #故障节的的最大超时时间为180000(180秒)

监控原理

Redis Sentinel通过三个定时监控任务完成对各个节点发现和监控:

        1. 每隔10秒,每个Sentinel节点会向主节点和从节点发送info命令获取最新的拓扑结构

        2. 每隔2秒,每个Sentinel节点会向Redis数据节点的sentinelhello 频道上发送该Sentinel节点对于主节点的判断以及当前Sentinel节点的信息

        3. 每隔1秒,每个Sentinel节点会向主节点、从节点、其余Sentinel节点发送一条ping命令做一次心跳检测,来确认这些节点当前是否存活。        

第一条操作的作用是获取当前数据库信息,比如发现新增从节点时,会建立连接,并加入到监控列表中,当主从数据库的角色发生变化进行信息更新。第二条操作的作用是将自己的监控数据和哨兵分享,发送的内容为:
<哨兵地址>,<哨兵端口>,<哨兵运行id>,<哨兵配置版本>,<主数据库名字>,<主数据库地址>,<主数据库端口>,<主数据库配置版本>,每个哨兵会订阅数据库的_sentinel_:hello频道,当其他哨兵收到消息后,会判断该哨兵是不是新的哨兵,如果是则将其加入哨兵列表,并建立连接。第三条操作的作用是监控节点是否存活。该时间间隔有down-after-millisecond实现,当该值小于1s时,哨兵会按照设定的值发送ping,当大于1s时,哨兵会间隔1s发送ping命令。

主观下线

        主观下线是当前哨兵节点认为某个节点有问题,客观下线就是超过一定数量的哨兵节点认为某个主节点有问题。

        每个Sentinel节点会每隔1秒对主节点、从节点、其他Sentinel节点发送ping命令做心跳检测,当这些节点超过 down-after-milliseconds没有进行有效回复,Sentinel节点就会对该节点做失败判定,这个行为叫做主观下线。

客观下线

        当Sentinel主观下线的节点是主节点时,该Sentinel节点会通过sentinel is- master-down-by-addr命令向其他Sentinel节点询问对主节点的判断,当超过 <quorum>个数,Sentinel节点认为主节点确实有问题,这时该Sentinel节点会做出客观下线的决定。

3、集群结构

        当数据量过大到一台服务器存放不下的情况时,主从模式或sentinel模式就不能满足需求了,这个时候需要对存储的数据进行分片,将数据存储到多个Redis实例中。cluster模式的出现就是为了解决单机Redis容量有限的问题,将Redis的数据根据一定的规则分配到多台机器。

        使用集群,只需要将redis配置文件中的cluster-enable配置打开即可。每个集群中至少需要三个主数据库才能正常运行。

        所有的节点都是一主一从(也可以是一主多从),其中从不提供服务,仅作为备用。

开启方式

参考下面这篇文章,很详细:

redis集群搭建之官方redis cluster 搭建实践_redis cluster搭建-CSDN博客文章浏览阅读2.2w次,点赞12次,收藏73次。redis cluster是官方的redis集群实现,本篇文章为搭建集群实践篇一、手动搭建redis官方已经redis-trib.rb命令来给我们实现redis搭建了。但是为了了解原理,首先我们来手动搭建不使用官方的命令。如果大家想快速搭建,可以直接跳到二。1、准备我们这个例子是在单机上部署集群,实际的工作情况会在不同的机器上搭建,一方面为了保证高可用也是为了扩大数据的容量所以实际中会在不同的机器..._redis cluster搭建 https://blog.csdn.net/fst438060684/article/details/80712433

集群创建过程

(1)设置节点

        Redis集群一般由多个节点组成,节点数量至少为6个才能保证组成完整高可用的集群。每个节点需要开启配置 cluster-enabled yes,让Redis运行在集群模式。

(2)节点握手

        节点握手是指一批运行在集群模式下的节点通过Gossip协议彼此通信, 达到感知对方的过程。节点握手是集群彼此通信的第一步,由客户端发起命令:cluster meet{ip}{port}。完成节点握手之后,一个的Redis节点就组成了一个多节点的集群。

(3)分配槽(slot

        Redis集群把所有的数据映射到16384个槽中。每个节点对应若干个槽,只有当节点分配了槽,才能响应和这些槽关联的键命令。通过 cluster addslots命令为节点分配槽。

4、消息队列

(1)使用list作为队列

        Redis的列表类型可以用来实现队列,并且支持阻塞式读取。 在Redis中,List类型是按照插入顺序排序的字符串链表。

  • lpush生产消息,rpop消费消息

redis.properties

redis.url=localhost
redis.port=6379
redis.maxIdle=30
redis.minIdle=10
redis.maxTotal=100
redis.maxWait=10000

工具类:

public class JedisPoolUtils {private static JedisPool pool = null;static {//加载配置文件InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis.properties");Properties pro = new Properties();try {pro.load(in);} catch (IOException e) {e.printStackTrace();}//获得池子对象JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大闲置个数poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大闲置个数poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小闲置个数poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大连接数pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString()));}//获得jedis资源的方法public static Jedis getJedis() {return pool.getResource();}
}

消息生产者:

public class MessageProducer extends Thread {public static final String MESSAGE_KEY = "message:queue";private volatile int count;public void putMessage(String message) {Jedis jedis = JedisPoolUtils.getJedis();Long size = jedis.lpush(MESSAGE_KEY, message);System.out.println(Thread.currentThread().getName() + " put message,size=" + size + ",count=" + count);count++;}@Overridepublic synchronized void run() {for (int i = 0; i < 5; i++) {putMessage("message" + count);}}public static void main(String[] args) {MessageProducer messageProducer = new MessageProducer();Thread t1 = new Thread(messageProducer, "thread1");Thread t2 = new Thread(messageProducer, "thread2");Thread t3 = new Thread(messageProducer, "thread3");Thread t4 = new Thread(messageProducer, "thread4");Thread t5 = new Thread(messageProducer, "thread5");t1.start();t2.start();t3.start();t4.start();t5.start();}
}

redis后台查看:

127.0.0.1:6379> lrange message:queue 0 -11) "message24"2) "message23"3) "message22"4) "message21"5) "message20"6) "message19"7) "message18"8) "message17"9) "message16"
10) "message15"
11) "message14"
12) "message13"
13) "message12"
14) "message11"
15) "message10"
16) "message9"
17) "message8"
18) "message7"
19) "message6"
20) "message5"
21) "message4"
22) "message3"
23) "message2"
24) "message1"
25) "message0"

消费者:

public class MessageConsumer implements Runnable {public static final String MESSAGE_KEY = "message:queue";private volatile int count;public void consumerMessage() {Jedis jedis = JedisPoolUtils.getJedis();String message = jedis.rpop(MESSAGE_KEY);System.out.println(Thread.currentThread().getName() + " consumer message,message=" + message + ",count=" + count);count++;}@Overridepublic void run() {while (true) {consumerMessage();}}public static void main(String[] args) {MessageConsumer messageConsumer = new MessageConsumer();Thread t1 = new Thread(messageConsumer, "thread6");Thread t2 = new Thread(messageConsumer, "thread7");t1.start();t2.start();}
}

结果:

thread6 consumer message,message=message0,count=0
thread6 consumer message,message=message1,count=1
thread6 consumer message,message=message2,count=2
thread6 consumer message,message=message3,count=3
thread7 consumer message,message=message4,count=4
thread6 consumer message,message=message5,count=5
thread7 consumer message,message=message6,count=6
thread6 consumer message,message=message7,count=7
thread7 consumer message,message=message8,count=8
thread6 consumer message,message=message9,count=9
thread7 consumer message,message=message10,count=10
thread6 consumer message,message=message11,count=11
thread7 consumer message,message=message12,count=12
thread6 consumer message,message=message13,count=13
thread7 consumer message,message=message14,count=14
thread6 consumer message,message=message15,count=15
thread7 consumer message,message=message16,count=16
thread6 consumer message,message=message17,count=16
thread7 consumer message,message=message18,count=18
thread6 consumer message,message=message19,count=19
thread7 consumer message,message=message20,count=20
thread6 consumer message,message=message21,count=20
thread7 consumer message,message=message22,count=22
thread6 consumer message,message=message23,count=22
thread7 consumer message,message=message24,count=24
thread6 consumer message,message=null,count=25
thread7 consumer message,message=null,count=26
thread6 consumer message,message=null,count=27
thread7 consumer message,message=null,count=28
thread6 consumer message,message=null,count=28
thread7 consumer message,message=null,count=30
thread6 consumer message,message=null,count=31...

        这种方式,消费者死循环rpop从队列中消费消息。即使队列里没有消息,也会进行rpop,会导致Redis CPU的消耗。

  • lpush生产消息,brpop消费消息
        brpop是 rpop 的阻塞版本, list 为空的时候,它会一直阻塞,直到 list 中有值或者超时。
public class MessageConsumer implements Runnable {public static final String MESSAGE_KEY = "message:queue";private volatile int count;private Jedis jedis = JedisPoolUtils.getJedis();public void consumerMessage() {List<String> brpop = jedis.brpop(0, MESSAGE_KEY);//0是timeout,返回的是一个集合,第一个是消息的key,第二个是消息的内容System.out.println(brpop);}@Overridepublic void run() {while (true) {consumerMessage();}}public static void main(String[] args) {MessageConsumer messageConsumer = new MessageConsumer();Thread t1 = new Thread(messageConsumer, "thread6");Thread t2 = new Thread(messageConsumer, "thread7");t1.start();t2.start();}
}

(2)使pub/sub来进行消息的发布/订阅

        redis还提供了一组命令可以让开发者实现"发布/订阅"(publish/subscribe)模式。"发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。

        发布者发布消息的命令是  publish,用法是 publish channel message。

        订阅频道的命令是 subscribe,可以同时订阅多个频道,用法是 subscribe channel1 [channel2 ...]。不会收到订阅之前就发布到该频道的消息。

        还可以使用psubscribe命令订阅指定的规则。规则支持通配符格式。命令格式为      psubscribe pattern [pattern ...]订阅多个模式的频道。

  通配符中?表示1个占位符,*表示任意个占位符(包括0),?*表示1个以上占位符。

例如订阅者订阅三个通配符频道: psubscribe c? b* d?*

C:\Users\liqiang>redis-cli
127.0.0.1:6379> publish c m1
(integer) 0
127.0.0.1:6379> publish c1 m1
(integer) 1
127.0.0.1:6379> publish c11 m1
(integer) 0
127.0.0.1:6379> publish b m1
(integer) 1
127.0.0.1:6379> publish b1 m1
(integer) 1
127.0.0.1:6379> publish b11 m1
(integer) 1
127.0.0.1:6379> publish d m1
(integer) 0
127.0.0.1:6379> publish d1 m1
(integer) 1
127.0.0.1:6379> publish d11 m1
(integer) 1

        上面返回值为1表示被订阅者所接受,可以匹配上面的通配符。        

        使用psubscribe命令可以重复订阅同一个频道,如客户端执行了psubscribe c? c?*。这时向c1发布消息客户端会接受到两条消息

生产者:

public class MessageProducer extends Thread {public static final String CHANNEL_KEY = "channel:1";private volatile int count;public void putMessage(String message) {Jedis jedis = JedisPoolUtils.getJedis();Long publish = jedis.publish(CHANNEL_KEY, message);//返回订阅者数量System.out.println(Thread.currentThread().getName() + " put message,count=" + count+",subscriberNum="+publish);count++;}@Overridepublic synchronized void run() {for (int i = 0; i < 5; i++) {putMessage("message" + count);}}public static void main(String[] args) {MessageProducer messageProducer = new MessageProducer();Thread t1 = new Thread(messageProducer, "thread1");Thread t2 = new Thread(messageProducer, "thread2");Thread t3 = new Thread(messageProducer, "thread3");Thread t4 = new Thread(messageProducer, "thread4");Thread t5 = new Thread(messageProducer, "thread5");t1.start();t2.start();t3.start();t4.start();t5.start();}
}

subscribe消费者:

public class MessageConsumer implements Runnable {public static final String CHANNEL_KEY = "channel:1";//频道public static final String EXIT_COMMAND = "exit";//结束程序的消息private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//处理接收消息public void consumerMessage() {Jedis jedis = JedisPoolUtils.getJedis();jedis.subscribe(myJedisPubSub, CHANNEL_KEY);//第一个参数是处理接收消息,第二个参数是订阅的消息频道}@Overridepublic void run() {while (true) {consumerMessage();}}public static void main(String[] args) {MessageConsumer messageConsumer = new MessageConsumer();Thread t1 = new Thread(messageConsumer, "thread5");Thread t2 = new Thread(messageConsumer, "thread6");t1.start();t2.start();}
}/*** 继承JedisPubSub,重写接收消息的方法*/
class MyJedisPubSub extends JedisPubSub {@Override/** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现* 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage* 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法* 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]**/public void onMessage(String channel, String message) {System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);//接收到exit消息后退出if (MessageConsumer.EXIT_COMMAND.equals(message)) {System.exit(0);}}
}

psubscribe消费者:

public class MessageConsumer implements Runnable {public static final String CHANNEL_KEY = "channel*";//频道public static final String EXIT_COMMAND = "exit";//结束程序的消息private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//处理接收消息public void consumerMessage() {Jedis jedis = JedisPoolUtils.getJedis();jedis.psubscribe(myJedisPubSub, CHANNEL_KEY);//第一个参数是处理接收消息,第二个参数是订阅的消息频道}@Overridepublic void run() {while (true) {consumerMessage();}}public static void main(String[] args) {MessageConsumer messageConsumer = new MessageConsumer();Thread t1 = new Thread(messageConsumer, "thread5");Thread t2 = new Thread(messageConsumer, "thread6");t1.start();t2.start();}
}/*** 继承JedisPubSub,重写接收消息的方法*/
class MyJedisPubSub extends JedisPubSub {@Overridepublic void onPMessage(String pattern, String channel, String message) {System.out.println(Thread.currentThread().getName()+"-接收到消息:pattern="+pattern+",channel=" + channel + ",message=" + message);//接收到exit消息后退出if (MessageConsumer.EXIT_COMMAND.equals(message)) {System.exit(0);}}
}

(3)缺点

Redis可以提供基本的发布订阅功能,但毕竟不像消息队列那种专业级别,所以会存在以下缺点:

  • redis无法对消息持久化存储,消息一旦被发送,如果没有订阅者接收,数据会丢失

  • 消息队列提供了消息传输保障,当客户端连接超时或事物回滚的等情况发生时,消息会重新发布给订阅者,redis没有该保障,导致的结果就是在订阅者断线超时或其他异常情况时,将会丢失所有发布者发布的信息

  • 若订阅者订阅了频道,但自己读取消息的速度很慢的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃

http://www.lryc.cn/news/574279.html

相关文章:

  • Arrays.asList和 List<String> list = new ArrayList<>();有什么区别
  • C++11的内容
  • 智能生成分析报告系统在危化安全生产监测预警评估中的应用
  • NoSQL 之 Redis 配置与优化
  • 【科技公司的管理】
  • 深度解析 Caffeine:高性能 Java 缓存库
  • ​​MQTT​​通讯:​​物联网
  • 爬虫003----requests库
  • UP COIN:从 Meme 共识走向公链与 RWA 的多元生态引擎
  • VLN论文复现——VLFM(ICRA最佳论文)
  • 如何快速判断Excel文档是否被修改过?Excel多版本比对解决方案
  • 睿是信息携手Arctera,深化服务中国市场,共筑数据管理新未来
  • css元素超过两行隐藏并显示省略号全网独一份
  • 2025年CSS最新高频面试题及核心解析
  • ADIOS2 介绍与使用指南
  • 后台发热、掉电严重?iOS 应用性能问题实战分析全过程
  • 【数据结构初阶】--顺序表(一)
  • 【go的测试】单测之gomock包与gomonkey包
  • 板凳-------Mysql cookbook学习 (十--9)
  • K8S: etcdserver: too many requests
  • Halcon ——— OCR字符提取与多类型识别技术详解
  • Java 程序设计试题​
  • 多智能体协同的力量:赋能AI安全报告系统的智能设计之道
  • Elasticsearch(ES)与 OpenSearch(OS)
  • 苹果芯片macOS安装版Homebrew(亲测)
  • LoHoVLA技术:让机器人像人类一样思考与行动的统一框架
  • AI 智能体架构设计3阶段演进和3大关键技术对比剖析
  • 硬件工程师笔试面试高频考点汇总——(2025版)
  • 最近小峰一直在忙国际化项目,确实有点分身乏术... [特殊字符] 不过! 我正紧锣密鼓准备一系列干货文章/深度解析
  • SpringBoot中使用表单数据有效性检验