当前位置: 首页 > news >正文

Redis使用Pipeline(管道)批量处理

Redis 批量处理

在开发中,有时需要对Redis 进行大批量的处理。

比如Redis批量查询多个Hash。如果是在for循环中逐个查询,那性能会很差。

这时,可以使用 Pipeline (管道)。

Pipeline (管道)

Pipeline (管道) 可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline 通过减少客户端与 redis 的通信次数来实现降低往返延时时间,而且 Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。

RedisTemplate的管道操作

RedisTemplate的管道操作,使用**executePipelined()**方法。
org.springframework.data.redis.core.RedisTemplate#executePipelined(org.springframework.data.redis.core.SessionCallback<?>)

	@Overridepublic List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");Assert.notNull(session, "Callback object must not be null");RedisConnectionFactory factory = getRequiredConnectionFactory();// 绑定连接RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);try {return execute((RedisCallback<List<Object>>) connection -> {//打开管道connection.openPipeline();boolean pipelinedClosed = false;try {Object result = executeSession(session);//callback的返回值,只能返回null,否则会报错。if (result != null) {throw new InvalidDataAccessApiUsageException("Callback cannot return a non-null value as it gets overwritten by the pipeline");}//关闭管道,并获取返回值List<Object> closePipeline = connection.closePipeline();pipelinedClosed = true;return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);} finally {if (!pipelinedClosed) {connection.closePipeline();}}});} finally {RedisConnectionUtils.unbindConnection(factory);}}
  • 注意:
    (1) executePipelined()的callback参数,实现execute() 方法只能返回null,否则会报错。
    报错: InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
    源码如下:
Object result = executeSession(session);
if (result != null) {
throw new InvalidDataAccessApiUsageException(
"Callback cannot return a non-null value as it gets overwritten by the pipeline");
}

(2)在executePipelined()中, 使用 get()方法 得到的value会是null。
在 redisTemplate进行管道操作,结果值只能通过 executePipelined()方法的返回值List获取.

	/*** Get the value of {@code key}.** @param key must not be {@literal null}.* 在管道(pipeline)中使用 get()方法 得到的value会是null* @return {@literal null} when used in pipeline / transaction.*/@NullableV get(Object key);
  • redisTemplate获取管道返回值:
List<Object> list = stringRedisTemplate.executePipelined(new SessionCallback<String>() {@Overridepublic  String execute(@NonNull RedisOperations operations) throws DataAccessException {//idList是多个id的集合for (String id : idList) {//key由前缀加唯一id组成String key = KEY_PREFIX + id;//在管道中使用 get()方法 得到的value会是null。value通过executePipelined()的返回值List<Object>获取。operations.opsForHash().get(key, field);}//Callback只能返回null, 否则报错:// InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipelinereturn null;}});list.forEach(System.out::println);

Jedis操作管道

RedisTemplate操作管道比较方便,但如果要组装key和value的map,就会比较麻烦。
在这种情况下,可以使用Jedis。
比如,Jedis批量查询多个Hash,可以使用 Pipeline (管道)。
源码见: redis.clients.jedis.PipelineBase#hget(java.lang.String, java.lang.String)
hget()方法,跟普通hash的hget()有点类似,不过返回值是 Response。

    public Response<String> hget(String key, String field) {this.getClient(key).hget(key, field);return this.getResponse(BuilderFactory.STRING);}

示例:

public void testPipLine() {Map<String, Response<String>> responseMap = new HashMap<>();//try-with-resources, 自动关闭资源//先连接jedis,再拿到 pipelinetry (Jedis jedis = getJedis();Pipeline pipeline = jedis.pipelined()) {for (String id : idList) {//前缀加唯一idString key = KEY_PREFIX +  id;//使用pipeline.hget查询hash的数据Response<String> response = pipeline.hget(key, field);responseMap.put(id, response);}pipeline.sync();} catch (Exception ex) {log.error("responses error.", ex);}Map<String, String> map = new HashMap<>();//组装map。response.get()在pipeline关闭后才能执行,否则拿到的value都是nullresponseMap.forEach((k,response) -> map.put(k, response.get()));map.forEach((k,v)-> System.out.println(k+",val:"+v));}private static Pool<Jedis> jedisPool = null;/*** 连接redis,获取jedisPool* @return*/public Jedis getJedis() {if (jedisPool == null) {JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(maxTotal);poolConfig.setMaxIdle(maxIdle);poolConfig.setMaxWaitMillis(maxWaitMillis);poolConfig.setTestOnBorrow(testOnBorrow);//配置可以写在配置中心/文件jedisPool = new JedisPool(poolConfig, host, port, timeout, password, database);}return jedisPool.getResource();}

参考资料

https://redis.io/docs/manual/pipelining/
https://www.cnblogs.com/expiator/p/11127719.html

http://www.lryc.cn/news/230998.html

相关文章:

  • Linux中at命令添加一次性任务
  • 交换机基础知识之安全配置
  • Netty入门指南之Reactor模型
  • Ubuntu20.04软件安装顺序
  • 适配器模式 ( Adapter Pattern )(6)
  • JAVA G1垃圾收集器介绍
  • 十方影视后期“领进门”,成长与成就还得靠自身
  • Golang之火爆原因
  • WPF中Dispatcher对象的用途是什么
  • 图论17-有向图的强联通分量-Kosaraju算法
  • ubuntu中使用 vscode 连接docker开发环境
  • 【广州华锐视点】海外制片人VR虚拟情景教学带来全新的学习体验
  • 龙芯loongarch64麒麟服务器配置yum源
  • Centos7 单用户模式修改密码 3步搞定 666 (百分比成功)
  • 深度学习 机器视觉 车位识别车道线检测 - python opencv 计算机竞赛
  • Java主流分布式解决方案多场景设计与实战
  • docker安装MongoDB数据库,并且进行密码配置
  • ssh脚本找不到命令或者执行无效的解决办法
  • 2023年11月18日(星期六)骑行海囗林场公园
  • xss 漏洞
  • 一文图解爬虫_姊妹篇(spider)
  • 【vue实战项目】通用管理系统:api封装、404页
  • R语言编写代码示例
  • [RK3568][Android12.0]--- 系统自带预置第三方APK方法
  • 数据分析场景下,企业如何做好大模型选型和落地?
  • 使用VScode编译betaflight固件--基于windows平台
  • OkHttp网络请求读写超时
  • @postmapping 定义formdata传参方式
  • Windows客户端开发框架WPF简介
  • 2023NOIP A层联测32 sakuya