
Data Integration Platform: Syncing Hive Data to MySQL with DataX (All Columns or Specified Columns)

1. Python script

Input parameters (an example invocation follows the list):

target_database: target MySQL database
target_table: target MySQL table
target_columns: columns to export ('all' for every column, or a comma-separated list)
target_positions: 0-based positions of the corresponding Hive columns
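
A minimal example invocation (the database and table names below are placeholders; -c takes the MySQL column names and -p the matching 0-based Hive column positions):

# export every column of db.table
python ~/bin/test.py -d db -t table -c all

# export two specific columns sitting at Hive positions 0 and 1
python ~/bin/test.py -d db -t table -c kunnr,name1 -p 0,1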

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# MySQL connection settings; adjust for your environment
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "xx"

# HDFS NameNode settings; adjust for your environment
hdfs_nn_host = "mycluster"
hdfs_nn_port = "8020"


def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table, columns):
    # Read column names and types from information_schema
    connection = get_connection()
    cursor = connection.cursor()
    if columns == 'all':
        # 'all' means every column of the table
        sql = ("SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS "
               "WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' "
               "ORDER BY ORDINAL_POSITION") % (database, table)
    else:
        # Specific columns: quote each name for the IN (...) clause
        columns = ', '.join("'%s'" % col.strip() for col in columns.split(','))
        sql = ("SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS "
               "WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND COLUMN_NAME IN (%s) "
               "ORDER BY ORDINAL_POSITION") % (database, table, columns)
    cursor.execute(sql)
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table, target_columns):
    # Wrap in list(...) so the result stays JSON-serializable under Python 3
    return list(map(lambda x: x[0], get_mysql_meta(database, table, target_columns)))


def get_hive_columns(database, table, target_columns, target_positions):
    def type_mapping(mysql_type):
        # Map MySQL types to the DataX hdfsreader types
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "mediumint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string",
            "bit": "string",
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table, target_columns)
    if target_columns == 'all':
        return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))
    else:
        # information_schema returns rows in table order, so the columns passed
        # via -c and the positions passed via -p must also be in table order
        positions = list(map(int, target_positions.split(',')))
        return [{"index": positions[i], "type": type_mapping(x[1].lower())} for i, x in enumerate(meta)]


def generate_json(target_database, target_table, target_columns, target_positions):
    if target_columns == 'all':
        # hdfsreader reads every field when column is ["*"]
        target_columns_hive = ["*"]
    else:
        target_columns_hive = get_hive_columns(target_database, target_table, target_columns, target_positions)
    job = {
        "job": {
            "setting": {
                "speed": {"channel": 15},
                "errorLimit": {"record": 0, "percentage": 0.02}
            },
            "content": [{
                "reader": {
                    "name": "hdfsreader",
                    "batchSize": "8192",
                    "batchByteSize": "33554432",
                    "parameter": {
                        "path": "${exportdir}",
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "column": target_columns_hive,
                        "fileType": "orc",
                        "encoding": "UTF-8",
                        "fieldDelimiter": u"\u0001",
                        "nullFormat": "\\N"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "batchSize": "8192",
                    "batchByteSize": "33554432",
                    "parameter": {
                        "writeMode": "replace",
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(target_database, target_table, target_columns),
                        "connection": [{
                            "jdbcUrl": "jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database
                                       + "?useUnicode=true&characterEncoding=utf-8&useSSL=false",
                            "table": [target_table]
                        }]
                    }
                }
            }]
        }
    }
    output_path = "/opt/module/datax/job/export/" + target_database
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    target_database = ""
    target_table = ""
    target_columns = ""
    target_positions = ""
    options, arguments = getopt.getopt(args, 'p:d:t:c:', ['positions=', 'targetdb=', 'targettbl=', 'columns='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--targetdb'):
            target_database = opt_value
        if opt_name in ('-t', '--targettbl'):
            target_table = opt_value
        if opt_name in ('-c', '--columns'):
            target_columns = opt_value
        if opt_name in ('-p', '--positions'):
            target_positions = opt_value
    print(target_database, target_table, target_columns, target_positions)  # debug echo of parsed options
    generate_json(target_database, target_table, target_columns, target_positions)


if __name__ == '__main__':
    main(sys.argv[1:])
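
The script writes the job file to /opt/module/datax/job/export/<database>/<database>.<table>.json and leaves the reader path as the ${exportdir} placeholder. A minimal sketch of executing the generated job, assuming a stock DataX install under /opt/module/datax (the HDFS directory below is a placeholder), fills in exportdir through DataX's -p option:

# run the generated job, substituting the ${exportdir} placeholder
python /opt/module/datax/bin/datax.py -p "-Dexportdir=/warehouse/db/table" /opt/module/datax/job/export/db/db.table.json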

2. Shell script

#!/bin/bash
# export all columns:
python ~/bin/test.py -d db -t table -c all
# export specified columns (pass the matching Hive positions with -p):
# python ~/bin/test.py -d db -t table -c kunnr,name1,sort2,addrnumber,country,state -p 0,1,2,3,4,5
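
When several tables need configs, a simple wrapper loop over the same script is enough; the database and table names here are hypothetical:

#!/bin/bash
# hypothetical batch wrapper: one config per table, all columns
for tbl in table_a table_b table_c; do
    python ~/bin/test.py -d db -t "$tbl" -c all
done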
