当前位置: 首页 > news >正文

Flink入门学习 | 大数据技术

简单说两句

✨ 正在努力的小新~
💖 超级爱分享,分享各种有趣干货!
👩‍💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~

作者:后端小知识CSDN后端领域新星创作者 |阿里云专家博主

CSDN个人主页:后端小知识

🔎GZH后端小知识

🎉欢迎关注🔎点赞👍收藏⭐️留言📝

Flink入门学习-WordCount

image-20240414195737021

我们今天来编写一个Flink入门学习案例,统计单词出现的次数

这里就先直接上手实践,先不看枯燥的理论

IDEA方式运行

我们首先创建Flink运行环境

//设置Flink运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

然后模拟一点数据

//从集合中读取模拟数据DataStream<String> stream = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");

切词做转换

stream.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {//value就是每一个元素的数据System.out.println("读取内容:" + value);//将每一个元素按照空格切分String[] split = value.split(" ");//遍历每一个单词for (String word : split) {//将每一个单词发送到下游out.collect(new Tuple2<>(word, 1));}})

返回类型

.returns(Types.TUPLE(Types.STRING, Types.INT))

keyby分组(按照tuple的第一个元素进行分组)

.keyBy(f->f.f0)

聚合统计

.sum(1);

打印结果

 sum.print();

最后执行execute

 env.execute();

完整代码如下

package cn.wy.chapter02;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author tiancx*/
public class WordCount {public static void main(String[] args) throws Exception {//设置Flink运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//从集合中读取模拟数据DataStream<String> stream = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {//value就是每一个元素的数据System.out.println("读取内容:" + value);//将每一个元素按照空格切分String[] split = value.split(" ");//遍历每一个单词for (String word : split) {//将每一个单词发送到下游out.collect(new Tuple2<>(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(f->f.f0).sum(1);//打印结果sum.print();env.execute();}
}

运行看结果

image-20240401232613214

提交到集群运行

启动集群后我们使用命令

flink run -c 类全限定路径名 jar文件 

flink run -c cn.wy.chapter02.WordCount FlinkDemo-1.0-SNAPSHOT.jar

image-20240401233542495

可以看到任务提交切运行成功了

我们进入web-ui界面

网址

http://localhost:8081/#/job/completed

界面如下图所示

image-20240401233753324

可以清晰的看到任务状态是FINISHED(完成)

任务执行成功了,我们的日志在哪看呢?

我们直接去TaskManager中看

image-20240401233949094

点击地址进去

image-20240401234102076这里就是结果啦

【都看到这了,点点赞点点关注呗,爱你们】😚😚

后端小知识关注引导

image-20240330155339598

💬

✨ 正在努力的小新~
💖 超级爱分享,分享各种有趣干货!
👩‍💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~

作者:后端小知识CSDN后端领域新星创作者 | 阿里云专家博主

CSDN个人主页:后端小知识

🔎GZH后端小知识

🎉欢迎关注🔎点赞👍收藏⭐️留言📝

http://www.lryc.cn/news/339045.html

相关文章:

  • Arthas实战教程:定位Java应用CPU过高与线程死锁
  • HTML制作跳动的心形网页
  • 如何在Odoo 17 销售应用中使用产品目录添加产品
  • 为什么pdf拆分出几页之后大小几乎没有变化
  • 如何在 VM 虚拟机中安装 OpenEuler 操作系统保姆级教程(附链接)
  • (六)PostgreSQL的组织结构(3)-默认角色和schema
  • DockerFile定制镜像
  • Java8中JUC包同步工具类深度解析(Semaphore,CountDownLatch,CyclicBarrier,Phaser)
  • 岛屿个数(dfs)
  • 【C++造神计划】运算符
  • Cortex-M3/M4处理器的bit-band(位带)技术
  • 【TOP】IEEE旗下1区,影响因子将破8,3个月录用,CCF推荐,性价比高!
  • 赚钱游戏 2.0.1 版 (资源免费)
  • 服务调用-微服务小白入门(4)
  • 代码随想录算法训练营第三十六天| 435. 无重叠区间、 763.划分字母区间、56. 合并区间
  • 【AIGC调研系列】rerank3是什么
  • Linux下网络编程基础知识--协议
  • 在 VS Code 中使用 GitHub Copilot
  • 使用spring-ai快速对接ChatGpt
  • 免费的 ChatGPT 网站(六个)
  • arm内核驱动-中断
  • 第十五届蓝桥杯大赛软件赛省赛 C/C++ 大学 B 组
  • kotlin编译版本
  • 【C#】 删除首/尾部字符
  • 第十五篇【传奇开心果系列】Python自动化办公库技术点案例示例:深度解读Python 自动化处理图像在各行各业的应用场景
  • 什么是MOV视频格式?如何把MP4视频转MOV视频格式?
  • 整理的微信小程序日历(单选/多选/筛选)
  • Unity 人形骨骼动画模型嘴巴张开
  • Python爬虫-京东商品评论数据
  • 实况窗助力美团打造鸿蒙原生外卖新体验,用户可实时掌握外卖进展