当前位置: 首页 > news >正文

物流实时数仓:数仓搭建(DWD)一

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建
物流实时数仓:数仓搭建(DIM)
物流实时数仓:数仓搭建(DWD)一


文章目录

  • 系列文章目录
  • 前言
  • 一、文件编写
    • 1.目录创建
    • 2.bean文件
      • 1.DwdOrderDetailOriginBean
      • 2.DwdOrderInfoOriginBean
      • 3.DwdTradeCancelDetailBean
      • 4.DwdTradeOrderDetailBean
      • 5.DwdTradePaySucDetailBean
      • 6.DwdTransBoundFinishDetailBean
      • 7.DwdTransDeliverSucDetailBean
      • 8.DwdTransDispatchDetailBean
      • 9.DwdTransReceiveDetailBean
      • 10.DwdTransSignDetailBean
    • 3.DwdOrderRelevantApp
  • 二、代码测试
    • 1.环境启动
    • 2.kafka消费者
    • 3.修改配置
    • 4.测试结果
  • 总结


前言

这次博客我们进行DWD层的搭建,内容比较多,一次可能写不完。
在这里插入图片描述
以上就是本次博客需要完成的内容,简单来说就是,从kafka读取数据,然后根据不同的关键字,将其从主流中进行分离,然后在写入各自的kafka中以便后续的操作


一、文件编写

1.目录创建

我们现在beans中创建后边需要的的bean
在这里插入图片描述
然后在dwd目录中创建此次需要的app
在这里插入图片描述

2.bean文件

1.DwdOrderDetailOriginBean

package com.atguigu.tms.realtime.beans;import lombok.Data;import java.math.BigDecimal;/***订单货物明细实体类*/@Data
public class DwdOrderDetailOriginBean {// 编号(主键)String id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumnLength;// 宽cmInteger volumnWidth;// 高cmInteger volumnHeight;// 重量 kgBigDecimal weight;// 创建时间String createTime;// 更新时间String updateTime;// 是否删除String isDeleted;
}

2.DwdOrderInfoOriginBean

package com.atguigu.tms.realtime.beans;import lombok.Data;import java.math.BigDecimal;/*** 订单实体类*/
@Data
public class DwdOrderInfoOriginBean {// 编号(主键)String id;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间Long estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 创建时间String createTime;// 更新时间String updateTime;// 是否删除String isDeleted;
}

3.DwdTradeCancelDetailBean

package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/*** 交易域:取消运单事务事实表实体类*/
@Data
public class DwdTradeCancelDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 取消时间String cancelTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.cancelTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}

4.DwdTradeOrderDetailBean

package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/***交易域:下单事务事实表实体类*/
@Data
public class DwdTradeOrderDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 下单时间String orderTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;this.orderTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(detailOriginBean.createTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(detailOriginBean.createTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;}
}

5.DwdTradePaySucDetailBean

package com.atguigu.tms.realtime.beans;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/***交易域:支付成功事务事实表实体类*/
@Data
public class DwdTradePaySucDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 支付时间String payTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.payTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}

6.DwdTransBoundFinishDetailBean

package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;
/***物流域:转运完成事务事实表实体类*/
@Data
public class DwdTransBoundFinishDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 发单时间String boundFinishTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.boundFinishTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}

7.DwdTransDeliverSucDetailBean

package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;
/***物流域:派送成功事务事实表实体类*/
@Data
public class DwdTransDeliverSucDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 派送成功时间String deliverTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.deliverTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}

8.DwdTransDispatchDetailBean

package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;import lombok.Data;import java.math.BigDecimal;/***物流域:发单事务事实表实体类*/
@Data
public class DwdTransDispatchDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 发单时间String dispatchTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.dispatchTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}

9.DwdTransReceiveDetailBean

package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/***物流域:揽收(接单)事务事实表实体类*/
@Data
public class DwdTransReceiveDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 揽收时间String receiveTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.receiveTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}

10.DwdTransSignDetailBean

package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/*** 物流域:签收事务事实表实体类*/
@Data
public class DwdTransSignDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 签收时间String signTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.signTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}

3.DwdOrderRelevantApp

package com.atguigu.tms.realtime.app.dwd;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.*;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class DwdOrderRelevantApp {public static void main(String[] args) throws Exception {// 1.环境准备StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);env.setParallelism(4);// 2.从Kafka读数据String topic = "tms_ods";String groupId = "dwd_order_relevant_group";KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source").uid("kafka_source");// 3.筛选订单和订单明细数据SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter((FilterFunction<String>) jsonStr -> {JSONObject jsonObj = JSON.parseObject(jsonStr);String tableName = jsonObj.getJSONObject("source").getString("table");return "order_info".equals(tableName) || "order_cargo".equals(tableName);});
//        filterDS.print(">>>");// 4.对流中的数据类型进行转换 jsonStr->jsonObjSingleOutputStreamOperator<JSONObject> jsonObjDS = filterDS.map((MapFunction<String, JSONObject>) jsonStr -> {JSONObject jsonObj = JSON.parseObject(jsonStr);String tableName = jsonObj.getJSONObject("source").getString("table");jsonObj.put("table", tableName);jsonObj.remove("source");jsonObj.remove("transaction");return jsonObj;});//        jsonObjDS.print(">>>");// 5.按照order_id进行分组KeyedStream<JSONObject, String> keyDS = jsonObjDS.keyBy((KeySelector<JSONObject, String>) jsonObj -> {String table = jsonObj.getString("table");if ("order_info".equals(table)) {return jsonObj.getJSONObject("after").getString("id");}return jsonObj.getJSONObject("after").getString("order_id");});
//        keyDS.print(">>>");// 6.定义侧输出流标签 下单放到主流,支付成功、取消运单、揽收(接单)、发单 转运完成、派送成功、签收放到侧输出流// 支付成功明细流标签OutputTag<String> paySucTag = new OutputTag<String>("dwd_trade_pay_suc_detail") {};// 取消运单明细流标签OutputTag<String> cancelDetailTag = new OutputTag<String>("dwd_trade_cancel_detail") {};// 揽收明细流标签OutputTag<String> receiveDetailTag = new OutputTag<String>("dwd_trans_receive_detail") {};// 发单明细流标签OutputTag<String> dispatchDetailTag = new OutputTag<String>("dwd_trans_dispatch_detail") {};// 转运完成明细流标签OutputTag<String> boundFinishDetailTag = new OutputTag<String>("dwd_trans_bound_finish_detail") {};// 派送成功明细流标签OutputTag<String> deliverSucDetailTag = new OutputTag<String>("dwd_trans_deliver_detail") {};// 签收明细流标签OutputTag<String> signDetailTag = new OutputTag<String>("dwd_trans_sign_detail") {};// 7.分流SingleOutputStreamOperator<String> orderDetailDS = keyDS.process(new KeyedProcessFunction<String, JSONObject, String>() {private ValueState<DwdOrderInfoOriginBean> infoBeanState;private ValueState<DwdOrderDetailOriginBean> detailBeanState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<DwdOrderInfoOriginBean> InfoOriginBeanStateDescriptor= new ValueStateDescriptor<>("infoBeanState", DwdOrderInfoOriginBean.class);InfoOriginBeanStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(5)).build());infoBeanState = getRuntimeContext().getState(InfoOriginBeanStateDescriptor);ValueStateDescriptor detailBeanStateDescriptor= new ValueStateDescriptor<>("detailBeanState", DwdOrderDetailOriginBean.class);detailBeanState = getRuntimeContext().getState(detailBeanStateDescriptor);}@Overridepublic void processElement(JSONObject jsonObj, KeyedProcessFunction<String, JSONObject, String>.Context ctx, Collector<String> out) throws Exception {String table = jsonObj.getString("table");String op = jsonObj.getString("op");JSONObject data = jsonObj.getJSONObject("after");if ("order_info".equals(table)) {//处理的是订单数据DwdOrderInfoOriginBean infoOriginBean = data.toJavaObject(DwdOrderInfoOriginBean.class);// 脱敏String senderName = infoOriginBean.getSenderName();String receiverName = infoOriginBean.getReceiverName();senderName = senderName.charAt(0) + senderName.substring(1).replaceAll(".", "\\*");receiverName = receiverName.charAt(0) + receiverName.substring(1).replaceAll(".", "\\*");infoOriginBean.setSenderName(senderName);infoOriginBean.setReceiverName(receiverName);DwdOrderDetailOriginBean detailOriginBean = detailBeanState.value();if ("c".equals(op)) {// 下单操作if (detailOriginBean == null) {// 订单数据 比明细数据先到,将订单数据放到状态中infoBeanState.update(infoOriginBean);} else {// 说明订单数据来之前,明细数据已经来到了,直接关联DwdTradeOrderDetailBean dwdTradeOrderDetailBean = new DwdTradeOrderDetailBean();dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);// 将下单业务过程数据 放到主流中out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));}} else if ("u".equals(op) && detailOriginBean != null) {// 其它操作// 获取修改前的数据JSONObject oldData = jsonObj.getJSONObject("before");// 获取修改前的状态值String oldStatus = oldData.getString("status");String status = infoOriginBean.getStatus();if (!oldStatus.equals(status)) {// 说明修改的是status字段String changeLog = oldStatus + " -> " + status;switch (changeLog) {case "60010 -> 60020":// 处理支付成功数据DwdTradePaySucDetailBean dwdTradePaySucDetailBean = new DwdTradePaySucDetailBean();dwdTradePaySucDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(paySucTag, JSON.toJSONString(dwdTradePaySucDetailBean));break;case "60020 -> 60030":// 处理揽收明细数据DwdTransReceiveDetailBean dwdTransReceiveDetailBean = new DwdTransReceiveDetailBean();dwdTransReceiveDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(receiveDetailTag, JSON.toJSONString(dwdTransReceiveDetailBean));break;case "60040 -> 60050":// 处理发单明细数据DwdTransDispatchDetailBean dispatchDetailBean = new DwdTransDispatchDetailBean();dispatchDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(dispatchDetailTag, JSON.toJSONString(dispatchDetailBean));break;case "60050 -> 60060":// 处理转运完成明细数据DwdTransBoundFinishDetailBean boundFinishDetailBean = new DwdTransBoundFinishDetailBean();boundFinishDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(boundFinishDetailTag, JSON.toJSONString(boundFinishDetailBean));break;case "60060 -> 60070":// 处理派送成功数据DwdTransDeliverSucDetailBean dwdTransDeliverSucDetailBean = new DwdTransDeliverSucDetailBean();dwdTransDeliverSucDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(deliverSucDetailTag, JSON.toJSONString(dwdTransDeliverSucDetailBean));break;case "60070 -> 60080":// 处理签收明细数据DwdTransSignDetailBean dwdTransSignDetailBean = new DwdTransSignDetailBean();dwdTransSignDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(signDetailTag, JSON.toJSONString(dwdTransSignDetailBean));// 签收后订单数据不会再发生变化,状态可以清除detailBeanState.clear();break;default:if (status.equals("60999")) {DwdTradeCancelDetailBean dwdTradeCancelDetailBean = new DwdTradeCancelDetailBean();dwdTradeCancelDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(cancelDetailTag, JSON.toJSONString(dwdTradeCancelDetailBean));// 取消后订单数据不会再发生变化,状态可以清除detailBeanState.clear();}break;}}}} else {// 处理订单明细DwdOrderDetailOriginBean detailOriginBean = data.toJavaObject(DwdOrderDetailOriginBean.class);if ("c".equals(op)) {detailBeanState.update(detailOriginBean);// 获取状态中存放的订单数据 注意:只有下单操作,并且订单数据先到,明细数据后到的情况,才会从状态中拿到订单数据DwdOrderInfoOriginBean infoOriginBean = infoBeanState.value();if (infoOriginBean != null) {//属于下单业务过程DwdTradeOrderDetailBean dwdTradeOrderDetailBean = new DwdTradeOrderDetailBean();dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);// 将下单业务过程数据 放到主流中out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));}}}}}).uid("process_data");// 8.从主流中提取侧输出流// 支付成功明细流//8.1 支付成功明细流SideOutputDataStream<String> paySucDS = orderDetailDS.getSideOutput(paySucTag);// 8.2 取消运单明细流SideOutputDataStream<String> cancelDetailDS = orderDetailDS.getSideOutput(cancelDetailTag);// 8.3 揽收明细流SideOutputDataStream<String> receiveDetailDS = orderDetailDS.getSideOutput(receiveDetailTag);// 8.4 发单明细流SideOutputDataStream<String> dispatchDetailDS = orderDetailDS.getSideOutput(dispatchDetailTag);// 8.5 转运成功明细流SideOutputDataStream<String> boundFinishDetailDS = orderDetailDS.getSideOutput(boundFinishDetailTag);// 8.6 派送成功明细流SideOutputDataStream<String> deliverSucDetailDS = orderDetailDS.getSideOutput(deliverSucDetailTag);// 8.7 签收明细流SideOutputDataStream<String> signDetailDS = orderDetailDS.getSideOutput(signDetailTag);// 9.将不同流的数据写到kafka的不同主题中// 9.1.1 交易域下单明细主题String detailTopic = "tms_dwd_trade_order_detail";// 9.1.2 交易域支付成功明细主题String paySucDetailTopic = "tms_dwd_trade_pay_suc_detail";// 9.1.3 交易域取消运单明细主题String cancelDetailTopic = "tms_dwd_trade_cancel_detail";// 9.1.4 物流域接单(揽收)明细主题String receiveDetailTopic = "tms_dwd_trans_receive_detail";// 9.1.5 物流域发单明细主题String dispatchDetailTopic = "tms_dwd_trans_dispatch_detail";// 9.1.6 物流域转运完成明细主题String boundFinishDetailTopic = "tms_dwd_trans_bound_finish_detail";// 9.1.7 物流域派送成功明细主题String deliverSucDetailTopic = "tms_dwd_trans_deliver_detail";// 9.1.8 物流域签收明细主题String signDetailTopic = "tms_dwd_trans_sign_detail";// 9.2 发送数据到 Kafka// 9.2.1 运单明细数据KafkaSink<String> kafkaProducer = KafkaUtil.getKafkaSink(detailTopic, args);orderDetailDS.print("~~");orderDetailDS.sinkTo(kafkaProducer).uid("order_detail_sink");// 9.2.2 支付成功明细数据KafkaSink<String> paySucKafkaProducer = KafkaUtil.getKafkaSink(paySucDetailTopic, args);paySucDS.print("!!");paySucDS.sinkTo(paySucKafkaProducer).uid("pay_suc_detail_sink");// 9.2.3 取消运单明细数据KafkaSink<String> cancelKafkaProducer = KafkaUtil.getKafkaSink(cancelDetailTopic, args);cancelDetailDS.print("@@");cancelDetailDS.sinkTo(cancelKafkaProducer).uid("cancel_detail_sink");// 9.2.4 揽收明细数据KafkaSink<String> receiveKafkaProducer = KafkaUtil.getKafkaSink(receiveDetailTopic, args);receiveDetailDS.print("##");receiveDetailDS.sinkTo(receiveKafkaProducer).uid("reveive_detail_sink");// 9.2.5 发单明细数据KafkaSink<String> dispatchKafkaProducer = KafkaUtil.getKafkaSink(dispatchDetailTopic, args);dispatchDetailDS.print("$$");dispatchDetailDS.sinkTo(dispatchKafkaProducer).uid("dispatch_detail_sink");// 9.2.6 转运完成明细主题KafkaSink<String> boundFinishKafkaProducer = KafkaUtil.getKafkaSink(boundFinishDetailTopic, args);boundFinishDetailDS.print("%%");boundFinishDetailDS.sinkTo(boundFinishKafkaProducer).uid("bound_finish_detail_sink");// 9.2.7 派送成功明细数据KafkaSink<String> deliverSucKafkaProducer = KafkaUtil.getKafkaSink(deliverSucDetailTopic, args);deliverSucDetailDS.print("^^");deliverSucDetailDS.sinkTo(deliverSucKafkaProducer).uid("deliver_suc_detail_sink");// 9.2.8 签收明细数据KafkaSink<String> signKafkaProducer = KafkaUtil.getKafkaSink(signDetailTopic, args);signDetailDS.print("&&");signDetailDS.sinkTo(signKafkaProducer).uid("sign_detail_sink");env.execute();}
}

二、代码测试

1.环境启动

hadoop,zk,kf全部启动
根据流程图可以看到,流程中没有使用到dim层的内容,所以我们不需要启动hbase。

2.kafka消费者

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail

一共需要查看8个消费者主题,你可以开8个窗口,也可以一个一个看,kafka如果没有消费者,会先将数据保存,等待消费,所以不需要8个主题同时消费。

3.修改配置

在这里插入图片描述

4.测试结果

先启动OdsApp和DwdOrderRelevantApp,然后生成模拟数据,之后查看kakfa消费者,有些数据可能要多生成几次才行。

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
在这里插入图片描述

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
在这里插入图片描述
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
在这里插入图片描述
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
这个主题是特殊情况,正常可能没有输出。

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
在这里插入图片描述

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
在这里插入图片描述
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail
在这里插入图片描述


总结

至此这篇博客的内容结束。

http://www.lryc.cn/news/262033.html

相关文章:

  • MATLAB安装
  • C语言——预处理详解(#define用法+注意事项)
  • Linux(23):Linux 核心编译与管理
  • Oracle RAC环境下redo log 文件的扩容
  • Java入门学习笔记一
  • 分布式块存储 ZBS 的自主研发之旅|元数据管理
  • 六大设计原则
  • dockerfile创建镜像 lNMP+wordpress
  • 深入理解——快速排序
  • 【代码随想录】算法训练计划50
  • 【数据分享】2019-2023年我国区县逐年二手房房价数据(Excel/Shp格式)
  • Redis设计与实现之整数集合
  • [Kubernetes]2. k8s集群中部署基于nodejs golang的项目以及Pod、Deployment详解
  • 讯飞星火大模型api调用
  • TCP与UDP:网络世界中的“顺丰快递”与“广播电台”
  • 升级Xcode15,iOS17后问题解决
  • RabbitMQ搭建集群环境、配置镜像集群、负载均衡
  • leetcode:457. 环形数组是否存在循环
  • Kafka集成springboot
  • Unity中实现ShaderToy卡通火(移植篇)
  • 指针相关知识(进阶)
  • 怎么将文件变为可执行文件
  • 5373. 中等计算
  • 极智一周 | 两系列汇总、MI300X、H100、特供芯片、GPT-4、火灾检测、酷睿Ultra And so on
  • leetcode刷题日志-383赎金信
  • K8s(九)—volume.md
  • python N个人围成一圈报数 报到3出列 直到只剩下最后一人
  • RFC4861 中文版下
  • 用友时空 KSOA 多处SQL注入漏洞复现
  • [AutoSar]基础部分 RTE 介绍