当前位置: 首页 > news >正文

阿里MAXCOMPUTE数据专辑信息读取并同步数据表

阿里MAXCOMPUTE数据专辑信息读取并同步数据表

在阿里云大数据体系中,我们可以使用数据地图的数据专辑,对数据的类别等进行一个管理

那么管理后的数据,我们想要落表进行相关的数据分析,如何做呢?

查看阿里云官方文档可以知道,我们可以通过阿里云OpenAPI取得专辑和对应的数据表信息,之后将结果落入MaxCompute中
在这里插入图片描述

Code

"""
@author:Biglucky
@date:2024-07-26请求专辑信息并且写入到ODPS中参数:1、一组阿里云账号和需要访问的endpointALIBABA_CLOUD_ACCESS_KEY_ID :key信息ALIBABA_CLOUD_ACCESS_KEY_SECRET :secret信息ALIBABA_CLOUD_ENDPOINT :阿里云开放API endpointODPS_ENDPOINT :Maxcompute的endpoint2、一个ODPS表,用于存储album信息TABLE_PROJECT :MAXCOMPUTE的空间名称TABLE_NAME :MAXCOMPUTE的表名称创建好的table 包含列为:{  album_id	string  ,album_name	string   专辑名称,entity_type	string 类型,entity_name	string 表名称,project_name	string 项目名称,add_album_time	string 数据表添加到转机时间}3、安装好相关的包STEPS:1、读取阿里云开放API的album信息2、读取album下的存放在DataFrame对象信息3、将数据入到ODPS中"""import sys
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
import pandas as pd
from odps import ODPS
from odps.df import DataFrame# 配置信息:海外公共组账号
ALIBABA_CLOUD_ACCESS_KEY_ID = "你的KEY"
ALIBABA_CLOUD_ACCESS_KEY_SECRET ="你的SECRET"
ALIBABA_CLOUD_ENDPOINT = "开放API的endpoint" # https://next.api.aliyun.com/product/dataworks-public  进行查询# OUTPUT TABLE 
TABLE_NAME = "你的存储Table"
TABLE_PROJECT = "你的空间名称"
ODPS_ENDPOINT = "MaxCompute endpoint信息"   #http://service.ap-southeast-1.maxcompute.aliyun.com/apidef album_list(client):"""功能:传入一个阿里client,读取album信息,并且用df格式化返回client : OpenApiClientreturn df: DataFrame"""#配置接口param参数params = open_api_models.Params(# API Name,action='ListMetaCollections',# API Version,version='2020-05-18',# Protocol,protocol='HTTPS',# HTTP Method,method='POST',auth_type='AK',style='RPC',# API PATH,pathname=f'/',# Request body content format,req_body_type='json',# Response body content format,body_type='json')queries = {}queries['CollectionType'] = 'ALBUM' #请求类型是数据专辑queries['PageSize']= '100'  runtime = util_models.RuntimeOptions()request = open_api_models.OpenApiRequest(query=OpenApiUtilClient.query(queries))result = client.call_api(params, request, runtime)df = pd.DataFrame.from_records( result["body"]["Data"]["CollectionList"])  #将专辑id整合成DataFrame之后进行返回return dfdef album_detail (album_id,client):"""function:requst for the table list of the album by album idrequest param:* album_id : the id number of the album* client : the client of the openAPIreturn:total_list : DataFrame    the table list of the album(album id)"""params = open_api_models.Params(# API Name,action='ListMetaCollectionEntities',# API Version,version='2020-05-18',# Protocol,protocol='HTTPS',# HTTP Method,method='POST',auth_type='AK',style='RPC',# API PATH,pathname=f'/',# Request body content format,req_body_type='json',# Response body content format,body_type='json')queries = {}queries['CollectionQualifiedName'] = album_id #CollectionQualifiedName is the album idqueries['PageSize']  = 50for i in range(0,300,50):queries['NextToken'] = iruntime = util_models.RuntimeOptions()request = open_api_models.OpenApiRequest(query=OpenApiUtilClient.query(queries))result = client.call_api(params, request, runtime)df = pd.DataFrame.from_records( result["body"]["Data"]["EntityList"]) # get the table list of the album(album id)if i == 0 :total_list = df elif (len(df)==0)  :breakelse :            total_list = pd.concat([total_list,df],ignore_index = True)return total_listdef __main__():#STEP 1 initialize client instance config = open_api_models.Config(access_key_id = ALIBABA_CLOUD_ACCESS_KEY_ID,access_key_secret = ALIBABA_CLOUD_ACCESS_KEY_SECRET)config.endpoint = ALIBABA_CLOUD_ENDPOINTclient = OpenApiClient(config)#STEP 2 get the whole album numbersdf_album = album_list(client)albums =  df_album[["QualifiedName","Name"]]#STEP 3 requst each album by album id to get the table list and table namealbums_tables = pd.DataFrame()  for i in range(0,len(albums)):album_id = albums.iloc[i,0]album_name = albums.iloc[i,1]album_detail_tables = album_detail(album_id,client) album_detail_tables["album_id"] = album_idalbum_detail_tables["album_name"] = album_name#concat the whole informationalbums_tables = pd.concat([albums_tables,album_detail_tables[["album_id","album_name","EntityContent","QualifiedName"]]],ignore_index=True)#STEP 4 format the dataframealbums_tables["entity_type"] = albums_tables["EntityContent"].apply(lambda x: x["entityType"])albums_tables["entity_name"] = albums_tables["EntityContent"].apply(lambda x: x["name"])albums_tables["project_name"] = albums_tables["EntityContent"].apply(lambda x: x["projectName"])albums_tables["add_album_time"] = albums_tables["EntityContent"].apply(lambda x: x["addToCollectionTimestamp"])albums_tables = albums_tables.drop(columns = ["EntityContent","QualifiedName"])#STEP 5 insert the data into odps table o = ODPS(access_id=ALIBABA_CLOUD_ACCESS_KEY_ID,secret_access_key=ALIBABA_CLOUD_ACCESS_KEY_SECRET,project = TABLE_PROJECT,endpoint = ODPS_ENDPOINT)odps_df = DataFrame(albums_tables)pt = 'ds=' + args['YYYY-MM-DD'] # read the dataworks params odps_df.persist(name=TABLE_NAME,partition=pt,odps=o,create_partition=True)#run 
__main__()

Reference

  • 阿里云,ListMetaCollections - 查询集合信息

​ https://help.aliyun.com/zh/dataworks/developer-reference/api-dataworks-public-2020-05-18-listmetacollections?spm=a2c4g.11186623.0.0.7acc43f9jyudaO

  • 阿里云,ListMetaCollectionEntities - 查询集合中的实体

    https://help.aliyun.com/zh/dataworks/developer-reference/api-dataworks-public-2020-05-18-listmetacollectionentities?spm=a2c4g.11186623.0.0.663143f9J7Ywoe

http://www.lryc.cn/news/429573.html

相关文章:

  • rufus制作ubantu的U盘安装介质时,rufus界面上的分区类型选什么?
  • 【系统架构设计师-2018年】案例分析-答案及详解
  • linux驱动入门实验班——平台总线设备驱动模型和设备树
  • 零基础学习Python(六)
  • 微信小程序--31(todolist案例)
  • springboot项目使用本地依赖项,打包后出现NoClassDefFoundError的一种解决方法
  • Maven高级使用指南
  • windows docker 执行apt-get 权限问题
  • Linux系统信息排查
  • 《图解设计模式》笔记(四)分开考虑
  • Linux shell编程学习笔记74:sed命令——沧海横流任我行(中)
  • [数据集][目标检测]道路积水检测数据集VOC+YOLO格式2699张1类别
  • 不同路径
  • 【HTML】HTML学习之引入CSS样式表
  • shaushaushau1
  • 揭秘面试必备:高频算法与面试题全面解析
  • 设计模式-visit模式-在语法树的实践
  • ZK-Rollups测评
  • redis生产使用场景(一):并行流+二级缓存
  • EXCEL跨文件查询,指定条件列,返回满足条件的指定列
  • [数据集][目标检测]流水线物件检测数据集VOC+YOLO格式9255张26类别
  • StarRocks 存算分离 Compaction 原理
  • 搭建ELK日志采集与分析系统
  • java集合中自动排序的treeset和treemap
  • Android 修改SystemUI 音量条的声音进度条样式
  • 电商场景的视频生成的prompt测评集合
  • day34
  • 无缝融入,即刻智能[三]:Dify-LLM平台知识库构建(多路召回、精排重排),43K+星标见证专属智能方案
  • AWS服务WAF
  • 二叉树中的奇偶树问题