当前位置: 首页 > news >正文

Flink实时电商数仓(八)

用户域登录各窗口汇总表

  • 主要任务:从kafka页面日志主题读取数据,统计
    • 七日回流用户:之前活跃的用户,有一段时间不活跃了,之后又开始活跃,称为回流用户
    • 当日独立用户数:同一个用户当天重复登录,只算作一个独立用户。

思路分析

  1. 读取kafka页面主题数据
  2. 转换数据结构:String -> JSONObject
  3. 过滤数据,uid不为null
    • 登录的两种情况
      • 用户打开应用后自动登录
      • 用户打印应用后没有登录,浏览后跳转到登录页面
    • 过滤条件:
      • uid不为null且last_page_id is null
      • last_page_id = login
  4. 设置水位线
  5. 按照uid分组
  6. 统计回流用户数和独立用户数
  7. 开窗聚合
  8. 写入doris

具体实现

  1. 设置端口、并行度、消费者组、kafka主题
  2. 读取dwd页面主题数据
    - stream.print()
  3. 对数据进行清洗过滤:uid不为空
    • stream.flatMap()使用flatMap过滤
    • new FlatMapFunction<>(){}在该方法内部转换为JSONObject, 并且获取uid和lastPageId, try-catch这段代码
    • 判断是否满足思路分析中的条件,如果中途发生异常,直接catch后打印到控制台清理掉即可。
  4. 先注册水位线
    • jsonObjStream.assignTimestampAndWatermark
    • new SerializableTimestampAssigner<>, 提取数据中的ts
  5. 按照uid分组
    • stream.keyby()按照uid进行分组
  6. 判断独立用户和回流用户
    • 创建UserLoginBean, 使用状态保存用户的登录信息
    • 在open方法中,getRuntimeContext().getState(new ValueStateDescriptor<>("last_login_dt",String.class))创建状态记录用户上一次的登录时间
    • processElement方法中比较当前登录的日期和状态存储的日期
      • 如果lastLoginDt==null是新用户
      • 如果不为空,判断上次登录时间和当前时间的差值是否大于7天;如果大于7天,说明是回流用户。
      • 如果小于7天,还需要判断上次登录时间是否是今天,如果不是今天,则说明该用户本次是独立用户。
  7. 开窗聚合
    • 使用滚动窗口开窗聚合
    • reduce算子中写聚合逻辑
    • process算子中获取窗口信息
  8. 写入doris
    • 创建doris sink,写出到doris

核心代码

public static void main(String[] args) {new DwsUserUserLoginWindow().start(10024,4,"dws_user_user_login_window", Constant.TOPIC_DWD_TRAFFIC_PAGE);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//1.读取dwd页面数据//stream.print();//2. 对数据进行清洗过滤SingleOutputStreamOperator<JSONObject> jsonObjStream = etl(stream);//3. 注册水位线SingleOutputStreamOperator<JSONObject> withWatermarkStream = addWatermark(jsonObjStream);//4. 按照uid分组KeyedStream<JSONObject, String> keyedStream = getKeyedStream(withWatermarkStream);//5. 判断独立用户和回流用户SingleOutputStreamOperator<UserLoginBean> processedStream = getUserLoginBeanStream(keyedStream);//processedStream.print();//开窗聚合SingleOutputStreamOperator<UserLoginBean> reducedStream = getReducedStream(processedStream);//reducedStream.print();//写入DorisreducedStream.map(new DorisMapFunction<>()).sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));}

[gitee仓库地址:(https://gitee.com/langpaian/gmall2023-realtime)

http://www.lryc.cn/news/271116.html

相关文章:

  • Python Pymysql实现数据存储
  • 软件测试/测试开发丨Python 常用第三方库 pymysql
  • 第二节 linux操作系统安装与配置
  • ChatGPT 对SEO的影响
  • 光伏逆变器MPPT的作用、原理及算法
  • 一文详解pyspark常用算子与API
  • 使用Rollup 搭建开发环境
  • ubuntu:beyond compare 4 This license key has been revoked 解决办法
  • 华为交换机生成树STP配置案例
  • Avalonia框架下实现热更新
  • 适用于各种危险区域的火焰识别摄像机,实时监测、火灾预防、安全监控,为安全保驾护航
  • react-router-dom5升级到6
  • Linux调试工具—gdb
  • SpringCloud(H版alibaba)框架开发教程之nacos做配置中心——附源码(2)
  • 网络摄像头爆破实战
  • 亚信安慧AntDB数据并行加载工具的实现(二)
  • 【Java进阶篇】JDK新版本中的新特性都有哪些
  • 力扣labuladong一刷day49天迪杰斯特拉
  • MCS接口技术----定时/计数,中断
  • Java开发框架和中间件面试题(10)
  • C++ 具名要求-基本概念-指定该类型对象可以从右值构造
  • Python如何把类当做字典来访问及浅谈Python类命名空间
  • 简述Redis备份策略以及对应的实现机制
  • 【5G PHY】5G 物理层加速卡介绍
  • lftp学习笔记
  • idea 插件开发之 HelloWorld
  • 极速文件搜索工具Everything结合内网穿透实现远程搜索本地文件
  • 【PowerMockito:编写单元测试过程中采用when打桩失效的问题】
  • [蓝桥杯 2018省赛]回家路费
  • 学生管理系统(vue + springboot)