当前位置: 首页 > news >正文

spark超大数据批量写入redis

利用spark的分布式优势,一次性批量将7000多万的数据写入到redis中。

# 配置spark接口
import os
import findspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
os.environ["JAVA_HOME"] = "/usr/local/jdk1.8.0_192"
findspark.init("/usr/local/hadoop/spark-2.4.4-bin-hadoop2.6/")
# 设置配置信息
conf = SparkConf()
conf.set("spark.driver.memory", "16g")
conf.set("spark.executor.memory", "16g")
conf.set("spark.driver.maxResultSize","3g")
conf.set("spark.executor.maxResultSize", "3g")
conf.set("spark.ui.showConsoleProgress","false") # 取消进度条显示
spark = SparkSession.builder.appName("local_redis_spark").master("local[*]").enableHiveSupport().config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR") # 提升日志级别
import redis
# 初始化一个全局函数来获取Redis连接池
def get_redis_connection_pool():# 配置redis参数host='127.0.0.1' # 替换为redis的服务地址即可port=6379password='123456' # 密码db=1 # db库如果不设置 默认为0max_connections=10  # 设置最大连接数redis_pool = redis.ConnectionPool(host=host, port=port, db=db, password=password, max_connections=max_connections)  return redis_pool# 清空旧数据
with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.flushdb() # 清空当前库的所有数据 而flushall()则情况所有库数据
%%time
# 并行处理函数serv_id
def servid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.serv_id] = str((rw.r_inst_id, rw.avg_value))# 批量写入write_to_redis(dat)# 并行处理函数one_id
def oneid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.r_inst_id] = str((rw.offer_list,rw.filter_prod_offer_inst_list,rw.fuka_serv_offer_list,rw.filter_list,rw.new_serv_id))# 批量写入write_to_redis(dat)# 加载缓存数据
oneid_sdf = spark.sql("""select * from database.table1""")servid_sdf = spark.sql("""select * from database.table2""")# 设置分区数 如果批量写入的内存大小以及最大链接数有限制
# servid_num_parts = 50000
# oneid_num_parts = 10000 # 使用repartition方法进行重新分区
# servid_sdf_part = servid_sdf.repartition(servid_num_parts)
# oneid_sdf_part = oneid_sdf.repartition(oneid_num_parts)# 分批写入redis
servid_sdf.foreachPartition(servid_pfun)
print(f"servid字典缓存成功")
oneid_sdf.foreachPartition(oneid_pfun)
print(f"oneid字典缓存成功")
# 关闭spark
spark.stop() 
print(f"redis缓存插入成功")

执行时间可能跟资源环境有关,测试整个过程大概只需要5分钟左右,非常快速。

http://www.lryc.cn/news/304988.html

相关文章:

  • C# Socket的使用
  • Spring Cloud + Vue前后端分离-第17章 生产打包与发布
  • 力扣热题100_普通数组_56_合并区间
  • Springcloud OpenFeign 的实现(二)
  • [C++]智能指针用法
  • 六、行列式基本知识
  • 中断系统(详解与使用)
  • uniapp开发微信小程序跳转到另一个小程序中
  • chatGPT 使用随想
  • unity Aaimation Rigging使用多个约束导致部分约束失去作用
  • 什么是ChatGPT
  • 当我们浪费时我们在浪费什么
  • 一文搞懂TCP三次握手与四次挥手
  • FairyGUI × Cocos Creator 3.7.3 引入报错解决
  • 网络原理 - HTTP/HTTPS(5)
  • 设计模式——抽象工厂模式
  • 详解编译和链接!
  • 力扣226 翻转二叉树 Java版本
  • 免费的数据恢复软件哪个好?这10个数据恢复软件可以试试
  • 力扣2476二叉搜索树最近节点查询
  • 板块一 Servlet编程:第六节 HttpSession对象全解 来自【汤米尼克的JAVAEE全套教程专栏】
  • 后端设计PNR一点总结
  • BI 数据分析,数据库,Office,可视化,数据仓库
  • 汽车信息安全--S32K3的HSE如何与App Core通信(1)?
  • arcgisPro制图输出
  • 产品化Chatgpt所面临的五大技术挑战
  • 8.qt5使用opencv的库函数打开图片
  • 学习 python的第四天,顺便分享两首歌:we don‘ talk anymore,You ‘re Still The One
  • uniapp:APP端webview拦截H5页面跳转,华为市场发布需要限制webview的H5页面跳转
  • [HTML]Web前端开发技术28(HTML5、CSS3、JavaScript )JavaScript基础——喵喵画网页