Reading Alibaba MaxCompute Data Album Information and Syncing It into a Data Table
In the Alibaba Cloud big data ecosystem, the Data Map's data albums (数据专辑) let us organize and categorize data assets.
Once data has been organized this way, how do we land that album metadata into a table for further analysis?
According to the Alibaba Cloud documentation, we can retrieve the albums and their associated tables through the Alibaba Cloud OpenAPI, and then write the results into MaxCompute.
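The script in the Code section below assumes the destination MaxCompute table already exists. As a reference, here is a minimal PyODPS sketch for creating such a table; the credentials, project, and table names are placeholders (not from the original), and the schema simply mirrors the columns described in the script's docstring.

from odps import ODPS

# Hypothetical placeholders -- substitute your own credentials, project and table name
o = ODPS(access_id='<AccessKey ID>',
         secret_access_key='<AccessKey Secret>',
         project='<your MaxCompute project>',
         endpoint='http://service.ap-southeast-1.maxcompute.aliyun.com/api')

# Create the destination table with the columns used by the sync script,
# partitioned by ds (the daily partition written in STEP 5 of the script).
o.create_table(
    '<your output table>',
    ('album_id string, album_name string, entity_type string, '
     'entity_name string, project_name string, add_album_time string',
     'ds string'),
    if_not_exists=True,
)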
Code
"""
@author:Biglucky
@date:2024-07-26请求专辑信息并且写入到ODPS中参数:1、一组阿里云账号和需要访问的endpointALIBABA_CLOUD_ACCESS_KEY_ID :key信息ALIBABA_CLOUD_ACCESS_KEY_SECRET :secret信息ALIBABA_CLOUD_ENDPOINT :阿里云开放API endpointODPS_ENDPOINT :Maxcompute的endpoint2、一个ODPS表,用于存储album信息TABLE_PROJECT :MAXCOMPUTE的空间名称TABLE_NAME :MAXCOMPUTE的表名称创建好的table 包含列为:{ album_id string ,album_name string 专辑名称,entity_type string 类型,entity_name string 表名称,project_name string 项目名称,add_album_time string 数据表添加到转机时间}3、安装好相关的包STEPS:1、读取阿里云开放API的album信息2、读取album下的存放在DataFrame对象信息3、将数据入到ODPS中"""import sys
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
import pandas as pd
from odps import ODPS
from odps.df import DataFrame# 配置信息:海外公共组账号
ALIBABA_CLOUD_ACCESS_KEY_ID = "你的KEY"
ALIBABA_CLOUD_ACCESS_KEY_SECRET ="你的SECRET"
ALIBABA_CLOUD_ENDPOINT = "开放API的endpoint" # https://next.api.aliyun.com/product/dataworks-public 进行查询# OUTPUT TABLE
TABLE_NAME = "你的存储Table"
TABLE_PROJECT = "你的空间名称"
ODPS_ENDPOINT = "MaxCompute endpoint信息" #http://service.ap-southeast-1.maxcompute.aliyun.com/apidef album_list(client):"""功能:传入一个阿里client,读取album信息,并且用df格式化返回client : OpenApiClientreturn df: DataFrame"""#配置接口param参数params = open_api_models.Params(# API Name,action='ListMetaCollections',# API Version,version='2020-05-18',# Protocol,protocol='HTTPS',# HTTP Method,method='POST',auth_type='AK',style='RPC',# API PATH,pathname=f'/',# Request body content format,req_body_type='json',# Response body content format,body_type='json')queries = {}queries['CollectionType'] = 'ALBUM' #请求类型是数据专辑queries['PageSize']= '100' runtime = util_models.RuntimeOptions()request = open_api_models.OpenApiRequest(query=OpenApiUtilClient.query(queries))result = client.call_api(params, request, runtime)df = pd.DataFrame.from_records( result["body"]["Data"]["CollectionList"]) #将专辑id整合成DataFrame之后进行返回return dfdef album_detail (album_id,client):"""function:requst for the table list of the album by album idrequest param:* album_id : the id number of the album* client : the client of the openAPIreturn:total_list : DataFrame the table list of the album(album id)"""params = open_api_models.Params(# API Name,action='ListMetaCollectionEntities',# API Version,version='2020-05-18',# Protocol,protocol='HTTPS',# HTTP Method,method='POST',auth_type='AK',style='RPC',# API PATH,pathname=f'/',# Request body content format,req_body_type='json',# Response body content format,body_type='json')queries = {}queries['CollectionQualifiedName'] = album_id #CollectionQualifiedName is the album idqueries['PageSize'] = 50for i in range(0,300,50):queries['NextToken'] = iruntime = util_models.RuntimeOptions()request = open_api_models.OpenApiRequest(query=OpenApiUtilClient.query(queries))result = client.call_api(params, request, runtime)df = pd.DataFrame.from_records( result["body"]["Data"]["EntityList"]) # get the table list of the album(album id)if i == 0 :total_list = df elif (len(df)==0) :breakelse : total_list = pd.concat([total_list,df],ignore_index = True)return total_listdef __main__():#STEP 1 initialize client instance config = open_api_models.Config(access_key_id = ALIBABA_CLOUD_ACCESS_KEY_ID,access_key_secret = ALIBABA_CLOUD_ACCESS_KEY_SECRET)config.endpoint = ALIBABA_CLOUD_ENDPOINTclient = OpenApiClient(config)#STEP 2 get the whole album numbersdf_album = album_list(client)albums = df_album[["QualifiedName","Name"]]#STEP 3 requst each album by album id to get the table list and table namealbums_tables = pd.DataFrame() for i in range(0,len(albums)):album_id = albums.iloc[i,0]album_name = albums.iloc[i,1]album_detail_tables = album_detail(album_id,client) album_detail_tables["album_id"] = album_idalbum_detail_tables["album_name"] = album_name#concat the whole informationalbums_tables = pd.concat([albums_tables,album_detail_tables[["album_id","album_name","EntityContent","QualifiedName"]]],ignore_index=True)#STEP 4 format the dataframealbums_tables["entity_type"] = albums_tables["EntityContent"].apply(lambda x: x["entityType"])albums_tables["entity_name"] = albums_tables["EntityContent"].apply(lambda x: x["name"])albums_tables["project_name"] = albums_tables["EntityContent"].apply(lambda x: x["projectName"])albums_tables["add_album_time"] = albums_tables["EntityContent"].apply(lambda x: x["addToCollectionTimestamp"])albums_tables = albums_tables.drop(columns = ["EntityContent","QualifiedName"])#STEP 5 insert the data into odps table o = ODPS(access_id=ALIBABA_CLOUD_ACCESS_KEY_ID,secret_access_key=ALIBABA_CLOUD_ACCESS_KEY_SECRET,project = 
TABLE_PROJECT,endpoint = ODPS_ENDPOINT)odps_df = DataFrame(albums_tables)pt = 'ds=' + args['YYYY-MM-DD'] # read the dataworks params odps_df.persist(name=TABLE_NAME,partition=pt,odps=o,create_partition=True)#run
__main__()
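A note on the partition value: `pt = 'ds=' + args['YYYY-MM-DD']` relies on the `args` dict that DataWorks injects into a PyODPS node when a scheduling parameter named `YYYY-MM-DD` is configured. If you run the script outside DataWorks, one possible fallback (an assumption, not part of the original) is to build the value yourself:

from datetime import date

# Outside DataWorks there is no injected `args` dict; derive the business date
# locally instead (here: today's date, adjust to your scheduling convention).
args = {'YYYY-MM-DD': date.today().strftime('%Y-%m-%d')}
pt = 'ds=' + args['YYYY-MM-DD']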
References
- Alibaba Cloud, ListMetaCollections - Query collection information
  https://help.aliyun.com/zh/dataworks/developer-reference/api-dataworks-public-2020-05-18-listmetacollections?spm=a2c4g.11186623.0.0.7acc43f9jyudaO
- Alibaba Cloud, ListMetaCollectionEntities - Query entities in a collection
  https://help.aliyun.com/zh/dataworks/developer-reference/api-dataworks-public-2020-05-18-listmetacollectionentities?spm=a2c4g.11186623.0.0.663143f9J7Ywoe