当前位置: 首页 > news >正文

大数据应用开发——实时数据处理(一)

前言

大数据应用开发——实时数据采集

大数据应用开发——实时数据处理

        Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中

        并在HBase中进行备份

大数据应用开发——数据可视化

hadoop,zookeeper,kafka,flink要开启

目录

        题目

        Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中


题目

按照任务书要求使用Java语言基于Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中,并在HBase中进行备份同时建立Hive外表,基于Flink完成相关的数据指标计算并将计算结果存入Redis、ClickHouse中

Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中

在IDEA下用maven创建flink项目:

# 用cmd执行,创建在当前目录下
# java版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=flink版本号# scala版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=flink版本号

修改pox.xml文件,将flink-connector-kafka_...依赖移出来

 demo包下有两个.java

PS:一个用于批处理,另一个用于流处理

public class StreamingJob {public static void main(String[] args) throws Exception {// set up the streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置发送的KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("master:9092").setTopics("order").setGroupId("my_group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();// 配置接收的KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("master:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("dwd_order").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.NONE).build();// 指定的源创建一个数据流DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 将数据里的'符号去掉DataStream<String> text = stream.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {return s.replace("'","");}});// 打印处理结果到控制台text.print();// 发送text.sinkTo(sink);// execute programenv.execute("Flink Streaming Java API Skeleton");}
}

将代码打包成.jar,可以先clean,再package

生成位置在当前项目位置/target/项目名称-...jar

 放进主节点

# /usr/flink/bin/flink run -c 包名.运行class名 放在主节点的位置
/usr/flink/bin/flink run -c demo.StreamingJob /opt/flink-java-1.0-SNAPSHOT.jar

最后,可以用flink控制台或kafka-console-consumer.sh查看 

http://www.lryc.cn/news/485774.html

相关文章:

  • Wireshark中的length栏位
  • IDEA中创建多模块项目步骤
  • 深度学习笔记13-卷积神经网络1
  • 【新华妙笔-注册/登录安全分析报告-无验证方式导致安全隐患】
  • STM32电源管理—实现低功耗
  • 【链路层】空口数据包详解(4):数据物理通道协议数据单元(PDU)
  • 数学分组求偶数和
  • 机器学习基础02_特征工程
  • CSS Modules中的 :global
  • linux病毒编写+vim shell编程
  • WinDefender Weaker
  • 智能工厂的设计软件 为了监管控一体化的全能Supervisor 的监督学习 之 序5 架构for认知系统 总述 (架构全图)
  • vmware集群 vSAN HCL 数据库
  • 人工智能引发直播革命:AI 技术塑造无人直播全新体验
  • 数据研发基础 | 什么是流批一体
  • 《Python网络安全项目实战》项目6 编写密码工具程序
  • 现代C++HTTP框架cinatra
  • 【功耗现象】com.gorgeous.lite后台Camera 使用2小时平均电流200mA耗电量400mAh现象
  • 06.VSCODE:备战大项目,CMake专项配置
  • 还是小时候味道的麻辣片
  • GaussDB部署架构
  • 遥测数据采集工具Grafana Alloy
  • 线性数据结构
  • 【ArcGIS微课1000例】0127:计算城市之间的距离
  • 【算法】二分
  • ARM CCA机密计算安全模型之简介
  • 蓝桥杯-洛谷刷题-day3(C++)
  • K8S资源限制之ResourceQuota
  • 释放高级功能:Nexusflows Athene-V2-Agent在工具使用和代理用例方面超越 GPT-4o
  • MongoDB索引操作和执行计划Explain()详解