当前位置: 首页 > news >正文

flink wordcount

Maven配置pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>com.atguigu</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies></project>

java编写wordcount代码

基于DataSet API(过时的,不推荐)
之后用 DataStream API

package com.atguigu.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class WordCountBatchDemo {public static void main(String[] args) throws Exception {//1.创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2.读取数据,从文件中读取DataSource<String> lineDS = env.readTextFile("input/word.txt");//3.切分、转换(word,1)FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//Todo3.1 按照空格 切分单词String[] words = value.split(" ");//Todo3.2 将单词转换为(word,1)for (String word : words) {Tuple2<String, Integer> wordTuple2 = Tuple2.of(word, 1);//Todo3.3 调用采集器collector 向下游发送数据out.collect(wordTuple2);}}});//4.按照word分组UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroupBy = wordAndOne.groupBy(0);//5.各分组内聚合AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroupBy.sum(1);//6.输出sum.print();}
}

在这里插入图片描述

http://www.lryc.cn/news/433292.html

相关文章:

  • 组合模式(Composite Pattern)
  • 教你制作一本加密的样本册
  • C语言进阶【1】--字符函数和字符串函数【1】
  • git提交自动带上 Signed-off-by信息
  • 图论(2)
  • ASP.NET Core 入门教学十九 依赖注入ioc
  • omm kill 内存碎片化
  • JS中给元素添加事件监听器的各种方法详解(包含比较和应用场景)
  • Python基本数据类型之复数complex
  • 第六届机器人与智能制造技术国际会议 (ISRIMT 2024)
  • 鸿蒙轻内核M核源码分析系列十九 Musl LibC
  • mysqldump备份恢复数据库
  • 路径规划——RRT算法
  • OPCUA-PLC
  • 在Windows系统上部署PPTist并实现远程访问
  • 【Grafana】Prometheus结合Grafana打造智能监控可视化平台
  • 隐私计算实训营:SplitRec:当拆分学习遇上推荐系统
  • 存在nginx版本信息泄露(请求头中存在nginx中间件版本信息)
  • 在js中观察者模式讲解
  • java常用面试题-基础知识分享
  • iOS——runLoop
  • python: 多模块(.py)中全局变量的导入
  • 0基础学习爬虫系列:Python环境搭建
  • Unity Shader实现简单的各向异性渲染(采用各向异性形式的GGX分布)
  • React开源框架之Refine
  • 【iOS】——渲染原理与离屏渲染
  • 详解CSS
  • Python执行cmd命令
  • 基于激光雷达的无人机相互避障
  • Zookeeper基本原理