
04_Americanas Lean Management Project: Data Warehouse Construction

Table of Contents

  • 04_Data Warehouse Construction
    • 1. Warehouse Design [Master]
    • 2. Data Preparation [Implement]
      • 2.1 Understanding the Data
      • 2.2 Importing into MySQL
        • (I) Data import
        • (II) Data type conversion
    • 3. Dimension Table Development [Master]
      • 3.1 Data Synchronization
        • 3.1.1 Data profiling
        • 3.1.2 Table design
        • 3.1.3 Data synchronization
      • 3.2 Slowly Changing Dimensions
        • (1) What is a slowly changing dimension?
        • (2) How does a zipper table handle it?
        • (3) Pain points of traditional zipper tables
        • (4) How to build a zipper table in StarRocks
        • (5) Creating all zipper tables
      • 3.3 Dimension Table Joins
    • 4. Sales Subject Development [Master]
      • 4.1 Requirements Analysis
      • 4.2 Development Approach
      • 4.3 Data Profiling
      • 4.4 Data Modeling
      • 4.5 ODS Development
        • 4.5.1 Table design
        • 4.5.2 Data synchronization
      • 4.6 DWD Development
        • 4.6.1 Requirement analysis
        • 4.6.2 Computation
        • 4.6.3 Payment detail table
      • 4.7 DWS Development
      • 4.8 ADS Development
    • 5. Member Subject Development [Master]
      • 5.1 Requirements Analysis
      • 5.2 Data Profiling
      • 5.3 Data Modeling
      • 5.4 ODS Development
        • 5.4.1 dim_register_t
        • 5.4.2 ods_member_logins_t
      • 5.5 DWD Development
        • 5.5.1 dim_register_mv: member registration materialized view
        • 5.5.2 dwd_member_order_mv: member order materialized view
        • 5.5.3 dwd_member_first_buy_t: member first-order table
        • 5.5.4 dwd_member_logins_t: member login table
      • 5.6 DWS Development
        • dws_member_city_analysis_day_t: daily city member analysis table
      • 5.7 ADS Development
        • ads_member_state_analysis_day_t: daily state member analysis table
    • 6. Kettle and Scheduling [Master]
      • 6.1 Kettle Overview
      • 6.2 Kettle Installation
      • 6.3 Scheduling
        • 6.3.1 Setting parameters
        • 6.3.2 Setting the scheduled job
        • 6.3.3 Setting the SeaTunnel task
        • 6.3.4 Setting the SQL task
        • 6.3.5 Setting the timer

04_Data Warehouse Construction

1. Warehouse Design [Master]

(1) Subject areas
[figure]

(2) Warehouse layering
[figure]

The DWD, DWS and DIM layers are collectively called the CDM layer, the common data model layer.

(3) Modeling

The warehouse is built on a snowflake schema.

2. Data Preparation [Implement]

2.1 Understanding the Data

[figure]

order_status: order status

① delivered ② invoiced ③ shipped ④ processing ⑤ unavailable (out of stock) ⑥ canceled ⑦ created ⑧ approved

payment_type: payment method

① credit_card ② boleto (a Brazilian payment method, similar to bill payment) ③ voucher ④ debit_card ⑤ not_defined
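
Once the data is in MySQL (see section 2.2), these value domains can be spot-checked against the dictionary with a quick profiling query over the renamed tables:

-- Profile the status and payment-type domains
SELECT order_status, COUNT(*) AS cnt
FROM orders
GROUP BY order_status
ORDER BY cnt DESC;

SELECT payment_type, COUNT(*) AS cnt
FROM payment
GROUP BY payment_type
ORDER BY cnt DESC;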

Table relationships

[figure]

2.2 Importing into MySQL

In real work this business data would live in a backend database such as MySQL or Oracle, not in CSV files. The data is provided as CSV here for the following reason.

Besides relational databases, you will sometimes need to process Excel or CSV files. There are two common approaches: process them with Python, or import them into MySQL (or another database) for downstream processing. Here we simulate the second approach so that students can master it.

In production there is usually more than one database; the count follows your business systems:

MySQL, PostgreSQL <- sales, membership (online mall, CRM, offline store systems)

SQL Server <- orders, warehouse, logistics (ERP system)

Oracle <- finance (finance system)

(I) Data import

In Navicat, create a new MySQL database and import the dataset.

Create the database:

CREATE DATABASE ame CHARACTER SET utf8mb4;

You can use Navicat's import wizard on the Tables node under the database:
[figure]

Choose the CSV files:

[figure]

Then add the files, click Next through the wizard, and finally click Start.

DataGrip works as well.
[figure]

After choosing the files, click Next and keep the defaults. One thing to watch: change columns inferred as text to varchar(255), otherwise those columns cannot be indexed later.
[figure]
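
If a column was already imported as TEXT, it can be converted afterwards; a minimal sketch (the table/column pair here is illustrative):

-- Shrink a TEXT column to VARCHAR(255) so it can be indexed
ALTER TABLE ame_customers_dataset MODIFY customer_id VARCHAR(255) NOT NULL;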

(II) Data type conversion

(1) Rename the tables

# The original table names are long; rename them for easier handling
RENAME TABLE ame_customers_dataset to customer;
RENAME TABLE ame_geolocation_dataset to geo;
RENAME TABLE ame_orders_dataset to orders;
RENAME TABLE ame_order_items_dataset to item;
RENAME TABLE ame_order_payments_dataset to payment;
RENAME TABLE ame_order_reviews_dataset to review;
RENAME TABLE ame_products_dataset to product;
RENAME TABLE ame_sellers_dataset to seller;
RENAME TABLE product_category_name_translation to category;

(2) Add table comments

# Add comments
ALTER TABLE customer COMMENT '客户信息表';
ALTER TABLE geo COMMENT '邮政编码信息表';
ALTER TABLE orders COMMENT '订单表';
ALTER TABLE item COMMENT '订单详情数据表';
ALTER TABLE payment COMMENT '订单付款数据表';
ALTER TABLE review COMMENT '客户评论数据表';
ALTER TABLE product COMMENT '商品信息表';
ALTER TABLE seller COMMENT '商家信息表';
ALTER TABLE category COMMENT '商品类别名翻译表';

(3) Convert data types

Why convert data types: mainly for downstream convenience. Once columns are proper DATE, DATETIME, DECIMAL types and so on, date and numeric functions work directly on them.

How to decide the target types: follow the data dictionary if there is one; otherwise infer from each column's meaning and its actual values.
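
When there is no dictionary, a quick range and length check helps pick a safe type; a minimal sketch:

-- Check value ranges before fixing DECIMAL precision or VARCHAR length
SELECT MIN(price), MAX(price), MIN(freight_value), MAX(freight_value) FROM item;
SELECT MAX(CHAR_LENGTH(order_status)) FROM orders;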

# Change column types in orders (timestamps)
ALTER TABLE orders MODIFY order_purchase_timestamp DATETIME;
ALTER TABLE orders MODIFY order_approved_at DATETIME;
ALTER TABLE orders MODIFY order_delivered_carrier_date DATETIME;
ALTER TABLE orders MODIFY order_delivered_customer_date DATETIME;
ALTER TABLE orders MODIFY order_estimated_delivery_date DATETIME;

# Change column types in customer
ALTER TABLE customer MODIFY customer_zip_code_prefix INT;
ALTER TABLE customer MODIFY order_purchase_timestamp DATETIME;

# Change column types in item
ALTER TABLE item MODIFY order_item_id INT;
ALTER TABLE item MODIFY price DECIMAL(6,2);
ALTER TABLE item MODIFY freight_value DECIMAL(6,2);
ALTER TABLE item MODIFY shipping_limit_date DATETIME;
ALTER TABLE item MODIFY order_purchase_timestamp DATETIME;

# Change column types in payment
ALTER TABLE payment MODIFY payment_sequential INT;
ALTER TABLE payment MODIFY payment_installments INT;
ALTER TABLE payment MODIFY payment_value DECIMAL(10,2);
ALTER TABLE payment MODIFY order_purchase_timestamp DATETIME;

# Change column types in review
ALTER TABLE review MODIFY review_score INT;
ALTER TABLE review MODIFY review_creation_date DATE;
ALTER TABLE review MODIFY review_answer_timestamp DATETIME;

# Change column types in product
ALTER TABLE product MODIFY product_name_lenght INT;
ALTER TABLE product MODIFY product_description_lenght INT;
ALTER TABLE product MODIFY product_photos_qty INT;
ALTER TABLE product MODIFY product_weight_g INT;
ALTER TABLE product MODIFY product_length_cm INT;
ALTER TABLE product MODIFY product_height_cm INT;
ALTER TABLE product MODIFY product_width_cm INT;
ALTER TABLE product MODIFY create_time DATETIME;
ALTER TABLE product MODIFY update_time DATETIME;

# Change column types in category
ALTER TABLE category MODIFY create_time DATETIME;
ALTER TABLE category MODIFY update_time DATETIME;

# Change column types in seller
ALTER TABLE seller MODIFY seller_zip_code_prefix INT;
ALTER TABLE seller MODIFY create_time DATETIME;
ALTER TABLE seller MODIFY update_time DATETIME;

# Change column types in geo
ALTER TABLE geo MODIFY geolocation_zip_code_prefix INT;
ALTER TABLE geo MODIFY geolocation_lat DECIMAL(10,6);
ALTER TABLE geo MODIFY geolocation_lng DECIMAL(10,6);

(4) Null handling

Counting nulls per column with SQL:

SELECT
    SUM(CASE WHEN `review_id` IS NULL THEN 1 ELSE 0 END) AS `review_id_null_count`,
    SUM(CASE WHEN `order_id` IS NULL THEN 1 ELSE 0 END) AS `order_id_null_count`,
    SUM(CASE WHEN `review_score` IS NULL THEN 1 ELSE 0 END) AS `review_score_null_count`,
    SUM(CASE WHEN `review_comment_title` IS NULL THEN 1 ELSE 0 END) AS `review_comment_title_null_count`,
    SUM(CASE WHEN `review_comment_message` IS NULL THEN 1 ELSE 0 END) AS `review_comment_message_null_count`,
    SUM(CASE WHEN `review_creation_date` IS NULL THEN 1 ELSE 0 END) AS `review_creation_date_null_count`,
    SUM(CASE WHEN `review_answer_timestamp` IS NULL THEN 1 ELSE 0 END) AS `review_answer_timestamp_null_count`
FROM `review`;

Filling the nulls:

UPDATE review
SET review_comment_title = 0
WHERE review_comment_title IS NULL;

UPDATE review
SET review_comment_message = 0
WHERE review_comment_message IS NULL;

Ways to handle nulls:

1) Fill with a default value

2) Fill with the mean

3) Fill with the mode

4) Drop the column

5) Fill from neighboring rows (time-series data)

Note: handle nulls on the fields that matter. If you can identify up front which fields need treatment, handle them right away; more often, though, a field is analyzed and cleaned only when it is actually used, and different fields call for different treatments such as a specific default, the mean, or deletion. An average-fill example follows below.
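
For instance, filling a numeric column with its average. MySQL does not allow the target table directly in the subquery of an UPDATE, so a derived table is joined in; the column choice here is illustrative:

-- Fill NULL weights with the rounded column average (illustrative sketch)
UPDATE product p
JOIN (SELECT ROUND(AVG(product_weight_g)) AS avg_w FROM product) t
SET p.product_weight_g = t.avg_w
WHERE p.product_weight_g IS NULL;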

(5) Checking for duplicates

Why this matters: 1) to assess data quality, since duplicated data is unusable; 2) to determine each table's primary key.

How to check: if there is a data dictionary, verify against it.

If there is none, explore based on the meaning of the table and its fields until the primary key is identified.

# orders
select order_id from orders
group by order_id
having count(*) > 1;

# item
select order_id, order_item_id from item
group by order_id, order_item_id
having count(*) > 1;

# product
select product_id from product
group by product_id
having count(*) > 1;

# category
select product_category_name from category
group by product_category_name
having count(*) > 1;

# seller
select seller_id from seller
group by seller_id
having count(*) > 1;

# review
# review_id is not unique
select review_id
from review
group by review_id
having count(*) > 1;
select * from review where review_id = '28642ce6250b94cc72bc85960aec6c62';

# order_id is not unique either
select order_id
from review
group by order_id
having count(*) > 1;
select * from review where order_id = '0035246a40f520710769010f752e7507';

# use (review_id, order_id)
select review_id, order_id
from review
group by review_id, order_id
having count(*) > 1;

# customer
select customer_id from customer
group by customer_id
having count(*) > 1;

# payment
# order_id is not unique
select order_id from payment
group by order_id
having count(*) > 1;
select * from payment where order_id = '5cfd514482e22bc992e7693f0e3e8df7';

# use (order_id, payment_sequential)
select order_id, payment_sequential from payment
group by order_id, payment_sequential
having count(*) > 1;

# geo
# geolocation_zip_code_prefix has duplicates and cannot be used directly
select geolocation_zip_code_prefix from geo
group by geolocation_zip_code_prefix
having count(*) > 1;

Important extension: what to do when duplicates exist.

Suppose the orders table in MySQL contains duplicate records for the same order_id, perhaps 2 or 3 rows each, and only the row with the largest update_time should be kept.

With a small table, the following works (the derived-table wrapper is needed because MySQL forbids selecting from the table being deleted from):

DELETE FROM orders
WHERE (order_id, update_time) NOT IN (
    SELECT * FROM (
        SELECT order_id, MAX(update_time) AS update_time
        FROM orders
        GROUP BY order_id
    ) t
);

With a large table that statement is very slow and may not run at all. A more efficient method:

1) Create a temporary table holding, for each duplicated order_id, the row to keep:

CREATE TEMPORARY TABLE temp_max_orders AS
SELECT order_id, MAX(update_time) AS update_time
FROM orders
GROUP BY order_id
HAVING COUNT(1) > 1;

2) Index the temporary table:

CREATE INDEX idx_order_id ON temp_max_orders(order_id);

3) Delete the duplicated rows from the original table:

DELETE o1
FROM orders o1
JOIN temp_max_orders o2
  ON o1.order_id = o2.order_id AND o1.update_time < o2.update_time;

On MySQL 8, a window function can be used instead.

1) Create a result table:

CREATE TABLE orders_temp AS
SELECT order_id, create_time, xxx
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY update_time DESC) AS rn
    FROM orders
) AS subquery
WHERE rn = 1;

2) Drop the original table:

DROP TABLE orders;

3) Rename the result table:

RENAME TABLE orders_temp TO orders;

Here the geo table does need this treatment.

Since geo has neither a primary key nor a timestamp, deduplicating it is awkward. To make it manageable, add a surrogate primary key first:

ALTER TABLE geo
ADD COLUMN id INT AUTO_INCREMENT FIRST,
ADD PRIMARY KEY (id);

Then, for rows sharing a geolocation_zip_code_prefix, keep only the one with the largest id:

DELETE FROM geo
WHERE id NOT IN (
    SELECT id FROM (
        SELECT MAX(id) AS id FROM geo GROUP BY geolocation_zip_code_prefix
    ) t
);

(6) Adding indexes

Purpose: faster queries.

Which columns to index: largely a matter of experience. Add the primary keys first, then ordinary indexes on the most frequently used columns; the rest can be added later as they are needed.

# Make order_id the primary key of orders
ALTER TABLE orders ADD PRIMARY KEY (order_id);
-- index on orders.customer_id
CREATE INDEX idx_customer_id ON orders(customer_id);

# Make (order_id, order_item_id) the primary key of item
ALTER TABLE item ADD PRIMARY KEY (order_id, order_item_id);
-- index on item.product_id
CREATE INDEX idx_product_id ON item(product_id);
-- index on item.seller_id
CREATE INDEX idx_seller_id ON item(seller_id);

# Make product_id the primary key of product
ALTER TABLE product ADD PRIMARY KEY (product_id);
-- index on product.product_category_name
CREATE INDEX idx_product_category ON product(product_category_name);

# Make product_category_name the primary key of category
ALTER TABLE category ADD PRIMARY KEY (product_category_name);

# Make seller_id the primary key of seller
ALTER TABLE seller ADD PRIMARY KEY (seller_id);

# Make (review_id, order_id) the primary key of review
ALTER TABLE review ADD PRIMARY KEY (review_id, order_id);
-- index on review.order_id
CREATE INDEX idx_order_id ON review(order_id);

# Make customer_id the primary key of customer
ALTER TABLE customer ADD PRIMARY KEY (customer_id);

# Make (order_id, payment_sequential) the primary key of payment
ALTER TABLE payment ADD PRIMARY KEY (order_id, payment_sequential);

3. Dimension Table Development [Master]

Warehouse development flow: analyze the requirements roughly, identify which tables are involved and which dimension tables they need, then prepare those dimension tables first. That is why dimension tables are built before anything else.

Note that this step is not one-off: later development may reveal a missing dimension that needs to be built, and an existing dimension may need extra fields or changed logic. Dimension development is an iterative process!

3.1 Data Synchronization

3.1.1 Data profiling

Simply look at which tables are used, what they mean, what fields they have, and what processing they need.

(1) product

select
    count(1) as num,
    count(distinct product_id) as product_num
from product;

Also, since item will be joined to product, check whether every product referenced in item exists in product; any that do not would need handling.

select i.*
from item i
left join product p on i.product_id = p.product_id
where p.product_id is null;

Conclusion: the result is empty, i.e. every item row has a match in product.

(2) category

select
    count(1) as num,
    count(distinct product_category_name) as category_num
from category;

Check whether category covers everything:

select count(1) c
from product p
left join category c on p.product_category_name = c.product_category_name
where c.product_category_name is null;

623 rows fail to match; investigate why.

Some rows have a NULL product_category_name and need filling. In a company you would ask the business side to supply the correct category names; here we simply fill other_category when cleaning the product table.

For non-NULL names that still fail to match, the business side would normally enter the correct rows into category; here we just add the data ourselves when cleaning the category table.

First list the missing values:

select distinct p.product_category_name
from product p
left join category c on p.product_category_name = c.product_category_name
where c.product_category_name is null;

[figure]

(3) seller

Profiled the same way as product.

(4) customer

select
    count(1) as num,
    count(distinct customer_id) as customer_num,
    count(distinct customer_unique_id) as customer_unique_num
from customer;

Conclusion: 99,441 records and 99,441 customer_id values, but only 96,096 unique customers, so some customers hold two or more customer_id values.

Next verify that those 99,441 customer rows correspond one-to-one with the 99,441 orders:

select count(1) ct
from orders o
inner join customer c on o.customer_id = c.customer_id;

The result is 99,441, confirming the one-to-one mapping. In other words, customer_id merely links orders to customer; the real user id is customer_unique_id.

So customer is not actually a dimension table; it is a bridge table between orders and the user table.

(5) geo

Given geo's poor quality, and since seller and customer already carry the key city and state information, we can simply skip cleaning and using geo.

Summary: three dimension tables need synchronizing: product (clean product_category_name), category (add some extra rows), and seller (no extra processing).

3.1.2 Table design

Because slowly changing dimensions will be handled later, historical dimension states must be preserved, i.e. pre-change rows have to be kept.

A Duplicate Key (detail) model would do that.

But the Duplicate Key model has a problem: if a task restart re-imports the same data, the duplicates are all stored, which is unacceptable, so it is out.

That leaves the Unique Key and Primary Key models. To retain pre-update rows, update_time is combined with the original primary key as the new key, so old and new rows coexist in StarRocks. Choosing between the two models comes down to workload: dimension tables change rarely but are queried often (write-light, read-heavy), so the Primary Key model is used.

(1) Create the database

CREATE DATABASE IF NOT EXISTS dim;

(2) Create the tables

CREATE TABLE dim.dim_product_t (
    `product_id` varchar(255) NOT NULL,
    `update_time` datetime NOT NULL,
    `product_category_name` varchar(255) DEFAULT NULL,
    `product_name_lenght` int(11) DEFAULT NULL,
    `product_description_lenght` int(11) DEFAULT NULL,
    `product_photos_qty` int(11) DEFAULT NULL,
    `product_weight_g` int(11) DEFAULT NULL,
    `product_length_cm` int(11) DEFAULT NULL,
    `product_height_cm` int(11) DEFAULT NULL,
    `product_width_cm` int(11) DEFAULT NULL,
    `create_time` datetime DEFAULT NULL
) ENGINE=OLAP
PRIMARY KEY(`product_id`, `update_time`)
COMMENT "商品信息表"
DISTRIBUTED BY HASH(`product_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);

CREATE TABLE dim.dim_seller_t (
    `seller_id` varchar(255) NOT NULL,
    `update_time` datetime NOT NULL,
    `seller_zip_code_prefix` int(11) DEFAULT NULL,
    `seller_city` varchar(255) DEFAULT NULL,
    `seller_state` varchar(255) DEFAULT NULL,
    `create_time` datetime DEFAULT NULL
) ENGINE=OLAP
PRIMARY KEY(`seller_id`, `update_time`)
COMMENT "商家信息表"
DISTRIBUTED BY HASH(`seller_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);
3.1.3 Data synchronization

Create a dim.config file under /export/server/apache-seatunnel-2.3.5/config/job:

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    table-names = ["ame.product"]
    table-names-config = [{ table = "ame.product", primaryKeys = ["product_id"] }]
    result_table_name = "table1"
  }
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    table-names = ["ame.category"]
    table-names-config = [{ table = "ame.category", primaryKeys = ["product_category_name"] }]
    result_table_name = "table2"
  }
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    table-names = ["ame.seller"]
    table-names-config = [{ table = "ame.seller", primaryKeys = ["seller_id"] }]
    result_table_name = "table3"
  }
}

sink {
  StarRocks {
    source_table_name = "table1"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "dim", table = "dim_product_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = false
  }
  StarRocks {
    source_table_name = "table2"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "dim", table = "dim_category_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = false
  }
  StarRocks {
    source_table_name = "table3"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "dim", table = "dim_seller_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = false
  }
}

Run:

cd /export/server/apache-seatunnel-2.3.5/
./bin/seatunnel.sh --config ./config/job/dim.config -e local

Notes: (1) the script holds multiple sources and sinks; just write several blocks. Because several tables are synchronized into several targets, each source sets result_table_name, which each sink then references via source_table_name.

(2) There is no transform block because no transformation is needed here. In real work, simple transformations can be expressed as SQL inside a transform block.

(3) Note: to keep every change record, enable_upsert_delete must be set to false; otherwise StarRocks would retain only the post-update row.
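
Once a source update flows through, the same business key shows several rows keyed by update_time; this can be confirmed directly (the product id is the one used in the test in section 3.2):

select product_id, update_time, product_category_name
from dim.dim_product_t
where product_id = '0036bb031e69d915cd384d1b3838b5d3'
order by update_time;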

3.2 Slowly Changing Dimensions

(1) What is a slowly changing dimension?

Dimension tables change slowly over time. The hard part is preserving the historical dimension states so that past versions can still be queried.

(2) How does a zipper table handle it?

[figure]

(3) Pain points of traditional zipper tables

A traditional zipper table is rebuilt every day. It solves the slowly-changing-dimension problem, but it has these pain points:

(1) It is costly to build and maintain, and error-prone.

(2) If the source table has no last-update time, extracting the rows changed that day is very hard.

(3) It cannot handle a row updated more than once in the same day.

(4) How to build a zipper table in StarRocks

1) Use the lead() window function to compute each row's end_time.

2) Use a materialized view so the data refreshes automatically.

Example:

CREATE MATERIALIZED VIEW IF NOT EXISTS dim.`dim_product_mv`
DISTRIBUTED BY HASH(`product_id`)
REFRESH ASYNC
as
select
    product_id,
    update_time,
    ifnull(product_category_name, 'other_category') as product_category_name,
    product_name_lenght,
    product_description_lenght,
    product_photos_qty,
    product_weight_g,
    product_length_cm,
    product_height_cm,
    product_width_cm,
    create_time,
    update_time as start_time,
    LEAD(update_time, 1, '9999-12-31 00:00:00') OVER (PARTITION BY product_id ORDER BY update_time) as end_time
from `dim`.`dim_product_t`;

Test.

Update in MySQL:

update product
set product_description_lenght = 869, update_time = '2024-09-01 10:14:19'
where product_id = '0036bb031e69d915cd384d1b3838b5d3';

Query in StarRocks:

select * from dim.`dim_product_mv`
where product_id = '0036bb031e69d915cd384d1b3838b5d3';

[figure]
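
With start_time/end_time in place, the dimension state as of any point in time can be fetched directly; a typical point-in-time lookup:

-- State of the product dimension as of 2018-05-01
select *
from dim.`dim_product_mv`
where product_id = '0036bb031e69d915cd384d1b3838b5d3'
  and '2018-05-01 00:00:00' >= start_time
  and '2018-05-01 00:00:00' < end_time;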

(5) Creating all zipper tables

Now create the other two materialized views.

Note that when dim.dim_category_mv appends its extra rows, it must first check whether the source table already contains them; if so, they must not be added again. The pattern is:

union all
select 'portateis_cozinha_e_preparadores_de_alimentos', '2016-09-01 11:15:19',
       'portable_kitchen_appliances_and_food_preparation_devices', '2016-09-01 11:15:19',
       '2016-09-01 11:15:19', '9999-12-31 00:00:00'
where not exists (
    select 1 from `dim`.`dim_category_t`
    where product_category_name = 'portateis_cozinha_e_preparadores_de_alimentos'
)

Full code:

CREATE MATERIALIZED VIEW IF NOT EXISTS dim.`dim_category_mv`
DISTRIBUTED BY HASH(`product_category_name`)
REFRESH ASYNC
as
select
    product_category_name,
    update_time,
    product_category_name_english,
    create_time,
    update_time as start_time,
    LEAD(update_time, 1, '9999-12-31 00:00:00') OVER (PARTITION BY product_category_name ORDER BY update_time) as end_time
from `dim`.`dim_category_t`
union all
select 'portateis_cozinha_e_preparadores_de_alimentos', '2016-09-01 11:15:19',
       'portable_kitchen_appliances_and_food_preparation_devices', '2016-09-01 11:15:19',
       '2016-09-01 11:15:19', '9999-12-31 00:00:00'
where not exists (
    select 1 from `dim`.`dim_category_t`
    where product_category_name = 'portateis_cozinha_e_preparadores_de_alimentos'
)
union all
select 'pc_gamer', '2016-09-01 11:15:19',
       'pc_gamer', '2016-09-01 11:15:19',
       '2016-09-01 11:15:19', '9999-12-31 00:00:00'
where not exists (
    select 1 from `dim`.`dim_category_t`
    where product_category_name = 'pc_gamer'
)
union all
select 'other_category', '2016-09-01 11:15:19',
       'other_category', '2016-09-01 11:15:19',
       '2016-09-01 11:15:19', '9999-12-31 00:00:00'
where not exists (
    select 1 from `dim`.`dim_category_t`
    where product_category_name = 'other_category'
);

CREATE MATERIALIZED VIEW IF NOT EXISTS dim.`dim_seller_mv`
DISTRIBUTED BY HASH(`seller_id`)
REFRESH ASYNC
as
select
    seller_id,
    update_time,
    seller_zip_code_prefix,
    seller_city,
    seller_state,
    create_time,
    update_time as start_time,
    LEAD(update_time, 1, '9999-12-31 00:00:00') OVER (PARTITION BY seller_id ORDER BY update_time) as end_time
from `dim`.`dim_seller_t`;

3.3 Dimension Table Joins

To use a star schema we would have to join the dimension tables together into one big dimension. But in StarRocks both dimensions are zipper tables: one business key may have several rows, and the validity periods of the two tables do not line up, so a correct join is impossible.

So a star schema is off the table; we use a snowflake schema and let the fact table join each dimension individually. The join pattern is sketched below.

[figure]
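
Each fact-to-dimension join carries the zipper-table time-range condition. This is the pattern used throughout the DWD layer later; the fact table here is the ODS table created in section 4.5:

-- Zipper-table join: match the dimension row valid at purchase time
select f.order_id, d.product_category_name
from fact.ods_sale_item_t f
left join dim.dim_product_mv d
  on f.product_id = d.product_id
 and f.order_purchase_timestamp >= d.start_time
 and f.order_purchase_timestamp <  d.end_time;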

4. Sales Subject Development [Master]

4.1 Requirements Analysis

Requirement: analyze sales by product and by category.

Main metrics: order count, units sold, product amount, freight amount, other amount, payment amount, product cost, freight cost, total cost, gross profit, net profit, canceled order count, canceled units, canceled product amount.

Where: order count is the number of order numbers; total cost = product cost + freight cost; gross profit = product amount - product cost; net profit = payment amount - total cost. These are expressed as SQL below.
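
The same expressions appear later in the DWS layer (section 4.7); the query below is runnable once the detail wide table from section 4.6 exists:

select round(sum(product_cost) + sum(freight_cost), 2) as total_cost,
       round(sum(price) - sum(product_cost), 2) as gross_profit,
       round(sum(payment_value) - sum(product_cost) - sum(freight_cost), 2) as net_profit
from fact.dwd_sale_order_dtl_mv;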

Dimensions: time, region, category.

Granularities: time (day), region (city, state), category (product, category).

Implementation: pseudo-real-time (periodic refresh).

4.2 Development Approach

(1) Requirement development workflow (company-wide)
[figure]

(2) Requirement analysis and development (engineers)
[figure]
4.3 数据探查

(1)orders表

select count(1) num,max(order_purchase_timestamp) as max_time,min(order_purchase_timestamp) as min_time,count(distinct order_id) as order_num
from orders;

结论:共99441条记录,99441单,最小支付时间2016-09-04 21:15:19,最大支付时间2018-10-17 17:30:18

(2)item表

select count(1) num,count(distinct order_id) as order_num
from item;

结论:共112650条记录,98666单,存在一单多条商品。并且有的订单没有明细。

selectcount(distinct o.order_id) as num
from orders o
inner join item i
on o.order_id = i.order_id;

结论:结果是98666,也就是说98666单都在orders表中。同时说明订单表中有775单没有商品明细。

理论上讲,有一单就应该有一单明细数据。出现这种情况,需要高度注意。首先去查看一下缺失的775单是什么情况。

select o.*
from orders o
left join item i
on o.order_id = i.order_id
where i.order_id is null;

如果从这775单中可以查到一些共性规律,则这种情况可能是正常情况,比如取消的订单没有明细。但是从这775单中没有发现这样的情况。这说明确实是一种异常情况。实际工作中,要去查看数据抽取过程中是否有问题,如果数据抽取没问题,那就要去看数据源头为什么会出现问题,然后让后端工程师进行修复。

所以这里需要注意,两表在关联时,需要使用orders作为主表,然后对关联不上的item表进行处理

Because item is missing rows, the unmatched orders records need a default product, and that product must also exist in product. Since we cannot modify the source table (only backend engineers have that permission), we can only adjust our own warehouse table, i.e. dim_product_mv:

DROP MATERIALIZED VIEW IF EXISTS dim.`dim_product_mv`;

CREATE MATERIALIZED VIEW IF NOT EXISTS dim.`dim_product_mv`
DISTRIBUTED BY HASH(`product_id`)
REFRESH ASYNC
as
select
    product_id,
    update_time,
    ifnull(product_category_name, 'other_category') as product_category_name,
    product_name_lenght,
    product_description_lenght,
    product_photos_qty,
    product_weight_g,
    product_length_cm,
    product_height_cm,
    product_width_cm,
    create_time,
    update_time as start_time,
    LEAD(update_time, 1, '9999-12-31 00:00:00') OVER (PARTITION BY product_id ORDER BY update_time) as end_time
from `dim`.`dim_product_t`
union all
select 'other_product', '2016-09-01 10:15:19',
       'other_category', 13, 1, 1, 1, 1, 1, 1, '2016-09-01 11:15:19',
       '2016-09-01 11:15:19', '9999-12-31 00:00:00'
where not exists (
    select 1 from `dim`.`dim_product_t`
    where product_id = 'other_product'
);

(3) payment

select
    count(1) as num,
    count(distinct order_id) as customer_num
from payment;

Conclusion: 103,886 records over 99,440 orders, so some orders are paid in several installments, distinguished by payment_sequential. Also note this is one order fewer than in orders.

Verify that the remaining 99,440 orders are all in orders:

select o.*
from orders o
left join payment p on o.order_id = p.order_id
where p.order_id is null;

Only one row fails to match: order bfbd0f9bdef84302105ad712db648a6c is absent from payment.

In real work, check synchronization first; if that is fine, check the source data, and if the source is wrong, report it to the backend engineers for a fix.

Assume that after being notified they inserted a record into payment (143.46 being the sum derived from item):

-- run in MySQL
insert into payment values ('bfbd0f9bdef84302105ad712db648a6c', 1, 'credit_card', 2, 143.46, '2016-09-15 12:16:38');

Now every order has a payment. Next, verify whether product amount plus freight equals the amount paid:

-- 15846423.63
select sum(pm.payment_value) from payment pm
inner join (select distinct order_id from item) i on pm.order_id = i.order_id;

-- 15843553.24
select sum(price) + sum(freight_value) as total from item;

Result: the payment side is 2,870.39 higher.

Inspecting the detail shows some orders pay more than their line totals (possibly extra fees) and some pay less (possibly discounts). In practice, ask the business side or the backend engineers whether another table records these amounts, such as a coupon or packaging-fee table.

select t1.order_id, t1.payment_value, t2.total
from (select order_id, sum(payment_value) as payment_value from payment group by order_id) t1
inner join (select order_id, sum(price) + sum(freight_value) as total from item group by order_id) t2
    on t1.order_id = t2.order_id
where t2.total != t1.payment_value;

If that query is too slow, it can be optimized with temporary tables plus indexes:

create temporary table temp1 as
select order_id, sum(payment_value) as payment_value from payment group by order_id;
create index idx_order_id on temp1(order_id);

create temporary table temp2 as
select order_id, sum(price) + sum(freight_value) as total from item group by order_id;
create index idx_order_id on temp2(order_id);

select t1.order_id, t1.payment_value, t2.total
from temp1 t1
inner join temp2 t2
    on t1.order_id = t2.order_id
where t1.payment_value != t2.total;

Asking the backend engineers reveals another table, other_amount, which reconciles the difference between payment and item. Its fields are order_id, order_purchase_timestamp and other_amount.

The amount identity is: product amount + freight + other amount = payment amount.

Import other_amount into MySQL and analyze it. Either run other_amount.sql from the dataset directly, or log into MySQL on Linux and run:

use ame;
source /export/data/mysql/other_amount.sql

Then analyze:

select
    count(1) as num,
    count(distinct order_id) as order_num
from other_amount;

Conclusion: 576 records, 576 orders. Apparently only the orders with a discrepancy are recorded; verify that next.

How many orders have unequal amounts?

select count(1) as num
from temp1 t1
inner join temp2 t2
    on t1.order_id = t2.order_id
where t1.payment_value != t2.total;

Conclusion: exactly 576, a perfect match. So when the order table is built later, other_amount must be joined in to reconcile the amount differences.

(4) orders_cost

The requirements involve cost, so ask the backend engineers or the product manager where cost lives. The answer: order cost comes from the orders_cost table.

Import orders_cost into MySQL and analyze it. Either run orders_cost.sql from the dataset directly, or log into MySQL on Linux and run:

use ame;
source /export/data/mysql/orders_cost.sql

Then analyze:

select
    count(1) as num,
    count(distinct order_id) order_num
from orders_cost;

Conclusion: 113,425 records, 99,441 orders.

Next, the relationship between orders_cost and orders:

select
    count(distinct o.order_id) as num
from orders o
inner join orders_cost i
    on o.order_id = i.order_id;

Conclusion: 99,441, the same as orders.

Next, the relationship between orders_cost and item:

select
    count(1) as num
from item i
left join orders_cost c
    on i.order_id = c.order_id and i.order_item_id = c.order_item_id
where c.order_id is null;

Conclusion: 0, so every item row exists in orders_cost; orders_cost is the more complete table.

(5) customer

customer has 99,441 records, corresponding one-to-one with the 99,441 orders. customer_id merely links orders to customer; the real user id is customer_unique_id.

So customer is not a dimension table, just a bridge table between orders and the user table.

(6) review

select
    count(1) as num,
    count(distinct review_id) as review_num,
    count(distinct order_id) as order_num
from review;

Conclusion: 99,224 records covering 98,410 reviews and 98,673 orders. One review can cover several orders; some orders have no review, and some have several (99,224 records against only 98,673 orders).

Verify those 98,673 orders are all in orders:

select r.* from review r
left join orders o on r.order_id = o.order_id
where o.order_id is null;

The result is empty: every reviewed order is in orders, some orders simply have no review. So orders drives the join, and unreviewed orders can be given a default score.

4.4 Data Modeling

Logical model:
[figure]

If the diagram above is hard to read, see this one instead.
[figure]

We can build one order-detail wide table integrating orders, item, customer, payment, review, orders_cost, other_amount, product, category and seller. orders is 1-to-many with item, payment and review, so they cannot all be joined directly or the fan-out would produce Cartesian-product style errors. The correct approach: the orders-to-item 1-to-many relationship must be preserved, since the product information hangs off item; payment and review, by contrast, are first aggregated by order_id down to one row per order, turning 1-to-many into 1-to-1 (this step is sketched below). The join is then safe, but keep in mind that payment and other_amount are order-level amounts with no product meaning, so after joining to item watch out for ambiguous metrics.
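
The "aggregate first, then join" step, using the ODS tables created in section 4.5 (the same subquery appears in the DWD view in 4.6):

-- One row per order: payment count and total, then a safe 1:1 join
select o.order_id, pm.payment_times, pm.payment_value
from fact.ods_sale_orders_t o
inner join (
    select order_id, count(1) as payment_times, sum(payment_value) as payment_value
    from fact.ods_sale_payment_t
    group by order_id
) pm on o.order_id = pm.order_id;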

If needed, a payment wide table can be built from orders, payment, customer and review: the detail wide table aggregates payment away and loses some of its columns, so deeper payment analysis wants a dedicated wide table, with customer and review folded in for multi-angle analysis.

Likewise, a review wide table over orders, customer and review can keep every review column for deeper analysis when needed.

Physical model:
[figure]

Note: this modeling exercise shows that the detail wide table is the hardest part. So on a résumé, the first few projects can cover only ADS- or DWS-layer work, with later projects describing full subject development across ODS/DWD/DWM/DWS/ADS.

4.5 ODS Development

4.5.1 Table design

(1) Design decisions

1) Partitioned storage (fact tables are large, and we want partition-level refresh).

2) Update by primary key (unlike dimension tables, there is no need to keep the full change history; only the latest correct row matters, so update_time is not added to the key).

3) Unique Key model (fact tables change constantly, a write-heavy read-light workload, so the Unique Key model fits).

(2) Building the partition column

1) Choosing it: once a row is created, its partition must never change, so an update time will not do; the best choice is the row's creation time.

2) Computing it: partitioning happens at sync time, so the value must be computed during synchronization, i.e. inside SeaTunnel's transform block.
[figure]

(3) DDL

CREATE TABLE fact.`ods_sale_orders_t` (
    `order_id` varchar(255) NOT NULL,
    `dt` date NOT NULL COMMENT "使用order_purchase_timestamp作为分区时间",
    `customer_id` varchar(255) DEFAULT NULL,
    `order_status` varchar(255) DEFAULT NULL,
    `order_purchase_timestamp` datetime DEFAULT NULL,
    `order_approved_at` datetime DEFAULT NULL,
    `order_delivered_carrier_date` datetime DEFAULT NULL,
    `order_delivered_customer_date` datetime DEFAULT NULL,
    `order_estimated_delivery_date` datetime DEFAULT NULL
) ENGINE=OLAP
UNIQUE KEY(`order_id`, `dt`)
COMMENT "订单表"
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`order_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);

CREATE TABLE fact.`ods_sale_item_t` (
    `order_id` varchar(255) NOT NULL,
    `order_item_id` int(11) NOT NULL,
    `dt` date NOT NULL COMMENT "使用order_purchase_timestamp作为分区时间",
    `product_id` varchar(255) DEFAULT NULL,
    `seller_id` varchar(255) DEFAULT NULL,
    `shipping_limit_date` datetime DEFAULT NULL,
    `price` decimal(6,2) DEFAULT NULL,
    `freight_value` decimal(6,2) DEFAULT NULL,
    `order_purchase_timestamp` datetime DEFAULT NULL
) ENGINE=OLAP
UNIQUE KEY(`order_id`, `order_item_id`, `dt`)
COMMENT "订单详情数据表"
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`order_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);

CREATE TABLE fact.`ods_sale_orders_cost_t` (
    `order_id` varchar(255) NOT NULL,
    `order_item_id` int(11) NOT NULL,
    `dt` date NOT NULL COMMENT "使用order_purchase_timestamp作为分区时间",
    `product_id` varchar(255) NOT NULL DEFAULT '',
    `order_purchase_timestamp` datetime NULL DEFAULT NULL,
    `product_cost` double DEFAULT NULL,
    `freight_cost` decimal(6,2) DEFAULT NULL
) ENGINE=OLAP
UNIQUE KEY(`order_id`, `order_item_id`, `dt`)
COMMENT "订单成本表"
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`order_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);

CREATE TABLE fact.`ods_sale_payment_t` (
    `order_id` varchar(255) NOT NULL,
    `payment_sequential` int(11) NOT NULL,
    `dt` date NOT NULL COMMENT "使用order_purchase_timestamp作为分区时间",
    `payment_type` varchar(255) DEFAULT NULL,
    `payment_installments` int(11) DEFAULT NULL,
    `payment_value` decimal(10,2) DEFAULT NULL,
    `order_purchase_timestamp` datetime DEFAULT NULL
) ENGINE=OLAP
UNIQUE KEY(`order_id`, `payment_sequential`, `dt`)
COMMENT "订单付款数据表"
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`order_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);

CREATE TABLE fact.`ods_sale_customer_t` (
    `customer_id` varchar(255) NOT NULL,
    `dt` date NOT NULL COMMENT "使用order_purchase_timestamp作为分区时间",
    `customer_unique_id` varchar(255) DEFAULT NULL,
    `customer_zip_code_prefix` int(11) DEFAULT NULL,
    `customer_city` varchar(255) DEFAULT NULL,
    `customer_state` varchar(255) DEFAULT NULL,
    `order_purchase_timestamp` datetime DEFAULT NULL
) ENGINE=OLAP
UNIQUE KEY(`customer_id`, `dt`)
COMMENT "客户信息表"
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`customer_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);

CREATE TABLE fact.`ods_sale_other_amount_t` (
    `order_id` varchar(255) NOT NULL,
    `dt` date NOT NULL COMMENT "使用order_purchase_timestamp作为分区时间",
    `order_purchase_timestamp` datetime NULL DEFAULT NULL,
    `other_amount` decimal(10,2) DEFAULT '0.00'
) ENGINE=OLAP
UNIQUE KEY(`order_id`, `dt`)
COMMENT "其他金额表"
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`order_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);

CREATE TABLE fact.`ods_sale_review_t` (
    `review_id` varchar(255) NOT NULL,
    `order_id` varchar(255) NOT NULL,
    `dt` date NOT NULL COMMENT "使用review_creation_date作为分区时间",
    `review_score` int(11) DEFAULT NULL,
    `review_comment_title` varchar(255) DEFAULT NULL,
    `review_comment_message` varchar(255) DEFAULT NULL,
    `review_creation_date` date DEFAULT NULL,
    `review_answer_timestamp` datetime DEFAULT NULL
) ENGINE=OLAP
UNIQUE KEY(`review_id`, `order_id`, `dt`)
COMMENT "用户评论表"
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`review_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);
4.5.2 Data synchronization

Create a sale.config file under /export/server/apache-seatunnel-2.3.5/config/job:

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 600000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    startup.mode = "initial"
    table-names = ["ame.orders"]
    table-names-config = [{ table = "ame.orders", primaryKeys = ["order_id"] }]
    result_table_name = "table1"
  }
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    startup.mode = "initial"
    table-names = ["ame.item"]
    table-names-config = [{ table = "ame.item", primaryKeys = ["order_id", "order_item_id"] }]
    result_table_name = "table2"
  }
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    startup.mode = "initial"
    table-names = ["ame.orders_cost"]
    table-names-config = [{ table = "ame.orders_cost", primaryKeys = ["order_id", "order_item_id"] }]
    result_table_name = "table3"
  }
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    startup.mode = "initial"
    table-names = ["ame.payment"]
    table-names-config = [{ table = "ame.payment", primaryKeys = ["order_id", "payment_sequential"] }]
    result_table_name = "table4"
  }
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    startup.mode = "initial"
    table-names = ["ame.customer"]
    table-names-config = [{ table = "ame.customer", primaryKeys = ["customer_id"] }]
    result_table_name = "table5"
  }
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    startup.mode = "initial"
    table-names = ["ame.other_amount"]
    table-names-config = [{ table = "ame.other_amount", primaryKeys = ["order_id"] }]
    result_table_name = "table6"
  }
  MySQL-CDC {
    base-url = "jdbc:mysql://node1:3306/ame", username = "root", password = "123456"
    startup.mode = "initial"
    table-names = ["ame.review"]
    table-names-config = [{ table = "ame.review", primaryKeys = ["review_id", "order_id"] }]
    result_table_name = "table7"
  }
}

transform {
  Sql {
    source_table_name = "table1", result_table_name = "tf1"
    query = "select *, DATE_TRUNC(order_purchase_timestamp, 'DAY') as dt from table1"
  }
  Sql {
    source_table_name = "table2", result_table_name = "tf2"
    query = "select *, DATE_TRUNC(order_purchase_timestamp, 'DAY') as dt from table2"
  }
  Sql {
    source_table_name = "table3", result_table_name = "tf3"
    query = "select *, DATE_TRUNC(order_purchase_timestamp, 'DAY') as dt from table3"
  }
  Sql {
    source_table_name = "table4", result_table_name = "tf4"
    query = "select *, DATE_TRUNC(order_purchase_timestamp, 'DAY') as dt from table4"
  }
  Sql {
    source_table_name = "table5", result_table_name = "tf5"
    query = "select *, DATE_TRUNC(order_purchase_timestamp, 'DAY') as dt from table5"
  }
  Sql {
    source_table_name = "table6", result_table_name = "tf6"
    query = "select *, DATE_TRUNC(order_purchase_timestamp, 'DAY') as dt from table6"
  }
  Sql {
    source_table_name = "table7", result_table_name = "tf7"
    query = "select *, review_creation_date as dt from table7"
  }
}

sink {
  StarRocks {
    source_table_name = "tf1"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "fact", table = "ods_sale_orders_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = true
  }
  StarRocks {
    source_table_name = "tf2"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "fact", table = "ods_sale_item_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = true
  }
  StarRocks {
    source_table_name = "tf3"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "fact", table = "ods_sale_orders_cost_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = true
  }
  StarRocks {
    source_table_name = "tf4"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "fact", table = "ods_sale_payment_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = true
  }
  StarRocks {
    source_table_name = "tf5"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "fact", table = "ods_sale_customer_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = true
  }
  StarRocks {
    source_table_name = "tf6"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "fact", table = "ods_sale_other_amount_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = true
  }
  StarRocks {
    source_table_name = "tf7"
    nodeUrls = ["node1:8130"], base-url = "jdbc:mysql://node1:9030/"
    username = root, password = "123456"
    database = "fact", table = "ods_sale_review_t"
    batch_max_rows = 1000
    starrocks.config = { format = "JSON", strip_outer_array = true }
    enable_upsert_delete = true
  }
}

Notes:

(1) The StarRocks tables add a dt column, so a new field must be constructed during synchronization. SeaTunnel supports transform steps for this; here a SQL transform turns the partition-time field into a date.

SeaTunnel's function reference: https://seatunnel.apache.org/zh-CN/docs/2.3.5/transform-v2/sql-functions#time-and-date-functions

When using the Sql transform, pick whichever functions the data needs.

(2) Note: to keep only the latest data, enable_upsert_delete must be set to true so that incoming rows update existing rows by primary key, leaving only the newest version.

(3) Companies need initial, incremental, and custom collection modes, so startup.mode goes into the config for flexibility. In this project we first set startup.mode to initial; once the initial load finishes, the machine can be shut down normally, and later runs switch it to latest to continue with incremental data.
[figure]

Run:

cd /export/server/apache-seatunnel-2.3.5/
./bin/seatunnel.sh --config ./config/job/sale.config -e local

Note: the data volume is large, the run is long, and it consumes a lot of resources. If your machine cannot cope, synchronize the tables one at a time.

Also, if checkpoints time out, edit /export/server/apache-seatunnel-2.3.5/config/seatunnel.yaml and raise the timeout, e.g. to 600000.

4.6 DWD Development

4.6.1 Requirement analysis

We need to build an order-detail wide table, with orders and item as the core tables.

1) First the order-level information: join customer (inner join), then other_amount (left join), then payment (aggregate first, then inner join), then review (aggregate first, then left join).

2) Then the detail-level information: join item (left join, handling nulls); then orders_cost (in theory an inner join, but since item is left-joined and orders_cost needs columns from item, orders_cost also has to be a left join; this is a StarRocks planning quirk).

3) Then the dimensions: join zipper-table style, and handle the nulls from unmatched rows.

The seller dimension must be rebuilt:

DROP MATERIALIZED VIEW IF EXISTS dim.`dim_seller_mv`;

CREATE MATERIALIZED VIEW IF NOT EXISTS dim.`dim_seller_mv`
DISTRIBUTED BY HASH(`seller_id`)
REFRESH ASYNC
as
select
    seller_id,
    update_time,
    seller_zip_code_prefix,
    seller_city,
    seller_state,
    create_time,
    update_time as start_time,
    LEAD(update_time, 1, '9999-12-31 00:00:00') OVER (PARTITION BY seller_id ORDER BY update_time) as end_time
from `dim`.`dim_seller_t`
union all
select 'other_seller', '2016-09-01 12:15:19', 0,
       'other_city', 'other_state', '2016-09-01 12:15:19',
       '2016-09-01 12:15:19', '9999-12-31 00:00:00'
where not exists (
    select 1 from `dim`.`dim_seller_t`
    where seller_id = 'other_seller'
);
4.6.2 Computation

Initial version:

CREATE VIEW fact.temp_order_dtl
as
select
    o.order_id, o.customer_id, o.order_status,
    o.order_purchase_timestamp, o.order_approved_at, o.order_delivered_carrier_date,
    o.order_delivered_customer_date, o.order_estimated_delivery_date,
    c.customer_unique_id, c.customer_zip_code_prefix, c.customer_city, c.customer_state,
    ifnull(oa.other_amount, 0) as other_amount,
    pm.payment_times, pm.payment_value,
    ifnull(r.rewiews_num, 0) as rewiews_num,
    ifnull(r.review_score, 5) as review_score,
    ifnull(i.order_item_id, 1) as order_item_id,
    ifnull(i.product_id, 'other_product') as product_id,
    ifnull(i.seller_id, 'other_seller') as seller_id,
    i.shipping_limit_date,
    ifnull(i.price, pm.payment_value * 0.858) as price,
    ifnull(i.freight_value, pm.payment_value * 0.142) as freight_value,
    oc.product_cost, oc.freight_cost,
    s.seller_zip_code_prefix, s.seller_city, s.seller_state,
    p.product_category_name, p.product_name_lenght, p.product_description_lenght,
    p.product_photos_qty, p.product_weight_g, p.product_length_cm, p.product_height_cm, p.product_width_cm,
    ct.product_category_name_english,
    o.dt,
    date_trunc('month', o.dt) as month_day,
    date(date_sub(o.dt, dayofweek(o.dt) - 1)) as week_day
from fact.ods_sale_orders_t o
inner join fact.ods_sale_customer_t c on o.customer_id = c.customer_id
left join fact.ods_sale_other_amount_t oa on o.order_id = oa.order_id
inner join (select order_id, count(1) as payment_times, sum(payment_value) as payment_value
            from fact.ods_sale_payment_t group by order_id) pm
    on o.order_id = pm.order_id
left join (select order_id, count(1) as rewiews_num, avg(review_score) as review_score
           from fact.ods_sale_review_t group by order_id) r
    on o.order_id = r.order_id
left join fact.ods_sale_item_t i on o.order_id = i.order_id
left join fact.ods_sale_orders_cost_t oc
    on o.order_id = oc.order_id and ifnull(i.order_item_id, 1) = oc.order_item_id
left join dim.dim_seller_mv s
    on ifnull(i.seller_id, 'other_seller') = s.seller_id
   and o.order_purchase_timestamp >= s.start_time and o.order_purchase_timestamp < s.end_time
left join dim.dim_product_mv p
    on ifnull(i.product_id, 'other_product') = p.product_id
   and o.order_purchase_timestamp >= p.start_time and o.order_purchase_timestamp < p.end_time
left join dim.dim_category_mv ct
    on p.product_category_name = ct.product_category_name
   and o.order_purchase_timestamp >= ct.start_time and o.order_purchase_timestamp < ct.end_time
;

Improvements:

1) For easier data validation and to avoid ambiguous metrics, the order-level amounts are split and apportioned to each product line.

2) Build it as a materialized view, partitioned.

3) Refresh policy: real-time refresh would consume a lot of compute, so the view is refreshed at a fixed interval instead:

REFRESH ASYNC START('2025-01-01 01:00:00') EVERY (interval 4 hour)

4) We do not want dimension changes to refresh the whole materialized view, so the dimensions are excluded from the refresh dependencies:

PROPERTIES (
    "excluded_trigger_tables" = "dim.dim_seller_mv,dim.dim_product_mv,dim.dim_category_mv"
)

Final version:

CREATE MATERIALIZED VIEW IF NOT EXISTS fact.`dwd_sale_order_dtl_mv`
PARTITION BY date_trunc('day', `dt`)
DISTRIBUTED BY HASH(`order_id`)
REFRESH ASYNC START('2025-01-01 01:00:00') EVERY (interval 4 hour)
PROPERTIES (
    "excluded_trigger_tables" = "dim.dim_seller_mv,dim.dim_product_mv,dim.dim_category_mv"
)
as
select
    o.order_id, o.customer_id, o.order_status,
    o.order_purchase_timestamp, o.order_approved_at, o.order_delivered_carrier_date,
    o.order_delivered_customer_date, o.order_estimated_delivery_date,
    c.customer_unique_id, c.customer_zip_code_prefix, c.customer_city, c.customer_state,
    if(i.order_id is not null,
       (i.price + i.freight_value) / (pm.payment_value - ifnull(oa.other_amount, 0)) * ifnull(oa.other_amount, 0),
       0.00) as other_amount,
    pm.payment_times,
    if(i.order_id is not null,
       (i.price + i.freight_value) / (pm.payment_value - ifnull(oa.other_amount, 0)) * pm.payment_value,
       pm.payment_value) as payment_value,
    ifnull(r.rewiews_num, 0) as rewiews_num,
    ifnull(r.review_score, 5) as review_score,
    ifnull(i.order_item_id, 1) as order_item_id,
    ifnull(i.product_id, 'other_product') as product_id,
    ifnull(i.seller_id, 'other_seller') as seller_id,
    i.shipping_limit_date,
    ifnull(i.price, pm.payment_value * 0.858) as price,
    ifnull(i.freight_value, pm.payment_value * 0.142) as freight_value,
    oc.product_cost, oc.freight_cost,
    s.seller_zip_code_prefix, s.seller_city, s.seller_state,
    p.product_category_name, p.product_name_lenght, p.product_description_lenght,
    p.product_photos_qty, p.product_weight_g, p.product_length_cm, p.product_height_cm, p.product_width_cm,
    ct.product_category_name_english,
    o.dt,
    date_trunc('month', o.dt) as month_day,
    date(date_sub(o.dt, dayofweek(o.dt) - 1)) as week_day
from fact.ods_sale_orders_t o
inner join fact.ods_sale_customer_t c on o.customer_id = c.customer_id
left join fact.ods_sale_other_amount_t oa on o.order_id = oa.order_id
inner join (select order_id, count(1) as payment_times, sum(payment_value) as payment_value
            from fact.ods_sale_payment_t group by order_id) pm
    on o.order_id = pm.order_id
left join (select order_id, count(1) as rewiews_num, avg(review_score) as review_score
           from fact.ods_sale_review_t group by order_id) r
    on o.order_id = r.order_id
left join fact.ods_sale_item_t i on o.order_id = i.order_id
left join fact.ods_sale_orders_cost_t oc
    on o.order_id = oc.order_id and ifnull(i.order_item_id, 1) = oc.order_item_id
left join dim.dim_seller_mv s
    on ifnull(i.seller_id, 'other_seller') = s.seller_id
   and o.order_purchase_timestamp >= s.start_time and o.order_purchase_timestamp < s.end_time
left join dim.dim_product_mv p
    on ifnull(i.product_id, 'other_product') = p.product_id
   and o.order_purchase_timestamp >= p.start_time and o.order_purchase_timestamp < p.end_time
left join dim.dim_category_mv ct
    on p.product_category_name = ct.product_category_name
   and o.order_purchase_timestamp >= ct.start_time and o.order_purchase_timestamp < ct.end_time
;

Validation.

Note: after the materialized view is created, the data still has to be written to disk, which takes a while. Wait until the data stabilizes before validating.

(1) Row counts

select count(1) ct from fact.ods_sale_orders_cost_t; -- 113425
select count(1) ct from fact.dwd_sale_order_dtl_mv;  -- 113425

Correct.

(2) Key metric: amounts

select sum(payment_value) as payment_value,
       sum(other_amount) as other_amount,
       sum(price) as price,
       sum(freight_value) as freight_value
from fact.dwd_sale_order_dtl_mv;

Result:
[figure]

payment_value = other_amount + price + freight_value

So the computation checks out.
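
The identity can also be asserted in one query; the difference should be (near) zero:

select round(sum(payment_value) - sum(other_amount) - sum(price) - sum(freight_value), 2) as diff
from fact.dwd_sale_order_dtl_mv;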

4.6.3 Payment detail table

If needed, a payment wide table can be built from orders, payment, customer and review:

CREATE MATERIALIZED VIEW IF NOT EXISTS fact.`dwd_sale_payment_dtl_mv`
PARTITION BY date_trunc('day', `dt`)
DISTRIBUTED BY HASH(`order_id`)
REFRESH ASYNC START('2025-01-01 01:15:00') EVERY (interval 4 hour)
as
select
    o.order_id, o.customer_id, o.order_status,
    o.order_purchase_timestamp, o.order_approved_at, o.order_delivered_carrier_date,
    o.order_delivered_customer_date, o.order_estimated_delivery_date,
    c.customer_unique_id, c.customer_zip_code_prefix, c.customer_city, c.customer_state,
    p.payment_sequential, p.payment_type, p.payment_installments, p.payment_value,
    ifnull(r.rewiews_num, 0) as rewiews_num,
    ifnull(r.review_score, 5) as review_score,
    o.dt,
    date_trunc('month', o.dt) as month_day,
    date(date_sub(o.dt, dayofweek(o.dt) - 1)) as week_day
from fact.ods_sale_orders_t o
inner join fact.ods_sale_customer_t c on o.customer_id = c.customer_id
inner join fact.ods_sale_payment_t p on o.order_id = p.order_id
left join (select order_id, count(1) as rewiews_num, avg(review_score) as review_score
           from fact.ods_sale_review_t group by order_id) r
    on o.order_id = r.order_id
;

4.7 DWS Development

(I) Requirement analysis:

(1) The DWS layer aggregates at the finest granularity of each dimension.

(2) Metrics: cover every metric in the requirements, plus some likely future needs.

(II) Computation

Note: leave headroom for the detail table's refresh. For example, if the detail table takes 10 minutes to refresh, start this DWS wide table 15 minutes after it. This also avoids overloading the cluster with too many concurrent jobs.

CREATE MATERIALIZED VIEW IF NOT EXISTS fact.`dws_sale_city_goods_statistics_day_mv`
PARTITION BY date_trunc('day', `dt`)
DISTRIBUTED BY HASH(`product_id`)
REFRESH ASYNC START('2025-01-01 01:30:00') EVERY (interval 4 hour)
as
select
    dt, month_day, week_day,
    product_id, product_category_name, product_category_name_english,
    product_weight_g, product_length_cm, product_height_cm, product_width_cm,
    customer_city, customer_state,
    count(order_id) as order_num,
    sum(1) as product_num, -- item has no quantity column, so assume one unit per row
    round(sum(price), 2) as product_value,
    round(sum(freight_value), 2) as freight_value,
    round(sum(other_amount), 2) as other_amount,
    round(sum(payment_value), 2) as payment_value,
    round(sum(product_cost), 2) as product_cost,
    round(sum(freight_cost), 2) as freight_cost,
    round(sum(product_cost) + sum(freight_cost), 2) as total_cost,
    round(sum(price) - sum(product_cost), 2) as gross_profit,
    round(sum(payment_value) - sum(product_cost) - sum(freight_cost), 2) as net_profit,
    count(if(order_status = 'canceled', order_id, null)) as cancel_order_num,
    sum(if(order_status = 'canceled', 1, 0)) as cancel_product_num,
    round(sum(if(order_status = 'canceled', price, 0)), 2) as cancel_product_value,
    round(sum(if(order_status = 'canceled', freight_value, 0)), 2) as cancel_freight_value,
    round(sum(if(order_status = 'canceled', other_amount, 0)), 2) as cancel_other_amount,
    round(sum(if(order_status = 'canceled', payment_value, 0)), 2) as cancel_payment_value,
    round(sum(if(order_status = 'canceled', product_cost, 0)), 2) as cancel_product_cost,
    round(sum(if(order_status = 'canceled', freight_cost, 0)), 2) as cancel_freight_cost
from fact.`dwd_sale_order_dtl_mv`
group by
    dt, month_day, week_day,
    product_id, product_category_name, product_category_name_english,
    product_weight_g, product_length_cm, product_height_cm, product_width_cm,
    customer_city, customer_state
;

(III) Data validation

select sum(product_value) product_value,
       sum(freight_value) freight_value,
       sum(other_amount) other_amount,
       sum(payment_value) as payment_value
from fact.`dws_sale_city_goods_statistics_day_mv`;

4.8 ADS Development

(1) ads_sale_state_goods_statistics_day_mv: daily state-level product analysis materialized view

CREATE MATERIALIZED VIEW IF NOT EXISTS fact.`ads_sale_state_goods_statistics_day_mv`
PARTITION BY date_trunc('day', `dt`)
DISTRIBUTED BY HASH(`product_id`)
REFRESH ASYNC START('2025-01-01 01:35:00') EVERY (interval 4 hour)
as
select
    dt, month_day, week_day,
    product_id, product_category_name, product_category_name_english,
    product_weight_g, product_length_cm, product_height_cm, product_width_cm,
    customer_state,
    sum(order_num) as order_num,
    sum(product_num) as product_num, -- item has no quantity column, so assume one unit per row
    sum(product_value) as product_value,
    sum(freight_value) as freight_value,
    sum(other_amount) as other_amount,
    sum(payment_value) as payment_value,
    sum(product_cost) as product_cost,
    sum(freight_cost) as freight_cost,
    sum(total_cost) as total_cost,
    sum(gross_profit) as gross_profit,
    sum(net_profit) as net_profit,
    sum(cancel_order_num) as cancel_order_num,
    sum(cancel_product_num) as cancel_product_num,
    sum(cancel_product_value) as cancel_product_value,
    sum(cancel_freight_value) as cancel_freight_value,
    sum(cancel_other_amount) as cancel_other_amount,
    sum(cancel_payment_value) as cancel_payment_value,
    sum(cancel_product_cost) as cancel_product_cost,
    sum(cancel_freight_cost) as cancel_freight_cost
from fact.`dws_sale_city_goods_statistics_day_mv`
group by
    dt, month_day, week_day,
    product_id, product_category_name, product_category_name_english,
    product_weight_g, product_length_cm, product_height_cm, product_width_cm,
    customer_state
;

(2) ads_sale_city_category_statistics_day_mv: daily city-level category analysis materialized view

Note: order_num and cancel_order_num are semi-additive and cannot be derived from the daily city-product view. For example, someone buys a pound of apples and a pound of pears: in the city-product view, product is the finest grain, so the order counts once for apples and once for pears, 2 orders in total. Viewed by category, though, both items belong to the same category and collapse into 1 order. So these fields must be computed from dwd_sale_order_dtl_mv.

Note also: although every metric here could be computed straight from dwd_sale_order_dtl_mv, that would give the same metric two computation paths and invite inconsistency. So take every metric you can from the DWS wide table to keep metrics consistent, and go to other tables only for what DWS cannot provide.

CREATE MATERIALIZED VIEW IF NOT EXISTS fact.`ads_sale_city_category_statistics_day_mv`
PARTITION BY date_trunc('day', `dt`)
DISTRIBUTED BY HASH(`product_category_name`)
REFRESH ASYNC START('2025-01-01 01:40:00') EVERY (interval 4 hour)
as
select x.*, y.order_num, y.cancel_order_num
from (
    select
        dt, month_day, week_day,
        product_category_name, product_category_name_english,
        customer_city, customer_state,
        sum(product_num) as product_num, -- item has no quantity column, so assume one unit per row
        sum(product_value) as product_value,
        sum(freight_value) as freight_value,
        sum(other_amount) as other_amount,
        sum(payment_value) as payment_value,
        sum(product_cost) as product_cost,
        sum(freight_cost) as freight_cost,
        sum(total_cost) as total_cost,
        sum(gross_profit) as gross_profit,
        sum(net_profit) as net_profit,
        sum(cancel_product_num) as cancel_product_num,
        sum(cancel_product_value) as cancel_product_value,
        sum(cancel_freight_value) as cancel_freight_value,
        sum(cancel_other_amount) as cancel_other_amount,
        sum(cancel_payment_value) as cancel_payment_value,
        sum(cancel_product_cost) as cancel_product_cost,
        sum(cancel_freight_cost) as cancel_freight_cost
    from fact.`dws_sale_city_goods_statistics_day_mv`
    group by
        dt, month_day, week_day,
        product_category_name, product_category_name_english,
        customer_city, customer_state
) x
left join (
    select
        dt, product_category_name, customer_city,
        count(distinct order_id) as order_num,
        count(distinct if(order_status = 'canceled', order_id, null)) as cancel_order_num
    from fact.`dwd_sale_order_dtl_mv`
    group by dt, product_category_name, customer_city
) y
    on x.dt = y.dt and x.product_category_name = y.product_category_name and x.customer_city = y.customer_city
;

(3) ads_sale_state_category_statistics_day_mv: daily state-level category analysis materialized view

CREATE MATERIALIZED VIEW IF NOT EXISTS fact.`ads_sale_state_category_statistics_day_mv`
PARTITION BY date_trunc('day', `dt`)
DISTRIBUTED BY HASH(`product_category_name`)
REFRESH ASYNC START('2025-01-01 01:45:00') EVERY (interval 4 hour)
as
select
    dt, month_day, week_day,
    product_category_name, product_category_name_english,
    customer_state,
    sum(order_num) as order_num,
    sum(product_num) as product_num, -- item has no quantity column, so assume one unit per row
    sum(product_value) as product_value,
    sum(freight_value) as freight_value,
    sum(other_amount) as other_amount,
    sum(payment_value) as payment_value,
    sum(product_cost) as product_cost,
    sum(freight_cost) as freight_cost,
    sum(total_cost) as total_cost,
    sum(gross_profit) as gross_profit,
    sum(net_profit) as net_profit,
    sum(cancel_order_num) as cancel_order_num,
    sum(cancel_product_num) as cancel_product_num,
    sum(cancel_product_value) as cancel_product_value,
    sum(cancel_freight_value) as cancel_freight_value,
    sum(cancel_other_amount) as cancel_other_amount,
    sum(cancel_payment_value) as cancel_payment_value,
    sum(cancel_product_cost) as cancel_product_cost,
    sum(cancel_freight_cost) as cancel_freight_cost
from fact.`ads_sale_city_category_statistics_day_mv`
group by
    dt, month_day, week_day,
    product_category_name, product_category_name_english,
    customer_state
;

5. Member Subject Development [Master]

5.1 Requirements Analysis

Requirement: analyze member registration, logins, spending, repurchase and so on.

Main metrics: order count, product amount, freight amount, other amount, payment amount, product cost, freight cost, total cost, gross profit, net profit, canceled order count, canceled product amount, canceled freight amount, canceled other amount, canceled payment amount, canceled product cost, canceled freight cost; plus number of purchasing customers, total review count, average review score, registered member count, login user count, and first-order count.

Dimensions: time, region.

Granularities: time (day), region (city, state).

Implementation: offline (batch).

5.2 Data Profiling

The member subject additionally requires the member registration table and the member login table.

(1) register

Besides registrations, this table also records members' basic information.

DDL:

CREATE TABLE `register` (
    `customer_unique_id` varchar(255) NOT NULL,
    `customer_city` varchar(255),
    `customer_state` varchar(255),
    `register_date` datetime,
    `name` varchar(100),
    `birthday` date,
    `phone` varchar(20),
    `gender` varchar(5),
    `update_time` datetime,
    PRIMARY KEY (`customer_unique_id`) USING BTREE,
    INDEX `idx_product_category`(`customer_city`) USING BTREE
);

Import register into MySQL and analyze it. Either run register.sql from the dataset directly, or log into MySQL on Linux and run:

use ame;
source /export/data/mysql/register.sql

Then analyze:

select count(1) ct,
       max(register_date) as register_date_max,
       min(register_date) as register_date_min
from register;

Result:
[figure]

296,096 rows; earliest registration 2015-09-01, latest 2018-10-16.

Next verify that all users in customer are in register:

select distinct c.customer_unique_id
from customer c
left join register r on c.customer_unique_id = r.customer_unique_id
where r.customer_unique_id is null;

The result is empty, so everyone is present and register is complete.

But register has 296,096 rows, which means part of the membership has no purchase record (either they never purchased, or the provided data does not include it). This can be quantified with the check below.
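
-- Members in register with no matching purchase record
select count(1) as no_purchase_members
from register r
left join (select distinct customer_unique_id from customer) c
  on r.customer_unique_id = c.customer_unique_id
where c.customer_unique_id is null;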

(2) logins

Each member login to the app or website produces one record; see the DDL for the captured fields.

DDL:

CREATE TABLE `logins` (
    `customer_unique_id` varchar(100) NOT NULL,
    `login_time` datetime NOT NULL,
    `ip_address` varchar(50),
    PRIMARY KEY (`customer_unique_id`, `login_time`) USING BTREE
);

Import logins into MySQL and analyze it. Either run logins.sql from the dataset directly, or log into MySQL on Linux and run:

use ame;
source /export/data/mysql/logins.sql

Then analyze:

select count(1) ct,
       count(distinct customer_unique_id) customer_num
from logins;

Result: 400,419 records from 223,737 members, i.e. members log in multiple times, which matches reality.

Likewise, verify that these members are all in register:

select distinct c.customer_unique_id
from logins c
left join register r on c.customer_unique_id = r.customer_unique_id
where r.customer_unique_id is null;

The result is empty: all of them are in register.

5.3 Data Modeling

Logical model:

register acts as a dimension table, while logins acts as a fact table.

Physical model:

Note: most member metrics do not need product-level detail, so an order-grain sales table is required, hence a member order materialized view. A member first-order table is then derived from that view, and the first-order metrics are computed from it.

Another note: apart from register and the member order materialized view, everything here is refreshed offline.

[figure]

5.4 ODS Development

5.4.1 dim_register_t

(1) DDL

The registration table carries members' basic information, so it is treated as a dimension:

CREATE TABLE dim.dim_register_t (
    `customer_unique_id` varchar(255) NOT NULL,
    `update_time` datetime NOT NULL,
    `customer_city` varchar(255),
    `customer_state` varchar(255),
    `register_date` datetime,
    `name` varchar(100),
    `birthday` date,
    `phone` varchar(20),
    `gender` varchar(5)
) ENGINE=OLAP
PRIMARY KEY(`customer_unique_id`, `update_time`)
COMMENT "会员注册表"
DISTRIBUTED BY HASH(`customer_unique_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);

(2)数据同步

可以将同步任务合并到之前的维表开发的任务中。

修改 /export/server/apache-seatunnel-2.3.5/config/job 下的 dim.config 文件

env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 10000
}

source {
    MySQL-CDC {
        base-url = "jdbc:mysql://node1:3306/ame"
        username = "root"
        password = "123456"
        table-names = ["ame.product"]
        table-names-config = [
            {
                table = "ame.product"
                primaryKeys = ["product_id"]
            }
        ]
        result_table_name = "table1"
    }

    MySQL-CDC {
        base-url = "jdbc:mysql://node1:3306/ame"
        username = "root"
        password = "123456"
        table-names = ["ame.category"]
        table-names-config = [
            {
                table = "ame.category"
                primaryKeys = ["product_category_name"]
            }
        ]
        result_table_name = "table2"
    }

    MySQL-CDC {
        base-url = "jdbc:mysql://node1:3306/ame"
        username = "root"
        password = "123456"
        table-names = ["ame.seller"]
        table-names-config = [
            {
                table = "ame.seller"
                primaryKeys = ["seller_id"]
            }
        ]
        result_table_name = "table3"
    }

    MySQL-CDC {
        base-url = "jdbc:mysql://node1:3306/ame"
        username = "root"
        password = "123456"
        table-names = ["ame.register"]
        table-names-config = [
            {
                table = "ame.register"
                primaryKeys = ["customer_unique_id"]
            }
        ]
        result_table_name = "table4"
    }
}

sink {
    StarRocks {
        source_table_name = "table1"
        nodeUrls = ["node1:8130"]
        base-url = "jdbc:mysql://node1:9030/"
        username = root
        password = "123456"
        database = "dim"
        table = "dim_product_t"
        batch_max_rows = 1000
        starrocks.config = {
            format = "JSON"
            strip_outer_array = true
        }
        enable_upsert_delete = false
    }

    StarRocks {
        source_table_name = "table2"
        nodeUrls = ["node1:8130"]
        base-url = "jdbc:mysql://node1:9030/"
        username = root
        password = "123456"
        database = "dim"
        table = "dim_category_t"
        batch_max_rows = 1000
        starrocks.config = {
            format = "JSON"
            strip_outer_array = true
        }
        enable_upsert_delete = false
    }

    StarRocks {
        source_table_name = "table3"
        nodeUrls = ["node1:8130"]
        base-url = "jdbc:mysql://node1:9030/"
        username = root
        password = "123456"
        database = "dim"
        table = "dim_seller_t"
        batch_max_rows = 1000
        starrocks.config = {
            format = "JSON"
            strip_outer_array = true
        }
        enable_upsert_delete = false
    }

    StarRocks {
        source_table_name = "table4"
        nodeUrls = ["node1:8130"]
        base-url = "jdbc:mysql://node1:9030/"
        username = root
        password = "123456"
        database = "dim"
        table = "dim_register_t"
        batch_max_rows = 1000
        starrocks.config = {
            format = "JSON"
            strip_outer_array = true
        }
        enable_upsert_delete = false
    }
}

运行命令:

cd /export/server/apache-seatunnel-2.3.5/
./bin/seatunnel.sh --config ./config/job/dim.config -e local
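
任务跑起来之后,可以在StarRocks中核对一下行数,与MySQL中register表的行数做对比(示意查询,行数一致即说明同步正常):

-- 行数应与MySQL中的register表一致
select count(1) from dim.dim_register_t;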
5.4.2 ods_member_logins_t

实际工作中,有些不需要实时同步的表,都可以配置成离线的方式,每天调度一次,抽取增量的数据即可

(1)建表

建在fact库中,同样使用unique模型,分区存储。

CREATE TABLE fact.`ods_member_logins_t` (
    `customer_unique_id` varchar(255) NOT NULL,
    `login_time` datetime NOT NULL,
    `dt` date NOT NULL COMMENT "使用login_time作为分区时间",
    `ip_address` varchar(50)
) ENGINE=OLAP
UNIQUE KEY(`customer_unique_id`, `login_time`, `dt`)
COMMENT "用户登录表"
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`customer_unique_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "1"
);

(2)数据同步

因为是离线同步,不能像实时同步那样,同步完历史数据后自动接着同步新数据。这里的做法是:先同步全量数据,之后每天再同步增量数据。

1)全量数据同步

在 /export/server/apache-seatunnel-2.3.5/config/job 下新建一个 member.config 文件。

注意:因为可以在source中写query,所以可以直接把构建dt字段的计算放在source里完成。

env {
    job.name = "loadData"
    execution.parallelism = 1
    job.mode = "BATCH"
}

source {
    Jdbc {
        url = "jdbc:mysql://node1:3306/ame?useUnicode=true&characterEncoding=utf8&useSSL=false"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "123456"
        query = "select *, date(login_time) as dt from ame.logins"
    }
}

sink {
    StarRocks {
        nodeUrls = ["node1:8130"]
        base-url = "jdbc:mysql://node1:9030/"
        username = root
        password = "123456"
        database = "fact"
        table = "ods_member_logins_t"
        batch_max_rows = 10000
        starrocks.config = {
            format = "JSON"
            strip_outer_array = true
        }
        enable_upsert_delete = true
    }
}

运行命令

cd /export/server/apache-seatunnel-2.3.5/
./bin/seatunnel.sh --config ./config/job/member.config -e local
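
全量同步完成后,可以确认分区是否按天自动生成、行数是否与MySQL一致(示意查询,行数应为前文探查出的400419条):

-- 查看按天生成的分区
SHOW PARTITIONS FROM fact.ods_member_logins_t;
-- 行数应与MySQL中的logins表一致
select count(1) from fact.ods_member_logins_t;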

2)增量同步

在实际工作中,每天都要进行增量同步,需要通过传参的方式来同步增量数据。

增量同步的方式:

①在query的where条件里进行判断,只取当天的数据

通过在 SeaTunnel 的配置中声明一个变量,然后在运行时动态替换该变量的值,实现同步增量数据的需求。

全量任务运行完后,需要将member.config 文件修改成如下:

env {
    job.name = "loadData"
    execution.parallelism = 1
    job.mode = "BATCH"
}

source {
    Jdbc {
        url = "jdbc:mysql://node1:3306/ame?useUnicode=true&characterEncoding=utf8&useSSL=false"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "123456"
        query = "select *, date(login_time) as dt from ame.logins where login_time >= '"${date}"' and login_time < date_add('"${date}"', interval 1 day)"
    }
}

sink {
    StarRocks {
        nodeUrls = ["node1:8130"]
        base-url = "jdbc:mysql://node1:9030/"
        username = root
        password = "123456"
        database = "fact"
        table = "ods_member_logins_t"
        batch_max_rows = 10000
        starrocks.config = {
            format = "JSON"
            strip_outer_array = true
        }
        enable_upsert_delete = true
    }
}

运行的命令变成:

cd /export/server/apache-seatunnel-2.3.5/
./bin/seatunnel.sh --config ./config/job/member.config -e local -i date=2025-01-01

下面可以做个测试:

在MySQL中插入一条数据,模拟2019年10月17日的新增数据。

insert into logins values ('0000366f3b9a7992bf8c76cfdf3221e2', '2019-10-17 15:40:41', '126.148.110.105');

然后运行同步命令:

cd /export/server/apache-seatunnel-2.3.5/
./bin/seatunnel.sh --config ./config/job/member.config -e local -i date=2019-10-17

然后查询fact.ods_member_logins_t表是否有这条数据。

select * from fact.ods_member_logins_t
where customer_unique_id = '0000366f3b9a7992bf8c76cfdf3221e2';

结果如下:
在这里插入图片描述

说明这种方式是可行的,以后每天只需替换日期、定时运行即可。
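
例如,可以直接用Linux的date命令生成昨天的日期作为参数,示意写法如下(完整的定时脚本见后文6.3.3):

cd /export/server/apache-seatunnel-2.3.5/
./bin/seatunnel.sh --config ./config/job/member.config -e local -i date=$(date -d yesterday +%Y-%m-%d)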

5.5 DWD开发

5.5.1 dim_register_mv 会员注册物化视图

跟其他维表一样,构造成拉链表。

CREATE MATERIALIZED VIEW IF NOT EXISTS dim.`dim_register_mv`
DISTRIBUTED BY HASH(`customer_unique_id`)
REFRESH ASYNC
as
select
    customer_unique_id,
    update_time,
    customer_city,
    customer_state,
    register_date,
    name,
    birthday,
    phone,
    gender,
    update_time as start_time,
    LEAD(update_time, 1, '9999-12-31 00:00:00') OVER (PARTITION BY customer_unique_id ORDER BY update_time) as end_time
from `dim`.`dim_register_t`;
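
基于这张拉链视图,取会员当前最新版本的信息时,只需用end_time过滤即可(示意查询):

-- 拉链表取当前版本记录的通用写法
select customer_unique_id, customer_city, customer_state, register_date
from dim.dim_register_mv
where end_time = '9999-12-31 00:00:00';
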
5.5.2 dwd_member_order_mv 会员订单物化视图

注意点:需要一张订单粒度的宽表,将其他表的信息整合起来,所以还是以orders表作为主表。在关联1:n的表时,需要先聚合再进行关联。

CREATE MATERIALIZED VIEW IF NOT EXISTS fact.`dwd_member_order_mv`
PARTITION BY date_trunc('day', `dt`)
DISTRIBUTED BY HASH(`order_id`)
REFRESH ASYNC START('2025-01-01 01:10:00') EVERY (interval 4 hour)
PROPERTIES (
    "excluded_trigger_tables" = "dim.dim_register_mv"
)
as
select
    o.order_id,
    o.customer_id,
    o.order_status,
    o.order_purchase_timestamp,
    o.order_approved_at,
    o.order_delivered_carrier_date,
    o.order_delivered_customer_date,
    o.order_estimated_delivery_date,
    c.customer_unique_id,
    c.customer_zip_code_prefix,
    c.customer_city,
    c.customer_state,
    ifnull(oa.other_amount, 0) as other_amount,
    pm.payment_times,
    pm.payment_value,
    ifnull(r.rewiews_num, 0) as rewiews_num,
    ifnull(r.review_score, 5) as review_score,
    ifnull(i.product_num, 1) as product_num,
    ifnull(i.product_value, pm.payment_value * 0.858) as product_value,
    ifnull(i.freight_value, pm.payment_value * 0.142) as freight_value,
    oc.product_cost,
    oc.freight_cost,
    rg.name,
    rg.register_date,
    rg.birthday,
    rg.gender,
    o.dt,
    date_trunc('month', o.dt) as month_day,
    date(date_sub(o.dt, dayofweek(o.dt)-1)) as week_day
from fact.ods_sale_orders_t o
inner join fact.ods_sale_customer_t c on o.customer_id = c.customer_id
left join fact.ods_sale_other_amount_t oa on o.order_id = oa.order_id
inner join (
    select order_id, count(1) as payment_times, sum(payment_value) as payment_value
    from fact.ods_sale_payment_t group by order_id
) pm on o.order_id = pm.order_id
left join (
    select order_id, count(1) as rewiews_num, avg(review_score) as review_score
    from fact.ods_sale_review_t group by order_id
) r on o.order_id = r.order_id
left join (
    select order_id, count(1) as product_num, sum(price) as product_value, sum(freight_value) as freight_value
    from fact.ods_sale_item_t group by order_id
) i on o.order_id = i.order_id
inner join (
    select order_id, sum(product_cost) as product_cost, sum(freight_cost) as freight_cost
    from fact.ods_sale_orders_cost_t group by order_id
) oc on o.order_id = oc.order_id
inner join dim.dim_register_mv rg
    on c.customer_unique_id = rg.customer_unique_id
    and o.order_purchase_timestamp >= rg.start_time
    and o.order_purchase_timestamp < rg.end_time
;
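
物化视图刷新后,建议对比一下宽表与订单表的行数,确认inner join没有意外丢单(示意查询,两个计数应一致或非常接近):

-- 对比宽表与订单表行数,检查join是否丢单
select
    (select count(1) from fact.ods_sale_orders_t)   as orders_cnt,
    (select count(1) from fact.dwd_member_order_mv) as mv_cnt;
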
5.5.3 dwd_member_first_buy_t 会员首单表

(1)建表

注意:因为是离线更新,写少查多,所以可以设置为PRIMARY模型

create table if not exists fact.dwd_member_first_buy_t
(
    customer_unique_id varchar(255),
    customer_zip_code_prefix int,
    customer_city varchar(255),
    customer_state varchar(255),
    name varchar(100),
    register_date datetime,
    birthday date,
    gender varchar(10),
    order_id varchar(255),
    customer_id varchar(255),
    order_status varchar(30),
    order_purchase_timestamp datetime comment '订单购买时间戳',
    order_approved_at datetime comment '付款审批时间戳',
    order_delivered_carrier_date datetime comment '订单发给物流商的时间戳',
    order_delivered_customer_date datetime comment '订单实际交付给客户的时间戳',
    order_estimated_delivery_date datetime comment '在购买时告知客户的预计交货时间',
    other_amount decimal(10,2) comment '其他金额',
    payment_times int comment '支付次数',
    payment_value decimal(10,2) comment '支付金额',
    rewiews_num int comment '评论次数',
    review_score decimal(10,2) comment '评论分数',
    product_num int comment '该单购买的商品样数',
    product_value decimal(10,2) comment '商品价值',
    freight_value decimal(10,2) comment '运费',
    product_cost decimal(10,2) comment '商品成本',
    freight_cost decimal(10,2) comment '运费成本',
    dt date,
    month_day date,
    week_day date
)
PRIMARY KEY(customer_unique_id)
COMMENT '会员首单表'
DISTRIBUTED BY HASH(customer_unique_id)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"replication_num" = "1"
);

(2)计算

使用开窗函数即可

insert into fact.dwd_member_first_buy_t
select
    customer_unique_id,
    customer_zip_code_prefix,
    customer_city,
    customer_state,
    name,
    register_date,
    birthday,
    gender,
    order_id,
    customer_id,
    order_status,
    order_purchase_timestamp,
    order_approved_at,
    order_delivered_carrier_date,
    order_delivered_customer_date,
    order_estimated_delivery_date,
    other_amount,
    payment_times,
    payment_value,
    rewiews_num,
    review_score,
    product_num,
    product_value,
    freight_value,
    product_cost,
    freight_cost,
    dt,
    month_day,
    week_day
from (
    select *, row_number() over(partition by customer_unique_id order by order_purchase_timestamp) as rn
    from fact.dwd_member_order_mv
) t
where rn = 1;
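
计算完成后,可以校验一下首单表的粒度:每个会员应当只有一条记录(示意查询,两个计数应相等):

select count(1) as row_cnt, count(distinct customer_unique_id) as member_cnt
from fact.dwd_member_first_buy_t;
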
5.5.4 dwd_member_logins_t 会员登录表

(1)建表

因为是离线更新,写少查多,所以可以设置为PRIMARY模型。

create table if not exists fact.dwd_member_logins_t
(
    customer_unique_id varchar(255) NOT NULL,
    login_time datetime NOT NULL,
    dt date NOT NULL COMMENT "使用login_time作为分区时间",
    ip_address varchar(50),
    customer_city varchar(255),
    customer_state varchar(255),
    register_date datetime,
    name varchar(100),
    birthday date,
    gender varchar(10),
    month_day date,
    week_day date
)
PRIMARY KEY(customer_unique_id,login_time,dt)
COMMENT '会员登录表'
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(customer_unique_id)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"replication_num" = "1"
);

(2)计算

insert into fact.dwd_member_logins_t
select
    lg.customer_unique_id,
    lg.login_time,
    lg.dt,
    lg.ip_address,
    rg.customer_city,
    rg.customer_state,
    rg.register_date,
    rg.name,
    rg.birthday,
    rg.gender,
    date_trunc('month', lg.dt) as month_day,
    date(date_sub(lg.dt, dayofweek(lg.dt)-1)) as week_day
from fact.ods_member_logins_t lg
inner join dim.dim_register_mv rg
    on lg.customer_unique_id = rg.customer_unique_id
    and lg.login_time >= rg.start_time and lg.login_time < rg.end_time
where lg.dt = '${query_date}';

首先是全量运行,将where条件去掉即可。

跑完全量运行的任务后,以后每天运行,将query_date替换成相应的日期就可以了。
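
全量跑完后,同样可以核对一下行数。前文已验证所有登录会员都在register表中,所以正常情况下dwd与ods的行数应一致;若存在登录时间早于最早拉链版本的记录,行数会略有差异(示意查询):

select
    (select count(1) from fact.ods_member_logins_t) as ods_cnt,
    (select count(1) from fact.dwd_member_logins_t) as dwd_cnt;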

5.6 DWS开发

dws_member_city_analysis_day_t 每天城市会员分析表

(1)建表

因为是离线更新,写少查多,所以可以设置为PRIMARY模型。

create table if not exists fact.dws_member_city_analysis_day_t
(
    dt date,
    customer_city varchar(255),
    customer_state varchar(255),
    month_day date,
    week_day date,
    order_num int comment '销售单量',
    product_value decimal(10,2) comment '商品金额',
    freight_value decimal(10,2) comment '运费金额',
    other_amount decimal(10,2) comment '其他金额',
    payment_value decimal(10,2) comment '支付金额',
    product_cost decimal(10,2) comment '商品成本',
    freight_cost decimal(10,2) comment '运费成本',
    total_cost decimal(10,2) comment '总成本',
    gross_profit decimal(10,2) comment '毛利润',
    net_profit decimal(10,2) comment '净利润',
    cancel_order_num int comment '取消销售单量',
    cancel_product_value decimal(10,2) comment '取消商品金额',
    cancel_freight_value decimal(10,2) comment '取消运费金额',
    cancel_other_amount decimal(10,2) comment '取消其他金额',
    cancel_payment_value decimal(10,2) comment '取消支付金额',
    cancel_product_cost decimal(10,2) comment '取消商品成本',
    cancel_freight_cost decimal(10,2) comment '取消运费成本',
    buy_num int comment '消费人数',
    rewiews_num int comment '总评论次数',
    review_score decimal(10,2) comment '平均评论分数',
    register_num int comment '注册人数',
    login_num int comment '登录人数',
    first_order_num int comment '首单人数'
)
PRIMARY KEY(dt,customer_city)
COMMENT '每天城市会员分析表'
PARTITION BY(dt)
DISTRIBUTED BY HASH(customer_city)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"replication_num" = "1"
);

(2)需求分析

从fact.dwd_member_order_mv中获取消费信息和评论信息,

从dim.dim_register_mv中获取注册信息,

从fact.dwd_member_logins_t中获取登录信息,

从fact.dwd_member_first_buy_t中获取首单信息。

计算完指标后再进行合并即可。

(3)计算

insert into fact.dws_member_city_analysis_day_t
with sale as (
    select
        dt, customer_city, customer_state, month_day, week_day,
        count(order_id)                                                      as order_num,
        round(sum(product_value), 2)                                         as product_value,
        round(sum(freight_value), 2)                                         as freight_value,
        round(sum(other_amount), 2)                                          as other_amount,
        round(sum(payment_value), 2)                                         as payment_value,
        round(sum(product_cost), 2)                                          as product_cost,
        round(sum(freight_cost), 2)                                          as freight_cost,
        round(sum(product_cost) + sum(freight_cost), 2)                      as total_cost,
        round(sum(product_value) - sum(product_cost), 2)                     as gross_profit,
        round(sum(payment_value) - sum(product_cost) - sum(freight_cost), 2) as net_profit,
        count(if(order_status = 'canceled', order_id, null))                 as cancel_order_num,
        round(sum(if(order_status = 'canceled', product_value, 0)), 2)       as cancel_product_value,
        round(sum(if(order_status = 'canceled', freight_value, 0)), 2)       as cancel_freight_value,
        round(sum(if(order_status = 'canceled', other_amount, 0)), 2)        as cancel_other_amount,
        round(sum(if(order_status = 'canceled', payment_value, 0)), 2)       as cancel_payment_value,
        round(sum(if(order_status = 'canceled', product_cost, 0)), 2)        as cancel_product_cost,
        round(sum(if(order_status = 'canceled', freight_cost, 0)), 2)        as cancel_freight_cost,
        count(distinct customer_unique_id)                                   as buy_num,
        sum(rewiews_num)                                                     as rewiews_num,
        avg(review_score)                                                    as review_score,
        0 as register_num,
        0 as login_num,
        0 as first_order_num
    from fact.dwd_member_order_mv
    where dt = '${query_date}'
    group by dt, customer_city, customer_state, month_day, week_day
), rg as (
    select
        date(register_date) as dt,
        customer_city,
        customer_state,
        date(date_trunc('month', register_date)) as month_day,
        date(date_sub(register_date, dayofweek(register_date)-1)) as week_day,
        0 as order_num, 0 as product_value, 0 as freight_value, 0 as other_amount,
        0 as payment_value, 0 as product_cost, 0 as freight_cost, 0 as total_cost,
        0 as gross_profit, 0 as net_profit,
        0 as cancel_order_num, 0 as cancel_product_value, 0 as cancel_freight_value,
        0 as cancel_other_amount, 0 as cancel_payment_value, 0 as cancel_product_cost,
        0 as cancel_freight_cost,
        0 as buy_num, 0 as rewiews_num, 0 as review_score,
        sum(1) as register_num,
        0 as login_num,
        0 as first_order_num
    from dim.dim_register_mv
    where end_time = '9999-12-31 00:00:00' and date(register_date) = '${query_date}'
    group by register_date, customer_city, customer_state
), lg as (
    select
        dt,
        customer_city,
        customer_state,
        date(date_trunc('month', dt)) as month_day,
        date(date_sub(dt, dayofweek(dt)-1)) as week_day,
        0 as order_num, 0 as product_value, 0 as freight_value, 0 as other_amount,
        0 as payment_value, 0 as product_cost, 0 as freight_cost, 0 as total_cost,
        0 as gross_profit, 0 as net_profit,
        0 as cancel_order_num, 0 as cancel_product_value, 0 as cancel_freight_value,
        0 as cancel_other_amount, 0 as cancel_payment_value, 0 as cancel_product_cost,
        0 as cancel_freight_cost,
        0 as buy_num, 0 as rewiews_num, 0 as review_score,
        0 as register_num,
        count(distinct customer_unique_id) as login_num,
        0 as first_order_num
    from fact.dwd_member_logins_t
    where dt = '${query_date}'
    group by dt, customer_city, customer_state
), ft as (
    select
        dt,
        customer_city,
        customer_state,
        date(date_trunc('month', dt)) as month_day,
        date(date_sub(dt, dayofweek(dt)-1)) as week_day,
        0 as order_num, 0 as product_value, 0 as freight_value, 0 as other_amount,
        0 as payment_value, 0 as product_cost, 0 as freight_cost, 0 as total_cost,
        0 as gross_profit, 0 as net_profit,
        0 as cancel_order_num, 0 as cancel_product_value, 0 as cancel_freight_value,
        0 as cancel_other_amount, 0 as cancel_payment_value, 0 as cancel_product_cost,
        0 as cancel_freight_cost,
        0 as buy_num, 0 as rewiews_num, 0 as review_score,
        0 as register_num,
        0 as login_num,
        sum(1) as first_order_num
    from fact.dwd_member_first_buy_t
    where dt = '${query_date}'
    group by dt, customer_city, customer_state
)
select
    dt, customer_city, customer_state, month_day, week_day,
    sum(order_num) as order_num,
    sum(product_value) as product_value,
    sum(freight_value) as freight_value,
    sum(other_amount) as other_amount,
    sum(payment_value) as payment_value,
    sum(product_cost) as product_cost,
    sum(freight_cost) as freight_cost,
    sum(total_cost) as total_cost,
    sum(gross_profit) as gross_profit,
    sum(net_profit) as net_profit,
    sum(cancel_order_num) as cancel_order_num,
    sum(cancel_product_value) as cancel_product_value,
    sum(cancel_freight_value) as cancel_freight_value,
    sum(cancel_other_amount) as cancel_other_amount,
    sum(cancel_payment_value) as cancel_payment_value,
    sum(cancel_product_cost) as cancel_product_cost,
    sum(cancel_freight_cost) as cancel_freight_cost,
    sum(buy_num) as buy_num,
    sum(rewiews_num) as rewiews_num,
    sum(review_score) as review_score,
    sum(register_num) as register_num,
    sum(login_num) as login_num,
    sum(first_order_num) as first_order_num
from (
    select * from sale
    union all select * from rg
    union all select * from lg
    union all select * from ft
) t
group by dt, customer_city, customer_state, month_day, week_day
;

首先是全量运行,将where条件去掉即可。

跑完全量运行的任务后,以后每天运行,将query_date替换成相应的日期就可以了。

5.7 ADS开发

ads_member_state_analysis_day_t 每天州会员分析表

(1)建表

因为是离线更新,写少查多,所以可以设置为PRIMARY模型。

create table if not exists fact.ads_member_state_analysis_day_t
(
    dt date,
    customer_state varchar(255),
    month_day date,
    week_day date,
    order_num int comment '销售单量',
    product_value decimal(10,2) comment '商品金额',
    freight_value decimal(10,2) comment '运费金额',
    other_amount decimal(10,2) comment '其他金额',
    payment_value decimal(10,2) comment '支付金额',
    product_cost decimal(10,2) comment '商品成本',
    freight_cost decimal(10,2) comment '运费成本',
    total_cost decimal(10,2) comment '总成本',
    gross_profit decimal(10,2) comment '毛利润',
    net_profit decimal(10,2) comment '净利润',
    cancel_order_num int comment '取消销售单量',
    cancel_product_value decimal(10,2) comment '取消商品金额',
    cancel_freight_value decimal(10,2) comment '取消运费金额',
    cancel_other_amount decimal(10,2) comment '取消其他金额',
    cancel_payment_value decimal(10,2) comment '取消支付金额',
    cancel_product_cost decimal(10,2) comment '取消商品成本',
    cancel_freight_cost decimal(10,2) comment '取消运费成本',
    buy_num int comment '消费人数',
    rewiews_num int comment '总评论次数',
    review_score decimal(10,2) comment '平均评论分数',
    register_num int comment '注册人数',
    login_num int comment '登录人数',
    first_order_num int comment '首单人数'
)
PRIMARY KEY(dt, customer_state)
COMMENT '每天州会员分析表'
PARTITION BY(dt)
DISTRIBUTED BY HASH(customer_state)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"replication_num" = "1"
);

(2)计算

insert into fact.ads_member_state_analysis_day_t
select
    dt,
    customer_state,
    month_day,
    week_day,
    sum(order_num) as order_num,
    sum(product_value) as product_value,
    sum(freight_value) as freight_value,
    sum(other_amount) as other_amount,
    sum(payment_value) as payment_value,
    sum(product_cost) as product_cost,
    sum(freight_cost) as freight_cost,
    sum(total_cost) as total_cost,
    sum(gross_profit) as gross_profit,
    sum(net_profit) as net_profit,
    sum(cancel_order_num) as cancel_order_num,
    sum(cancel_product_value) as cancel_product_value,
    sum(cancel_freight_value) as cancel_freight_value,
    sum(cancel_other_amount) as cancel_other_amount,
    sum(cancel_payment_value) as cancel_payment_value,
    sum(cancel_product_cost) as cancel_product_cost,
    sum(cancel_freight_cost) as cancel_freight_cost,
    sum(buy_num) as buy_num,
    sum(rewiews_num) as rewiews_num,
    sum(review_score) as review_score,
    sum(register_num) as register_num,
    sum(login_num) as login_num,
    sum(first_order_num) as first_order_num
from fact.dws_member_city_analysis_day_t
where dt = '${query_date}'
group by dt, customer_state, month_day, week_day
;
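
跑完之后,可以查询某一天各州的核心指标,对结果做一次快速检查(示意查询,日期可替换):

select dt, customer_state, order_num, payment_value, register_num, login_num
from fact.ads_member_state_analysis_day_t
where dt = '2018-08-01'
order by payment_value desc
limit 10;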

6、Kettle及调度【掌握】

6.1 Kettle简介

在会员主题数仓开发部分,遇到了离线作业。这些作业跟使用MySQL CDC的实时作业,以及StarRocks的物化视图不一样。它需要我们定时去运行。这些作业每天手动去执行肯定是不现实的,一般是通过调度工具来完成。

在大数据领域常见的调度工具有很多,比如DolphinScheduler,Azkaban,Airflow,Oozie等等。每种工具都有其优缺点,对比如下。相对来说,Kettle更轻量化一些,通过可视化操作就可以达到调度的功能,还可以进行复杂的ETL操作。所以本文使用Kettle来进行调度。
在这里插入图片描述

Kettle 是一款国外开源的 ETL 工具,纯 Java 编写,可以在 Windows、Linux、Unix 上运行,绿色免安装,数据抽取高效稳定。

6.2 Kettle安装

详见拓展中的 win版本kettle环境安装.pdf

6.3 调度

6.3.1 设置参数

因为我们希望实现的效果是,每天作业定时调度,并且自动传参。每天凌晨开始调度,传入的参数根据时间自动调整成昨天的日期。比如今天是2025-03-02,参数就为2025-03-01。

为了自动生成固定格式的日期参数,可以使用“获取系统信息”步骤;这里为了方便控制格式,直接用“表输入”执行一条SQL来生成昨天的日期。

1)首先新建一个转换

2)新建数据库连接。

在主对象树中,右键DB连接,点击新建。

在这里插入图片描述

设置连接名称为fact。主机名称为node1(StarRocks的主机名),数据库名称为fact,端口号为9030,用户名为root,密码为123456。

点击测试,没问题后点击确定。

然后右键数据库图标,设置成共享。
在这里插入图片描述

3)设置日期参数

在“输入”中拖拽一个“表输入”

将步骤名称改成 设置日期参数,数据库连接选择fact,SQL改成

select date_format(date_sub(current_date, 1), '%Y-%m-%d') as yesterday

点击预览,没问题后点击确定。
在这里插入图片描述

4)然后在“作业”中拖拽一个“设置变量”,按住shift将两者连在一起

5)双击设置变量,点击获取字段,将变量名设置为query_date
在这里插入图片描述

6)保存转换

设置名称为 设置日期参数

6.3.2 设置调度作业

1)新建一个作业

2)在“通用”中拖拽一个“Start”

3)在“通用”中拖拽一个“转换”,右键编辑作业入口,设置名称为 设置日期参数,点击浏览选择刚刚配置的转换,点击确定。

然后将Start和这个转换连接起来。
在这里插入图片描述

4)保存作业

设置名称为 member

6.3.3 设置SeaTunnel任务

1)因为SeaTunnel任务在Linux上,调度Linux上的任务需要使用SSH,而“运行SSH命令”这个步骤位于转换中,所以需要新建一个转换。

2)在“应用”中拖拽一个“运行SSH命令”

3)双击打开,设置连接参数

设置step name为 离线增量同步

Server name为 node1

Server port 为 22

Username 为 root

Password 为 123456

取消勾选 Use key

点击 Test connect,显示成功后则没问题。
在这里插入图片描述

4)设置脚本内容

点击Settings

因为这里不能直接使用Kettle流程里的参数,所以改用Linux中生成的参数即可。

query_date=$(date -d "yesterday" +"%Y-%m-%d")

为了将命令和参数结合起来,将下面的命令写成脚本。

cd /export/server/apache-seatunnel-2.3.5/
./bin/seatunnel.sh --config ./config/job/member.config -e local -i date=2019-10-17

在Linux中创建脚本:

cd /export/server/apache-seatunnel-2.3.5/
mkdir shelljob
cd shelljob
touch member.sh
chmod +x member.sh

脚本内容如下:

#!/bin/bash
source /etc/profile

# 计算昨天的日期
query_date=$(date -d "yesterday" +"%Y-%m-%d")

# 切换到指定目录
cd /export/server/apache-seatunnel-2.3.5/ || exit 1

# 运行 SeaTunnel 并将输出重定向到日志文件
nohup ./bin/seatunnel.sh --config ./config/job/member.config -e local -i date=$query_date > seatunnel.log 2>&1

对脚本的解释如下:

  • #!/bin/bash:指定脚本使用 Bash 解释器运行。
  • source /etc/profile:同步环境变量【因为是SSH调用,所以必须加上】。
  • query_date=$(date -d "yesterday" +"%Y-%m-%d"):获取昨天的日期。
  • cd /export/server/apache-seatunnel-2.3.5/ || exit 1:
    • 先 cd 到目标目录。
    • || exit 1:如果 cd 失败,退出脚本,防止后续命令错误执行。
  • nohup … > seatunnel.log 2>&1:
    • nohup 让任务不受挂断信号影响,即使关闭终端也不会终止。
    • > seatunnel.log 2>&1 记录日志,包含标准输出和错误信息。
    • 注意:最后不加&,即不让任务在后台运行,从而阻塞终端,使任务跑完后才结束这个shell调用。
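
脚本保存后,可以先在Linux上手动执行一次,确认能正常跑通再交给Kettle调度(示意命令):

sh /export/server/apache-seatunnel-2.3.5/shelljob/member.sh
tail -n 20 /export/server/apache-seatunnel-2.3.5/seatunnel.log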

然后在kettle中,只需要调用这个脚本即可。

在commands中填写

sh /export/server/apache-seatunnel-2.3.5/shelljob/member.sh

在这里插入图片描述

5)保存转换

设置名称为SeaTunnel任务,运行测试一下。

6)将SeaTunnel任务加到member任务中

在这里插入图片描述

6.3.4 设置SQL任务

数据同步完成后,就可以执行SQL任务了。

(1)fact.dwd_member_first_buy_t

从“脚本”中拖拽一个“SQL”,然后双击进行修改。

因为fact.dwd_member_first_buy_t依赖fact.dwd_member_order_mv,为防止fact.dwd_member_order_mv没有刷新成最新的状态,这里手动进行刷新一下。

REFRESH MATERIALIZED VIEW fact.dwd_member_order_mv;

在这里插入图片描述

修改完成后,将SeaTunnel任务和dwd_member_order_mv连接起来。

(2)fact.dwd_member_logins_t

从“脚本”中拖拽一个“SQL”,然后双击进行修改。

这张表计算时不再是全量计算,所以需要配置参数,只需勾选使用变量替换即可
在这里插入图片描述

配置完成后,连接起来即可。

(3)fact.dws_member_city_analysis_day_t和fact.ads_member_state_analysis_day_t

配置方式和fact.dwd_member_logins_t相同。

最终配置完成后的job如图所示。
在这里插入图片描述

6.3.5 设置定时

右键Start,点击编辑作业入口
在这里插入图片描述

勾选重复,类型设置为天,每天2点开始调度即可。
在这里插入图片描述

配置完成后,点击开始,则每天会定时进行调度。

为了测试配置是否正确,可以将2点设置成即将到来的时间,看能否正常运行。比如现在是13:10,可以设置成13:13,然后查看调度情况。

经过测试,没有问题后,再改回2点即可。

一般kettle会部署到公司的专用电脑上,供开发人员使用。
