大数据Hadoop之——Flink1.17.0安装与使用(非常详细)
一、前期准备
1、查看网卡
2、配置静态IP
vi /etc/sysconfig/network-scripts/ifcfg-ens32 ---- 根据自己网卡设置。
3、设置主机名
hostnamectl --static set-hostname 主机名
例如:
hostnamectl --static set-hostname hadoop001
4、配置IP与主机名映射
vi /etc/hosts
5、关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
6、配置免密登录
传送门
二、JDK的安装
传送门
注意:
Flink1.16.0版本也支持使用JDK8,后续版本对JDK8的支持将会移除。
从Flink 1.17.0版本开始,必须使用Java 11或更高版本来运行Flink。这是因为Flink为了支持最新的Java API和语言特性,需要Java 11中引入的一些新功能。
虽然使用JDK8 也可以,但从 Flink 1.17 开始,部分依赖于 Flink 的第三方库已经弃用了对 JDK 8 的支持,并要求使用 JDK 11 或更高版本。
考虑到Flink后期与一些大数据框架进行整合,这些大数据框架对JDK11的支持并不完善,例如:Hive3.1.3版本还不支持JDK11,所以采用JDK8来开发Flink。
三、Flink的本地安装
1、Flink的下载安装
1.1. 下载
Index of /dist/flink/flink-1.17.0
https://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz
下载 flink-1.17.0-bin-scala_2.12.tgz 安装包
1.2 上传
使用xshell上传到指定安装路径此处是安装路径是 /opt/module
1.3 解压重命名
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz
mv flink-1.17.0 flink
1.4 配置环境变量
vi /etc/profile
export JAVA_HOME=/opt/module/java
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export FLINK_HOME=/opt/module/flink
export PATH=$PATH:$JAVA_HOME/bin:$FLINK_HOME/bin
1.5 加载环境变量
source /etc/profile
验证环境变量是否生效:
env | grep HOME
env | grep PATH
2、修改flink-conf.yaml配置文件
cd /opt/module/flink
vi conf/flink-conf.yaml# JobManager节点地址
jobmanager.rpc.address: hadoop001
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop001
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop001
3、启动集群
cd /opt/module/flink
./bin/start-cluster.sh
4、查看进程
5、查看WebUI
默认端口 :8081
http://192.168.200.151:8081/
6、停止集群
cd /opt/module/flink
./bin/stop-cluster.sh
三、Flink Standalone搭建
准备三台节点:hadoop001、hadoop002、hadoop003,每个节点部署不同角色。
进程 | 节点 |
Master(JobManager) Slave(TaskManager) | hadoop001 |
Slave(TaskManager) | hadoop002 |
Slave(TaskManager) | hadoop003 |
以下内容是在Flink本地安装的基础上进行的。
1、修改workers文件
cd /opt/module/flink
vi conf/workershadoop001
hadoop002
hadoop003
2、分发文件
scp -r /etc/profile root@hadoop002:/etc/profile
scp -r /etc/profile root@hadoop003:/etc/profile
scp -r /opt/module/java root@hadoop002:/opt/module/java
scp -r /opt/module/java root@hadoop003:/opt/module/java
scp -r /opt/module/flink root@hadoop002:/opt/module/flink
scp -r /opt/module/flink root@hadoop003:/opt/module/flink
让三台机器文件生效
ssh hadoop001 "source /etc/profile"
ssh hadoop002 "source /etc/profile"
ssh hadoop003 "source /etc/profile"
3、启动集群
cd /opt/module/flink
./bin/start-cluster.sh
4、查看WebUI
默认端口 :8081
http://192.168.200.151:8081/
5、测试
尝试提交一个简单任务,如果任务正常执行完毕,则集群一切正常。提交Flink自带的简单任务如下:
cd /opt/module/flink
./bin/flink run examples/streaming/WordCount.jar
6、停止集群
cd /opt/module/flink
./bin/stop-cluster.sh
四、Flink Standalone HA搭建(不推荐后面由Yarn管理)
HA的主要作用:可以在集群中启动多个JobManager,并使它们都向ZooKeeper进行注册,ZooKeeper利用自身的选举机制保证同一时间只有一个JobManager是活动状态(Active)的,其他的都是备用状态(Standby)。当活动状态的JobManager出现故障时,ZooKeeper会从其他备用状态的JobManager选出一个成为活动JobManager
进程 | 节点 |
Master(JobManager) Slave(TaskManager) FlinkZooKeeperQuorumPeer | hadoop001 |
Master(JobManager) Slave(TaskManager) FlinkZooKeeperQuorumPeer | hadoop002 |
Slave(TaskManager) FlinkZooKeeperQuorumPeer | hadoop003 |
下面内容在 Flink Standalone 搭建前提下修改。
1、修改flink-conf.yaml配置文件
cd /opt/module/flink
vi conf/flink-conf.yaml# 将高可用模式设置为ZooKeeper,默认集群不会开启高可用状态
high-availability: zookeeper
# ZooKeeper集群主机名(或IP)与端口列表,多个以逗号分隔
high-availability.zookeeper.quorum: hadoop001:2181,hadoop002:2181,hadoop003:2181
# 用于持久化JobManager元数据(JobGraph、应用程序JAR文件等)的HDFS地址,以便进行故障恢复,ZooKeeper上存储的只是元数据所在的位置路径信息
high-availability.storageDir: /opt/module/flink/ha
2、修改master文件
cd /opt/module/flink
vi conf/mastershadoop001:8081
hadoop002:8082
3、分发文件
scp -r $FLINK_HOME/conf/flink-conf.yaml root@hadoop002:$FLINK_HOME/conf/flink-conf.yaml
scp -r $FLINK_HOME/conf/flink-conf.yaml root@hadoop003:$FLINK_HOME/conf/flink-conf.yaml
scp -r $FLINK_HOME/conf/masters root@hadoop002:$FLINK_HOME/conf/masters
scp -r $FLINK_HOME/conf/masters root@hadoop003:$FLINK_HOME/conf/masters
4、安装与启动Zookeeper
传送门
5、启动集群
cd /opt/module/flink
./bin/start-cluster.sh
6、查看WebUI
默认端口 :8081
http://192.168.200.151:8081/
7、测试
尝试提交一个简单任务,如果任务正常执行完毕,则集群一切正常。提交Flink自带的简单任务如下:
cd /opt/module/flink
./bin/flink run examples/streaming/WordCount.jar
8、停止集群
cd /opt/module/flink
./bin/stop-cluster.sh
五、YARN运行模式(重点)
YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。
本节是在 三、Flink Standalone搭建 的基础上进行修改。
1、安装启动Hadoop集群
在将Flink任务部署至YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。
传送门
2、配置环境变量
vi /etc/profile
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
不要丢掉 一漂符号 “`”
注意:重点是 HADOOP_CLASSPATH=`hadoop classpath` 是执行命令 hadoop classpath,使环境变量能够加载到hadoop的类路径和包路径。此时可以将hadoop和Flink进行解耦,不用纠结使用hadoop的哪个版本。
3、修改配置文件flink-conf.yaml
1.此文件中的 hadoop001 无需修改,启动时候yarn自动分配代理的主机名和端口
# JobManager节点地址.
jobmanager.rpc.address: hadoop001
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop001
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop0012、设置加载检查
vi /opt/module/flink/conf/flink-conf.yaml
classloader.check-leaked-classloader: false
注意:如果上面配置中是避免启动过程中报如下异常。
Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
4、分发文件
scp -r /etc/profile root@hadoop002:/etc/profile
scp -r /etc/profile root@hadoop003:/etc/profile
scp -r /opt/module/flink/conf/flink-conf.yaml root@hadoop002:/opt/module/flink/conf/flink-conf.yaml
scp -r /opt/module/flink/conf/flink-conf.yaml root@hadoop003:/opt/module/flink/conf/flink-conf.yaml
三台机器分别执行 source /etc/profile
5、Session会话模式
YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN Session)来启动Flink集群。
5.1.启动Hadoop集群
cd /opt/module/hadoop
sbin/start-all.sh
5.2.启动Flink的会话模式
cd /opt/module/flink
bin/yarn-session.sh -d -nm flinkTest
参数说明:
-d:分离模式,执行命令后不会占用窗口,即使关掉当前对话窗口,YARN session也可以后台运行。
-jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(--name):配置在YARN UI界面上显示的任务名。
-qu(--queue):指定YARN队列名。
-tm(--taskManager):配置每个TaskManager所使用内存。
注意:Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量,YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的。
YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID,如下所示,
此时Yarn为Flink动态分配资源并启动JobManager。用户可以通过Web UI或者命令行两种方式提交作业。
5.3.命令行提交作业
新打开一个窗口,使用命令行方式提交作业,提交后Yarn会自动的为Flink分配资源启动对应的TaskManager。运行自己到 webui上进行查看。
cd /opt/module/flink
./bin/flink run examples/streaming/WordCount.jar
5.4.小结
通过前面来看,在yarn的每个应用其实就是对应 flink 的 一个集群
6、单作业模式
前面不变,只是提交方式改变。
cd /opt/module/flink
bin/flink run -d -t yarn-per-job examples/streaming/WordCount.jar
如果报错如图所示,则配置flink-conf.yaml
vi /opt/module/flink/conf/flink-conf.yaml
env.java.home: /opt/module/java
env.hadoop.conf.dir: /opt/module/hadoop/etc/hadoop
修改后分发文件
scp -r /opt/module/flink/conf/flink-conf.yaml root@hadoop002:/opt/module/flink/conf/flink-conf.yaml
scp -r /opt/module/flink/conf/flink-conf.yaml root@hadoop003:/opt/module/flink/conf/flink-conf.yaml
7、应用模式
前面不变,只是提交方式改变。
cd /opt/module/flink
bin/flink run-application -t yarn-application examples/streaming/WordCount.jar
如果 Flink 本身的依赖和j插件的jar,用户可以预先上传到HDFS,而不需要每次单独发送到集群,这就使得作业提交更加轻量了。
1、创建HDFS,目录
hdfs dfs -mkdir /flink-dist
hdfs dfs -mkdir /flink-jars2、上传 Flink 本身的依赖和用户jar
hdfs dfs -put /opt/module/flink/lib/ /flink-dist
hdfs dfs -put /opt/module/flink/plugins/ /flink-dist
hdfs dfs -put /opt/module/flink/examples/streaming/WordCount.jar /flink-dist3、提交作业
cd /opt/module/flink
bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop001:9000/flink-dist" hdfs://hadoop001:9000/flink-jars/WordCount.jar