当前位置: 首页 > news >正文

Apache Airflow (十一) :HiveOperator及调度HQL

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. HiveOperator配置

2. HiveOperator调度HQL案例


1. HiveOperator配置

可以通过HiveOperator直接操作Hive SQL ,HiveOperator的参数如下:

hql(str):需要执行的Hive SQL。hive_cli_conn_id(str):连接Hive的conn_id,在airflow webui connection中配置的。

想要在airflow中使用HiveOperator调用Hive任务,首先需要安装以下依赖并配置Hive Metastore:

#切换Python37环境[root@node4 ~]# conda activate python37#安装hive provider package(python37) [root@node4 ~]# pip install apache-airflow-providers-apache-hive==2.0.2#启动airflow(python37) [root@node4 ~]# airflow webserver --port 8080(python37) [root@node4 ~]# airflow scheduler

登录Airflow webui并设置Hive Metastore,登录后找到”Admin”->”Connections”,点击“+”新增配置:

2. HiveOperator调度HQL案例

1) 启动Hive,准备表

启动HDFS、Hive Metastore,在Hive中创建以下三张表:

create table person_info(id int,name string,age int) row format delimited fields terminated by '\t';create table score_info(id int,name string,score int) row format delimited fields terminated by '\t';

向表 person_info加载如下数据:

1 zs 182 ls 193 ww 20

向表score_info加载如下数据:

1 zs 1002 ls 2003 ww 300

2) 在node4节点配置Hive 客户端

由于Airflow 使用HiveOperator时需要在Airflow安装节点上有Hive客户端,所以需要在node4节点上配置Hive客户端。

将Hive安装包上传至node4 “/software”下解压,并配置Hive环境变量

#在/etc/profile文件最后配置Hive环境变量export HIVE_HOME=/software/hive-1.2.1export PATH=$PATH:$HIVE_HOME/bin#使环境变量生效source /etc/profile

修改HIVE_HOME/conf/hive-site.xml ,写入如下内容:

<configuration><property><name>hive.metastore.warehouse.dir</name><value>/user/hive/warehouse</value></property><property><name>hive.metastore.local</name><value>false</value></property><property><name>hive.metastore.uris</name><value>thrift://node1:9083</value></property></configuration>

3) 编写DAG python配置文件

注意在本地开发工具编写python配置时,需要用到HiveOperator,需要在本地对应的python环境中安装对应的provider package。

C:\Users\wubai>d:D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scriptsd:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-apache-hive==2.0.2注意:这里本地安装也有可能缺少对应的C++环境,我们也可以不安装,直接跳过也可以。

Python配置文件:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperatordefault_args = {'owner':'wangwu','start_date':datetime(2021, 9, 23),'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_hive_sql',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=HiveOperator(task_id='person_info',hive_cli_conn_id="node1-hive-metastore",hql='select id,name,age from person_info',dag = dag
)second=HiveOperator(task_id='score_info',hive_cli_conn_id="node1-hive-metastore",hql='select id,name,score from score_info',dag=dag
)third=HiveOperator(task_id='join_info',hive_cli_conn_id="node1-hive-metastore",hql='select a.id,a.name,a.age,b.score from person_info a join score_info b on a.id = b.id',dag=dag
)first >> second >>third

4) 调度python配置脚本

将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:

调度结果如下:


http://www.lryc.cn/news/238879.html

相关文章:

  • SpringBoot-Docker容器化部署发布
  • 重生奇迹mu格斗怎么加点
  • 「浙江科聪新品发布」新品发布潜伏顶升式移动机器人专用控制器
  • 大数据学习(22)-spark
  • String类常用方法总结
  • TensorFlow实战教程(二十八)-Keras实现BiLSTM微博情感分类和LDA主题挖掘分析
  • 个人博客添加访问人数以及访问时间-githubpage
  • Django--重定向redirect
  • 在html和css中的引用svg(一)
  • C/C++ 实现:自然排序:针对两个需要排序的字符串,不仅逐个比较每个字符的顺序,对于连在一起的数字字符会作为一个完整数字进行比较 某知名企业的笔试题
  • sse实时通信
  • Qt专栏3—Qt项目创建Hello World
  • linux输出的重定向无效问题和解决
  • chromium114添加新的语言国际化支持
  • 赛氪荣幸受邀参与中国联合国采购促进会第五次会员代表大会
  • 车载通信架构 —— 传统车内通信网络发展回顾
  • `maven.test.skip` 和 `skipTests` 的区别
  • linux输出的重定无效问题和解决
  • 开发上门按摩系统对技师如何管理,薪资结构怎么设计
  • 云HIS系统源码,医院管理系信息统源码,融合B/S版四级电子病历系统
  • Redis篇---第十篇
  • (vue)前后端配合实现文件预览功能
  • .NET 6 在已知拓扑路径的情况下使用 Dijkstra,A*算法搜索最短路径
  • SQL Server删除重复数据只保留一条
  • 如何使用 WPF 应用程序连接 FastReport报表
  • 【Django使用】4大模块50页md文档,第4篇:Django请求与响应和cookie与session
  • uniapp - 开关按钮
  • 使用sql语句获取SQL server库里所有表的表名,注释,行数
  • D-Wave推出新开源及解决无线信道解码新方案!
  • JavaScrip获取视频第一帧作为封面图