当前位置: 首页 > news >正文

大数据-玩转数据-Flink营销对账

一、说明

在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。

二、思路

对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。

三、数据准备

订单数据从OrderLog.csv中读取,交易数据从ReceiptLog.csv中读取
JavaBean类的准备

四、代码

package com.lyh.flink06;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Map;public class Project_Order {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);SingleOutputStreamOperator<OrderEvent> orderEventString = env.readTextFile("input/OrderLog.csv").map(line -> {String[] data = line.split(",");return new OrderEvent(Long.valueOf(data[0]),data[1],data[2],Long.valueOf(data[3]));}).filter(log -> "pay".equals(log.getEventType()));SingleOutputStreamOperator<TxEvent> txEventString = env.readTextFile("input/ReceiptLog.csv").map(line -> {String[] data = line.split(",");return new TxEvent(data[0],data[1],Long.valueOf(data[2]));});orderEventString.connect(txEventString).keyBy(OrderEvent::getTxId,TxEvent::getTxId).process(new KeyedCoProcessFunction<String, OrderEvent, TxEvent, String>() {Map<String,OrderEvent> OrderEventmap = new HashMap<>();Map<String,TxEvent> TxEventmap = new HashMap<>();@Overridepublic void processElement1(OrderEvent value,Context ctx,Collector<String> out) throws Exception {TxEvent txEvent = TxEventmap.get(ctx.getCurrentKey());if (txEvent != null) {out.collect("订单" + value.getOrderId() + "对账成功");}else {OrderEventmap.put(ctx.getCurrentKey(),value);}}@Overridepublic void processElement2(TxEvent value,Context ctx,Collector<String> out) throws Exception {OrderEvent orderEvent = OrderEventmap.get(ctx.getCurrentKey());if (orderEvent != null) {out.collect("订单" + orderEvent.getOrderId() + "对账成功");}else {TxEventmap.put(ctx.getCurrentKey(),value);}}}).print();env.execute();}
}

五、结果

在这里插入图片描述

http://www.lryc.cn/news/136468.html

相关文章:

  • 中英双语对话大语言模型:ChatGLM-6B
  • MES生产报工管理
  • 五、修改官方FreeRTOS例程(STM32F1)
  • pytorch基础实践-数据与预处理
  • Java智慧工地系统源码(微服务+Java+Springcloud+Vue+MySQL)
  • PV3D: A 3D GENERATIVE MODEL FOR PORTRAITVIDEO GENERATION 【2023 ICLR】
  • Apache BeanUtils工具介绍
  • java 原子操作 笔记
  • 什么是线程安全性问题?Java中有哪些常用的同步机制来解决线程安全性问题?
  • Gitlab 安装全流程
  • pdf转word最简单方法~
  • Android 9.0 WiFi 扫描结果上报和获取流程
  • Java 项目日志实例:Log4j2
  • Effective C++条款14——在资源管理类中小心coping行为(资源管理)
  • 【网络教程】如何创建/添加钉钉机器人以及如何获取机器人的Token/Secret
  • wx原生微信小程序入门常用总结
  • 制作一个专属于安防监控业的小程序商城
  • 基于java羽毛球馆管理系统设计与实现
  • 安装elasticsearch8.9.0及修改配置
  • 如何构建高效的接口自动化测试框架?看完你就会了...
  • 53 | 金融行业股票销售指标分析
  • qiuzhiji1
  • 使用VisualStudio制作上位机(二)
  • Datawhale AI夏令营 - 用户新增预测挑战赛 | 学习笔记
  • HarmonyOS/OpenHarmony(Stage模型)卡片开发AbilityStage组件容器
  • 利用torchvision库实现目标检测与语义分割
  • 基于决策树(Decision Tree)的乳腺癌诊断
  • 前端面试的计算机网络部分(2)每天10个小知识点
  • 【LeetCode】224. 基本计算器
  • 服务器数据恢复-EVA存储磁盘故障导致存储崩溃的数据恢复案例