当前位置: 首页 > news >正文

Python——脚本实现datax全量同步mysql到hive

文章目录

  • 前言
  • 一、展示脚本
  • 二、使用准备
    • 1、安装python环境
    • 2、安装EPEL
    • 3、安装脚本执行需要的第三方模块
  • 三、脚本使用方法
    • 1、配置脚本
    • 2、创建.py文件
    • 3、执行脚本
    • 4、测试生成json文件是否可用


前言

在我们构建离线数仓时或者迁移数据时,通常选用sqoop和datax等工具进行操作,sqoop和datax各有优点,datax优点也很明显,基于内存,所以速度上很快,那么在进行全量同步时编写json文件是一项很繁琐的事,是否可以编写脚本来把繁琐事来简单化,接下来我将分享这样一个mysql全量同步到hive自动生成json文件的python脚本。


一、展示脚本

# coding=utf-8
import json
import getopt
import os
import sys
import pymysql# MySQL 相关配置,需根据实际情况作出修改
mysql_host = "XXXXXX"
mysql_port = "XXXX"
mysql_user = "XXX"
mysql_passwd = "XXXXXX"# HDFS NameNode 相关配置,需根据实际情况作出修改
hdfs_nn_host = "XXXXXX"
hdfs_nn_port = "XXXX"# 生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/XXX/XXX/XXX"def get_connection():return pymysql.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, password=mysql_passwd)def get_mysql_meta(database, table):connection = get_connection()cursor = connection.cursor()sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"cursor.execute(sql, [database, table])fetchall = cursor.fetchall()cursor.close()connection.close()return fetchalldef get_mysql_columns(database, table):return list(map(lambda x: x[0], get_mysql_meta(database, table)))def get_hive_columns(database, table):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string"}return mappings[mysql_type]meta = get_mysql_meta(database, table)return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))def generate_json(source_database, source_table):job = {"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(source_database, source_table),"splitPk": "","connection": [{"table": [source_table],"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"fileType": "text","path": "${targetdir}","fileName": source_table,"column": get_hive_columns(source_database, source_table),"writeMode": "append","fieldDelimiter": "\t","compress": "gzip"}}}]}}if not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:json.dump(job, f)def main(args):source_database = ""source_table = ""options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])for opt_name, opt_value in options:if opt_name in ('-d', '--sourcedb'):source_database = opt_valueif opt_name in ('-t', '--sourcetbl'):source_table = opt_valuegenerate_json(source_database, source_table)if __name__ == '__main__':main(sys.argv[1:])

二、使用准备

1、安装python环境

这里我安装的是python3环境

sudo yum install -y python3

2、安装EPEL

EPEL(Extra Packages for Enterprise Linux)是一个由 Fedora Special Interest Group 维护的软件仓库,提供了大量在官方 RHEL 或 CentOS 软件仓库中没有的软件包。当你在 CentOS 或 RHEL 系统上需要安装一些不在官方软件仓库中的软件时,通常会先安装epel - release

sudo yum install -y epel-release

3、安装脚本执行需要的第三方模块

pip3 install pymysql
pip3 install cryptography

这里可能由于斑纹问题cryptography安装不上去更新一下pip和setuptools

pip3 install --upgrade pip
pip3 install --upgrade setuptools

重新安装cryptography

pip3 install cryptography

三、脚本使用方法

1、配置脚本

首先根据自己服务器修改脚本相关配置

2、创建.py文件

vim /xxx/xxx/xxx/gen_import_config.py

3、执行脚本

python3 /脚本路径/gen_import_config.py -d 数据库名 -t 表名

4、测试生成json文件是否可用

datax.py -p"-Dtargetdir=/表在hdfs存放路径" /生成的json文件路径

执行时首先要确保targetdir目标地址在hdfs上存在,如果没有需要创建后再次执行

http://www.lryc.cn/news/469198.html

相关文章:

  • Python爬虫教程:从入门到精通
  • pytorh学习笔记——cifar10(四)用VGG训练
  • CRLF、UTF-8这些编辑器右下角的选项的意思
  • 【C++干货篇】——类和对象的魅力(四)
  • 基于java的诊所管理系统源码,SaaS门诊信息系统,二次开发的不二选择
  • O2OA如何实现文件跨服务器的备份
  • 语音提示器-WT3000A离在线TTS方案-打破语种限制/AI对话多功能支持
  • 使用HAL库的STM32工程,实现DMA传输USART发送接收数据
  • 常用排序算法总结
  • [项目详解][boost搜索引擎#2] 建立index | 安装分词工具cppjieba | 实现倒排索引
  • R语言编程
  • Mysql主主互备配置
  • 如何预防数据打架?数据仓库如何保持指标数据一致性开发指南(持续更新)
  • 我谈Canny算子
  • 算法的学习笔记—平衡二叉树(牛客JZ79)
  • SSM学习day01 JS基础语法
  • kubeadm快速自动化部署k8s集群
  • 解决JAVA使用@JsonProperty序列化出现字段重复问题(大写开头的字段重复序列化)
  • 分布式理论基础
  • Java应用程序的测试覆盖率之设计与实现(二)-- jacoco agent
  • 【机器学习】13. 决策树
  • 《a16z : 2024 年加密货币现状报告》解析
  • Laravel 使用Simple QrCode 生成PNG遇到问题
  • 一站式学习 Shell 脚本语法与编程技巧,踏出自动化的第一步
  • 批处理操作的优化
  • 机器视觉运动控制一体机在DELTA并联机械手视觉上下料应用
  • RHCE-web篇
  • Java - 人工智能;SpringAI
  • MFC开发,给对话框添加定时器
  • LED灯珠:技术、类型与选择指南