当前位置: 首页 > news >正文

在 EMR Serverless 上使用 Delta Lake

本文是一份开箱即用的全自动测试脚本,用于在 EMR Serverless 上提交一个 Delta Lake 作业。本文完全遵循《最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?》 一文给出的标准和规范!

1. 导出环境相关变量

注意: 以下仅为示意值,实操时请根据个人环境替换相关值。

export APP_NAME="emr-serverless-deltalake-test"
export APP_S3_HOME="s3://$APP_NAME"
export APP_LOCAL_HOME="/home/ec2-user/$APP_NAME"
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::1111111111111:role/EMR_SERVERLESS_ADMIN'

2. 创建作业专属工作目录和S3存储桶

mkdir -p $APP_LOCAL_HOME
aws s3 mb $APP_S3_HOME

3. 准备作业脚本

cat << EOF >> $APP_LOCAL_HOME/delta_table.py
from datetime import datetime
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, SparkSessionspark = SparkSession\.builder\.appName("Delta-Lake integration demo - create tables")\.enableHiveSupport()\.getOrCreate()## Create a DataFrame
data =  spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")],
["id", "creation_date",  "last_update_time"])spark.sql("""drop table if exists delta_table""")## Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string, creation_date string, 
last_update_time string)
USING delta location
's3://$APP_NAME/delta_table'""");data.writeTo("delta_table").append()
EOF
aws s3 cp $APP_LOCAL_HOME/delta_table.py $APP_S3_HOME/delta_table.py

4. 准备作业描述文件

cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{"name":"$APP_NAME","applicationId":"$EMR_SERVERLESS_APP_ID","executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN","jobDriver":{"sparkSubmit":{"entryPoint":"s3://$APP_NAME/delta-test.py","sparkSubmitParameters":"--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar,/usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"}},"configurationOverrides":{"monitoringConfiguration":{"s3MonitoringConfiguration":{"logUri":"$APP_S3_HOME/logs"}}}
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json

5. 提交 & 监控 作业

export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \--no-paginate --no-cli-pager --output text \--name apache-hudi-delta-streamer \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--execution-timeout-minutes 0 \--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \--query jobRunId) && \
now=$(date +%s)sec && \
while true; dojobStatus=$(aws emr-serverless get-job-run \--no-paginate --no-cli-pager --output text \--application-id $EMR_SERVERLESS_APP_ID \--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \--query jobRun.state)if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; thenfor i in {0..5}; doecho -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"sleep 1doneelseecho -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"breakfi
done

6. 检查错误

JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
gzip -d -r -f $JOB_LOG_HOME >& /dev/null
grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME
http://www.lryc.cn/news/178050.html

相关文章:

  • Stream流的使用详解(持续更新)
  • golang工程——gRpc 拦截器及原理
  • Python接口自动化之unittest单元测试
  • 在亚马逊云科技Amazon SageMaker上部署构建聊天机器人的开源大语言模型
  • 【51单片机】10-蜂鸣器
  • 26377-2010 逆反射测量仪 知识梳理
  • css实现渐变电量效果柱状图
  • FileManager/本地文件增删改查, Cache/图像缓存处理 的操作
  • vue中使用富文本编辑器
  • 13.(开发工具篇github)如何在GitHub上上传本地项目
  • vue3中状态适配
  • uniapp h5 端 router.base设置history后仍有#号
  • 上网行为监管软件(上网行为管理软件通常具有哪些功能)
  • C#中的for和foreach的探究与学习
  • 【ES6知识】Promise 对象
  • 【Git】配置SSH密钥实现Git操作免密
  • AI能给百融云带来什么?
  • AI创作系统ChatGPT商业运营版源码+AI绘画/支持GPT联网提问/支持Midjourney绘画+Prompt应用+支持国内AI提问模型
  • vue.draggable拖拽,项目中三个表格互相拖拽的实例操作,前端分页等更多小技巧~
  • 400G DR4 QSFP-DD光模块:数据中心应用全攻略
  • 自动驾驶:路径规划概述
  • vlc将本地文件推流成ts实时流
  • C# 自定义控件库之Lable组合控件
  • 解密防关联指纹浏览器:联盟营销领域的秘密武器
  • asp.net core mvc Razor +dapper 增删改查,分页(保姆教程)
  • 网络安全——自学(黑客)方法
  • 秋招算法岗,面试复盘
  • AI类APP能做什么
  • 计算机毕业设计 基于SSM的垃圾分类管理系统(以医疗垃圾为例)的设计与实现 Java实战项目 附源码+文档+视频讲解
  • 友思特案例|友思特 Ensenso 3D相机:汽车工业自动化的革命性力量