当前位置: 首页 > news >正文

flink的安装与使用(ubuntu)

组件版本

虚拟机:ubuntu-20.04.6-live-server-amd64.iso

flink:flink-1.18.0-bin-scala_2.12.tgz

jdk:jdk-8u291-linux-x64.tar

flink 下载

1、官网:https://flink.apache.org/downloads/

2、清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/

flink 安装

1、上传文件至服务器指定路径

/usr/local/myapp/flink

2、解压文件

tar -zxvf flink-1.18.0-bin-scala_2.12.tgz -C /usr/local/myapp/flink

jdk 安装

1、ubuntu 中自带了 jdk,先将其卸载

sudo apt-get remove *openjdk*
sudo apt-get autoremove

2、上传文件至服务器指定路径

/usr/local/myapp/jdk

3、解压文件

tar -zxvf jdk-8u291-linux-x64.tar -C /usr/local/myapp/jdk

4、配置环境变量

vim /etc/profile

在文末增加配置(路径根据自身情况进行调整)

export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_291
export JRE_HOME=/usr/local/myapp/jdk/jdk1.8.0_291/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JRE_HOME/lib

5、测试 jdk

root@vm1:/usr/local/myapp/jdk# java -version
java version "1.8.0_291"
Java(TM) SE Runtime Environment (build 1.8.0_291-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.291-b10, mixed mode)
root@vm1:/usr/local/myapp/jdk# javac -version
javac 1.8.0_291

测试 flink

1、进入到 flink 的安装路径下

cd /usr/local/myapp/flink/flink-1.18.0/

2、修改配置文件

vim conf/flink-conf.yaml

内容

jobmanager.bind-host: 0.0.0.0

3、关闭/禁用防火墙

systemctl stop ufw.service
systemctl disable ufw.service

4、启动 flink

./bin/start-cluster.sh

5、浏览器访问:http://ip:8081/

能看到内容说明正常

设置 flink 的 Standalone 模式集群并上传任务执行

1、机器规划

类型主机名IP
JobManagervm1192.168.141.120
TaskManagervm2192.168.141.121
TaskManagervm3192.168.141.122

2、设置每个服务器的机器名

vim /etc/hostname

3、设置每个服务器的 hosts 文件

vim /etc/hosts

增加三台服务器的机器名对照

192.168.141.120 vm1
192.168.141.121 vm2
192.168.141.122 vm3

使其立即生效(建议到这一步后,都重新启动下)

source /etc/hosts

4、设置服务器间的免密登录

4.1、自身免密

vm1 执行(vm2/vm3 同理)

ssh-keygen -t rsa

之后的内容全部回车即可

生成后,可在 /root/.ssh/ 中看到 id_rsa.pub 文件

通过命令设置到认证文件中

cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys 

重启服务器,通过命令测试是否可以免密登录自身

ssh vm1

通过 exit 命令可以退出当前的 ssh 登录

4.2、设置相互免密(以 vm1 为演示,其余服务器同理)

在 vm1 服务器中,将生成的自身密钥传输到其余两台服务器上

scp /root/.ssh/id_rsa.pub root@vm2:/root
scp /root/.ssh/id_rsa.pub root@vm3:/root

在 vm2/vm3 服务器中,将传输过来的密钥,通过命令设置到认证文件中

cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys 

vm1 设置完成,通过命令来测试能不能直接登录到 vm2/vm3 中

ssh vm2
ssh vm3

vm2/vm3 同理,都需要执行这些步骤:

A、生成自身密钥,添加到自身的认证文件中

B、将自身密钥传输到其余的服务器中,并在该服务器中通过命令设置自身密钥到其余服务器的认证文件中

注意:vm2 和 vm3 执行时,一个服务器完全执行结束/测试后,再进行下一个,不然会有密钥文件存在被覆盖的风险

5、设置主机时间同步

安装工具

apt-get install -y ntpdate

执行同步

ntpdate -u ntp.sjtu.edu.cn

6、配置 flink

以下以 vm1 为例,其他服务器的配置可将配置好的配置文件同步过去

6.1、masters 文件

vim masters

内容

vm1:8081

6.2、workers 文件

vim workers

内容

vm2
vm3

6.3、flink-conf.yaml 文件

vim flink-conf.yaml

内容(篇幅问题,去掉了注释)

env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMEDjobmanager.rpc.address: vm1jobmanager.rpc.port: 6123jobmanager.bind-host: 0.0.0.0jobmanager.memory.process.size: 1600mtaskmanager.bind-host: 0.0.0.0taskmanager.memory.process.size: 1728mtaskmanager.numberOfTaskSlots: 3parallelism.default: 1jobmanager.execution.failover-strategy: regionrest.port: 8081rest.address: vm1rest.bind-address: vm1blob.server.port: 45579

7、启动集群

只需在 vm1 上启动集群模式即可

root@vm1:/usr/local/myapp/flink/flink-1.18.0# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host vm1.
Starting taskexecutor daemon on host vm2.
Starting taskexecutor daemon on host vm3.

可以看到 vm2/vm3 的也会被启动,不需要手动去 vm2/vm3 再启动一次了

可以通过 java 的 jps 命令查看程序是否启动成功了

vm1 上

在这里插入图片描述

vm2 上

在这里插入图片描述

vm3 上

在这里插入图片描述

从图上可以分析出是以 Standalone 的集群模式启动了,其中 vm1 是 JobManager,vm2/vm3 是 TaskManager

8、页面查看状态

浏览器输入地址:http://192.168.141.120:8081/

可看到主页面

在这里插入图片描述

9、自定义一个任务

idea 创建一个 maven 项目

9.1、依赖及插件

<properties><flink.version>1.18.0</flink.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

9.2、程序内容

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;// 无界流
public class UnboundStreamJob {public static void main(String[] args) throws Exception {//1 获取flink运行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//2.加载数据源为dataStream  ,绑定客户机的9999端口,将这个网络端口发送的数据加载为dataStreamDataStreamSource<String> dataStream = environment.socketTextStream("192.168.141.122", 9999, "\n");//3.执行多个转换算子 ,SingleOutputStreamOperator是DataStreamSource子类SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, String>() {@Override//value:表示一个待处理的数据,在这里就是一行字符串//out:  用于输出结果的工具对象public void flatMap(String value, Collector<String> out) throws Exception {//拆分value,通过out输出结果String[] words = value.split("//s+");   //去除一个或多个空格for (String word : words) {out.collect(word);}}})  //执行一行字符串拆分为多个单词.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}) //将多个单词转换为(单词,1) 这种tuple2对象.keyBy(0)  //根据单词为key分组,0表示tuple2中的第一个属性,也就是单词.sum(1);//统计每组单词的个数,  1表示tuple2中第2个属性,也就是次数//4.通过sink算子输出结果result.print();//5.发布执行environment.execute("flinkWordCount"); //为任务起别名}}

9.3、程序说明

与 vm3 所在的 IP 为 192.168.141.122 在 9999 端口上进行 socket 通信,程序接收到消息后,进行计算并输出到控制台中

10、在 vm3 上开启一个 socket 通信(这一步一定要在上传任务之前进行)

netcat -lk 9999

11、提交任务(WebUI 方式)

11.1、打包刚才的程序,将打包好的 jar 包复制到某个好找的路径

11.2、打开网页中的 Submit New Job 选项,并点击 Add New

在这里插入图片描述

11.3、选择刚才打包的 jar 包进行上传,之后点击该 jar 包,填写启动类的路径,之后点击 Submit 提交按钮

在这里插入图片描述

11.4、正常情况下,任务就发布完成了,可以在 Task Managers 查看哪个节点的 Free Slots 相比 All Slots 减少了一个,那么这个节点的服务器就是执行该任务的服务器

在这里插入图片描述

12、提交任务(命令方式)

12.1、上传 jar 包到服务器中(任意一个服务器都行)

root@vm1:/usr/local/myapp/flink/task# ls
demo01-1.0-SNAPSHOT.jar

12.2、添加到任务中

../flink-1.18.0/bin/flink run -d -c xx.xx.xx.UnboundStreamJob demo01-1.0-SNAPSHOT.jar

说明:需要指定启动类

12.3、看到下面的信息,说明提交任务完成

Job has been submitted with JobID a893314f5efbb93bf3e6edefa578fd35

13、测试

13.1、点击该服务器,其中的 Stdout 就是控制台输出的地方

我们在 vm3 中开启的 socket 通信中,发送一条消息

在这里插入图片描述

13.2、回到页面中,刷新下控制台输出,会发现多了一个输出信息

在这里插入图片描述

13.3、至此,测试就完成了

http://www.lryc.cn/news/215508.html

相关文章:

  • 容器:软件性能测试的最佳环境
  • 【Qt控件之QMovie】详解
  • Star History 九月开源精选 |开源 GitHub Copilot 替代
  • 【Rabbit MQ】Rabbit MQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制
  • C++设计模式_25_Interpreter 解析器
  • 能源化工过程-故障诊断数据集初探-田纳西-伊斯曼过程数据集
  • 【Linux】安装配置解决CentosMobaXterm的使用及Linux常用命令以及命令模式
  • 一台服务器安装两个mysql、重置数据库用于测试使用
  • JS动态转盘可手动设置份数与概率(详细介绍)
  • 在k8s中,etcd有什么作用?
  • conda配置虚拟环境相关记录
  • 数据库的本质永远都不会改变基础语句(第二十二课)
  • Object转List<>,转List<Map<>>
  • React使用富文本CKEditor 5,上传图片并可设置大小
  • 【工具使用】批量修改文件夹的时间操作
  • Android Snackbar
  • 详解API接口如何安全的传输数据(内附商品详情API接口接入方式)
  • 网工内推 | 大专以上,福利待遇好,IE认证优先(云厂商)
  • Python time strptime()和strftime()
  • 是谁家班主任还不知道 怎么发布期中成绩啊。
  • 损失函数(Loss Function)一文详解-聚类问题常见损失函数Python代码实现+计算原理解析
  • 测试用例设计方法 —— 场景法详解
  • el-table表格设置——动态修改表头
  • 京东数据分析:2023年9月京东洗地机行业品牌销售排行榜
  • 使用 TensorFlow SSD 网络进行对象检测
  • (2)STM32单片机上位机
  • 从InnoDB索引的数据结构,去理解索引
  • Nacos:动态服务发现与配置管理的终极解决方案
  • 易思无人值守智能物流系统Sys_ReportFile文件上传漏洞复现
  • java Map List转化,通过Map保存数据,通过List排序。取前三名