当前位置: 首页 > news >正文

Flink实时电商数仓之DWS层

需求分析

  • 关键词
    在这里插入图片描述
  • 统计关键词出现的频率

IK分词

进行分词需要引入IK分词器,使用它时需要引入相关的依赖。它能够将搜索的关键字按照日常的使用习惯进行拆分。比如将苹果iphone 手机,拆分为苹果,iphone, 手机。

<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.17</artifactId>
</dependency><dependency><groupId>com.janeluo</groupId><artifactId>ikanalyzer</artifactId>
</dependency>

测试代码如下:

public class IkUtil {public static void main(String[] args) throws IOException {String s = "Apple 苹果15 5G手机";StringReader stringReader = new StringReader(s);IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);//第二个参数表示是否再对拆分后的单词再进行拆分,true时表示不在继续拆分Lexeme next = ikSegmenter.next();while (next!= null) {System.out.println(next.getLexemeText());next = ikSegmenter.next();}}
}

整体流程

  1. 创建自定义分词工具类IKUtil,IK是一个分词工具依赖
  2. 创建自定义函数类
  3. 注册函数
  4. 消费kafka DWD页面主题数据并设置水位线
  5. 从主流中过滤搜索行为
    • page[‘item’] is not null
    • item_type : “keyword”
    • last_page_id: “search”
  6. 使用分词函数对keyword进行拆分
  7. 对keyword进行分组开窗聚合
  8. 写出到doris
    • 创建doris sink
    • flink需要打开检查点才能将数据写出到doris

在这里插入图片描述

具体实现

import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;/*** title:** @Author 浪拍岸* @Create 28/12/2023 上午11:06* @Version 1.0*/
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {public static void main(String[] args) {new DwsTrafficSourceKeywordPageViewWindow().start(10021,4,"dws_traffic_source_keyword_page_view_window");}@Overridepublic void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {//1. 读取主流dwd页面主题数据tableEnv.executeSql("create table page_info(\n" +"    `common` map<string,string>,\n" +"    `page` map<string,string>,\n" +"    `ts` bigint,\n" +"    `row_time` as to_timestamp_ltz(ts,3),\n" +"     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));//测试是否获取到数据//tableEnv.executeSql("select * from page_info").print();//2. 筛选出关键字keywordsTable keywrodTable = tableEnv.sqlQuery("select\n" +"    page['item'] keywords,\n" +"    `row_time`,\n" +"    ts\n" +" from page_info\n" +" where page['last_page_id'] = 'search'\n" +" and page['item_type'] = 'keyword'\n" +" and page['item'] is not null");tableEnv.createTemporaryView("keywords_table", keywrodTable);// 测试是否获取到数据//tableEnv.executeSql("select * from keywords_table").print();//3. 自定义分词函数并注册tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class );//4. 调用分词函数对keywords进行拆分Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +" from keywords_table" +" left join lateral Table(kwSplit(keywords)) on true");tableEnv.createTemporaryView("split_kw_table", splitKwTable);//tableEnv.executeSql("select * from split_kw_table").print();//5. 对keyword进行分组开窗聚合Table windowAggTable = tableEnv.sqlQuery("select\n" +"    keyword,\n" +"    cast(tumble_start(row_time,interval '10' second ) as string) wStart,\n" +"    cast(tumble_end(row_time,interval '10' second ) as string) wEnd,\n" +"    cast(current_date as string)  cur_date,\n" +"    count(*) keyword_count\n" +"from split_kw_table\n" +"group by tumble(row_time, interval '10' second), keyword");//tableEnv.createTemporaryView("result_table",table);//tableEnv.executeSql("select keyword,keyword_count+1 from result_table").print();//6. 写出到doristableEnv.executeSql("create table doris_sink\n" +"(\n" +"    keyword                STRING,\n" +"    wStart                 STRING,\n" +"    wEnd                   STRING,\n" +"    cur_date               STRING,\n" +"    keyword_count          BIGINT\n" +")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));windowAggTable.insertInto("doris_sink").execute();}
}
http://www.lryc.cn/news/271427.html

相关文章:

  • MFC - CArchive/内存之间的序列化应用细节
  • C语言实验4:指针
  • 项目——————————
  • 【论文阅读】Realtime multi-person 2d pose estimation using part affinity fields
  • 图像分割实战-系列教程9:U2NET显著性检测实战1
  • RK3568平台 Android13 GKI架构开发方式
  • 阿里云服务器节省计划价格便宜_成本优化全解析
  • 3种依赖管理工具实现requirements.txt文件生成
  • 超图iClient3DforCesium地形、影像、模型、在线影像交互示例
  • 【解决】电脑上的WIFI图标不见了咋整?
  • 2 - 表结构 | MySQL键值
  • Redis(Linux版本7.2.3)
  • 八股文打卡day18——操作系统(1)
  • 设计模式—行为型模式之模板方法模式
  • 机器学习的分类与经典算法
  • 2.3物理层下面的传输媒体
  • 笙默考试管理系统-MyExamTest----codemirror(57)
  • Qt高质量的开源项目合集
  • HarmonyOS ARKUI深度解析:图像组件及权限配置实战指南
  • 萨姆·奥尔特曼的预言
  • iPhone 13 Pro 更换『移植电芯』和『超容电池』体验
  • JavaScript 常用事件演示
  • gzip引入后node_modules中.cache compression-webpack-plugin占用内存过多
  • Vue:使用IDEA开发Vue的相关配置
  • 黑马程序员SSM框架-SpringBoot
  • Javascript细节、经验锦集
  • git的使用基础教程
  • Springboot整合Elasticsearch 7.X 复杂查询
  • 第5课 使用openCV捕获摄像头并实现预览功能
  • Python3操作Json文件碰到的几个问题