当前位置: 首页 > news >正文

93、Redis 之 使用连接池管理Redis6.0以上的连接 及 消息的订阅与发布

★ 使用连接池管理Redis连接

从Redis 6.0开始,Redis可支持使用多线程来接收、处理客户端命令,因此应用程序可使用连接池来管理Redis连接。

上一章讲的是创建单个连接来操作redis数据库,这次使用连接池来操作redis数据库

Lettuce连接池 支持需要 Apache Commons Pool2 的支持,需要添加该依赖

接下来即可在程序中通过类似如下代码片段来创建连接池了。
var conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();

conf.setMaxTotal(20); // 设置连接池中允许的最大连接数

// 创建连接池对象(其中连接由redisClient的connectPubSub方法创建)
pool = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, conf);

代码演示

创建连接池对象,创建两个消息订阅者和一个消息发布者,然后操作redis数据库

1、添加依赖
在这里插入图片描述

Subscriper 第一个消息订阅者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
启动这个消息订阅者的程序
在这里插入图片描述

Subscriper 第二个消息订阅者

直接拷贝第一个消息订阅者,然后修改这个消息订阅者只订阅 c2 这个channel 主题
在这里插入图片描述

Publisher 消息发布者

也是拷贝消息订阅者的代码,因为创建连接池对象的代码都是一样的。
这里只需要把消息订阅的方法改成消息发布的方法就可以了,其他代码一样。

在这里插入图片描述

测试:

测试成功
消息发布者成功发布消息
消息订阅者也能接收到各自订阅的channel的消息
用小黑窗测试也没有问题
在这里插入图片描述

完整代码

Subscriper

package cn.ljh.app;import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScoredValue;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;import java.time.Duration;//使用 Lettuce ,这个类是消息订阅者
//通过连接池操作redis数据库
public class Subscriper
{private RedisClient redisClient;//连接池pool对象private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;public void init(){//1、定义RedisURIRedisURI uri = RedisURI.builder().withHost("127.0.0.1").withPort(6379)//选择redis 16个数据库中的哪个数据库.withDatabase(0).withPassword(new char[]{'1', '2', '3', '4', '5', '6'}).withTimeout(Duration.ofMinutes(5)).build();//2、创建 RedisClient 客户端this.redisClient = RedisClient.create(uri);//创建连接池的配置对象//GenericObjectPoolConfig<StatefulRedisConnection<String, String>> conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();//设置连接池允许的最大连接数conf.setMaxTotal(20);//3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);}//关闭资源public void closeResource(){//关闭连接池--先开后关this.pool.close();//关闭RedisClient 客户端------最先开的最后关this.redisClient.shutdown();}//订阅消息的方法public void subscribe() throws Exception{//从连接池中取出连接StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();//4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法RedisPubSubCommands cmd = conn.sync();//监听消息:消息到来时,是通过监听器来实现的conn.addListener(new RedisPubSubAdapter<>(){//匿名内部类重写这3个方法:收到消息、订阅主题、取消订阅主题//接收来自普通的channel的消息,就用这个方法(就是没带模式的,比如那些主从、集群模式,点进RedisPubSubAdapter类里面看)//接收消息的方法@Overridepublic void message(String channel, String message){System.err.printf("从 %s 收到消息 : %s\n " , channel , message);}//订阅普通channel激发的方法,//订阅主题的方法--下面有这个订阅的方法cmd.subscribe("c1", "c2");//不太清楚这个 subscribed方法 和 下面的 cmd.subscribe 方法的关联 todo@Overridepublic void subscribed(String channel, long count){System.err.println("完成订阅 :" + count);}//不订阅普通的channel所使用方法--取消订阅//取消订阅的方法@Overridepublic void unsubscribed(String channel, long count){System.err.println("取消订阅");}});//订阅消息------订阅了 c1 和 c2 这两个主题 channelcmd.subscribe("c1", "c2");}public static void main(String[] args) throws Exception{Subscriper subscriper = new Subscriper();subscriper.init();subscriper.subscribe();//改程序只订阅了60分钟,超过60分钟就程序就退出不订阅了Thread.sleep(600000);//关闭资源subscriper.closeResource();}
}

Subscriper2

package cn.ljh.app;import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;import java.time.Duration;//使用 Lettuce ,这个类是消息订阅者2
//通过连接池操作redis数据库
public class Subscriper2
{private RedisClient redisClient;//连接池pool对象private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;public void init(){//1、定义RedisURIRedisURI uri = RedisURI.builder().withHost("127.0.0.1").withPort(6379)//选择redis 16个数据库中的哪个数据库.withDatabase(0).withPassword(new char[]{'1', '2', '3', '4', '5', '6'}).withTimeout(Duration.ofMinutes(5)).build();//2、创建 RedisClient 客户端this.redisClient = RedisClient.create(uri);//创建连接池的配置对象//GenericObjectPoolConfig<StatefulRedisConnection<String, String>> conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();//设置连接池允许的最大连接数conf.setMaxTotal(20);//3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);}//关闭资源public void closeResource(){//关闭连接池--先开后关this.pool.close();//关闭RedisClient 客户端------最先开的最后关this.redisClient.shutdown();}//订阅消息的方法public void subscribe() throws Exception{//从连接池中取出连接StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();//4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法RedisPubSubCommands cmd = conn.sync();//监听消息:消息到来时,是通过监听器来实现的conn.addListener(new RedisPubSubAdapter<>(){//接收来自普通的channel的消息,就用这个方法(就是没带模式的,比如那些主从、集群模式,点进RedisPubSubAdapter类里面看),@Overridepublic void message(String channel, String message){System.err.printf("从 %s 收到消息 : %s\n " , channel , message);}//订阅普通channel激发的方法,@Overridepublic void subscribed(String channel, long count){System.err.println("完成订阅 :" + count);}//不订阅普通的channel所使用方法@Overridepublic void unsubscribed(String channel, long count){System.err.println("取消订阅");}});//订阅消息------订阅了 c2 这个主题 channelcmd.subscribe( "c2");}public static void main(String[] args) throws Exception{Subscriper2 subscriper2 = new Subscriper2();subscriper2.init();subscriper2.subscribe();//改程序只订阅了60分钟,超过60分钟就程序就退出不订阅了Thread.sleep(600000);//关闭资源subscriper2.closeResource();}}

Publisher

package cn.ljh.app;import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;import java.time.Duration;//消息发布者//通过连接池操作redis数据库
public class Publisher
{private RedisClient redisClient;//连接池pool对象private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;public void init(){//1、定义RedisURIRedisURI uri = RedisURI.builder().withHost("127.0.0.1").withPort(6379)//选择redis 16个数据库中的哪个数据库.withDatabase(0).withPassword(new char[]{'1', '2', '3', '4', '5', '6'}).withTimeout(Duration.ofMinutes(5)).build();//2、创建 RedisClient 客户端this.redisClient = RedisClient.create(uri);//创建连接池的配置对象//GenericObjectPoolConfig<StatefulRedisConnection<String, String>> conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();//设置连接池允许的最大连接数conf.setMaxTotal(20);//3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);}//关闭资源public void closeResource(){//关闭连接池--先开后关this.pool.close();//关闭RedisClient 客户端------最先开的最后关this.redisClient.shutdown();}//订阅消息的方法public void publish() throws Exception{//从连接池中取出连接StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();//4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法RedisPubSubCommands cmd = conn.sync();//向这两个channel主题各自发布了一条消息cmd.publish("c2","c2 c2 c2 这是一条来自 c2 这个channel 里面的消息");cmd.publish("c1","c1 c1 c1 这是一条来自 c1 这个channel 里面的消息");//关闭资源redisClient.shutdown();}//发送消息,消息发出去,程序就退出了public static void main(String[] args) throws Exception{Publisher subscriper2 = new Publisher();subscriper2.init();subscriper2.publish();subscriper2.closeResource();}}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>Lettucepool</artifactId><version>1.0.0</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- 引入 Lettuce 这个操作redis的框架的依赖 --><dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>6.1.4.RELEASE</version></dependency><!-- 创建连接池对象的依赖 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.9.0</version></dependency></dependencies>
</project>
http://www.lryc.cn/news/181705.html

相关文章:

  • doris动态分区开启历史分区
  • Linux用户与权限(认知root用户、修改权限控制 - chmod、修改权限控制 - chown)
  • 处理conda安装工具的动态库问题——解决记录 libssl.1.0.0 系统中所有openssl位置全览 whereis openssl
  • 如何在Go中格式化字符串
  • C程序设计内容与例题讲解 -- 第四章--选择结构程序设计第二部分(第五版)谭浩强
  • 接雨水问题
  • 小谈设计模式(9)—工厂方法模式
  • Android etc1tool之png图片转换pkm 和 zipalign简介
  • Spring Boot快速入门:构建简单的Web应用
  • JAVA 泛型、序列化和复制
  • 以太网基础学习(二)——ARP协议
  • 【Java-LangChain:使用 ChatGPT API 搭建系统-4】评估输入-分类
  • 嵌入式Linux应用开发-驱动大全-第一章同步与互斥③
  • 树的存储结构以及树,二叉树,森林之间的转换
  • 【AI视野·今日NLP 自然语言处理论文速览 第四十二期】Wed, 27 Sep 2023
  • 华为云云耀云服务器L实例评测|部署个人在线电子书库 calibre
  • 代码随想录刷题 Day28
  • 【生命周期】
  • 【C语言 模拟实现memcpy函数、memcpy函数】
  • opencv视频文件的读取,处理与保存
  • java - 七大比较排序 - 详解
  • 项目集成七牛云存储sdk
  • docker-compose一键启动neo4j
  • 深入剖析@ConfigurationProperties注解
  • 北京开发APP需要多少钱
  • self-attention、transformer、bert理解
  • junit @ExcludePackages排除多个包
  • Explain执行计划字段解释说明---select_type、table、patitions字段说明
  • 云原生微服务 第六章 Spring Cloud Netflix Eureka集成远程调用、负载均衡组件OpenFeign
  • 四、2023.9.30.C++面向对象end.4