当前位置: 首页 > news >正文

基于文心一言AI大模型,编写一段python3程序以获取华为分布式块存储REST接口的实时数据

本文尝试基于文心一言AI大模型,编写一段python3程序以获取华为分布式块存储REST接口的实时数据。

一、用文心一言AI大模型将需求转化为样例代码

1、第一次对话:“python3写一段从rest服务器获取数据的样例代码”

同时生成了以下注解 

这段代码首先定义了一个函数 get_data_from_rest_server,该函数使用 requests.get 方法向指定的 URL 发送 GET 请求。如果请求成功(即响应的状态码为 200),则将响应的 JSON 数据返回。否则,它将打印出错误消息并返回 None。然后,我们调用这个函数来获取数据,并打印出获取到的数据。注意你需要将 url 替换为你的 REST 服务器的实际 URL。 

  2、第二次对话:“以上请求需带入服务器的认证参数”

同时生成了以下注解 

 在上述代码中,我们通过在 requests.get 方法中传递 auth=(username, password) 来提供服务器的认证参数。这样,当你发送请求时,requests 会自动将认证参数添加到请求中,从而完成身份验证过程。

3、第三次对话:“以上代码需代入请求消息头”

 

import requests  # 定义服务器认证参数的用户名和密码  
username = 'your_username'  
password = 'your_password'  def get_data_from_rest_server():  # 将此 URL 替换为你的 REST 服务器 URL  url = 'http://example.com/api/data'  # 定义请求头,可以包含自定义的头部字段和值  headers = {  'Content-Type': 'application/json',  # 声明请求的内容类型为 JSON  'Authorization': f'Basic {username}:{password}',  # 添加基本的身份验证头部字段  }  # 使用 GET 方法请求数据,并携带服务器认证参数和请求头  response = requests.get(url, headers=headers)  # 检查响应的状态码,如果请求成功(状态码为 200),则处理响应的数据  if response.status_code == 200:  data = response.json()  return data  else:  print(f"请求失败,状态码: {response.status_code}")  return None  # 调用函数获取数据  
data = get_data_from_rest_server()  
if data is not None:  print(data)

注解如下 :

在上述代码中,我们通过创建一个字典对象来定义请求头,其中包含了一个 Content-Type 字段和一个 Authorization 字段。Content-Type 字段声明了请求的内容类型为 JSON,而 Authorization 字段包含了基本的身份验证值。然后,我们将这个字典对象传递给 requests.get 方法的 headers 参数,从而将请求头添加到请求中。 

 通过和文心一言AI大模型的三次对话,我们大致了解了用python 的 requests 库从 REST 服务器获取数据的基本方法。

二、核查华为分布式块存储REST接口规范

1、RESTful API的请求必须包括如下消息头

2、拟测试请求接口官方样例

查询存储池 接口描述
查询存储池。
URI 路径
https://${ip}:${port}/dsware/service/cluster/storagepool/queryStoragePool
访问方法
GET
执行实例
Request: /dsware/service/cluster/storagepool/queryStorageNodeInfo?poolId=0
Response: {
"result": 0,
"storagePools": [
{
"poolId": 0,
"totalCapacity": 675305,
"usedCapacity": 0,
"reductionInvolvedCapacity": 0,
"allocatedCapacity": 0,
"usedCapacityRate": 0.0,
"deduplicationSaved": 0,
"compressionSaved": 0,
"deduplicationRatio": 1.0,
"compressionRatio": 1.0,
"dataReductionRatio": 1.0,
"thinRate": 0,
"replicationFactor": 3,
"poolName": "rep3",
"poolServerType": 1,
"thinThreshold": 70,
"poolSpec": "normal_pool",
"redundancyPolicy": "replication",
"numDataUnits": 0,
"numParityUnits": 0,
"numFaultTolerance": 0,
"cellSize": 0,
"ecCacheMediaType": "false",
"ecCacheRate": "0",
"ecCacheWbCacheRate": "0",
"compressionAlgorithm": "performance",
"enableAdvanceVolume": "true",
"encryptType": 0,
"supportEncryptForMainStorageMedia": 0,
"serviceType": 1,
"storageMediaType": "ssd_card",
"physicalTotalCapacity": 482361,
"usedCapacityAfterDedup": 0,
"writableCapacity": 482361,
"markDelCapacity": 0,
"bbuInfo": "close",
"fastGcTime": 0
}
]
}

三、代码实写

根据需求进行代码编写,并多次测试,最终通过的代码如下(以下代码存为名为“healthck.py”的文件):
#!/usr/bin/python3
#coding=utf-8##__author__='daijianbing'import requests
import jsondef authenticate(username, password,url_auth):# 构建认证数据auth_data = {'user_name': username,'password': password}# 将认证数据转换为JSON格式auth_json = json.dumps(auth_data)#禁止未经验证的HTTPSrequests.packages.urllib3.disable_warnings()# 发送POST请求到REST API进行身份验证response = requests.post(url_auth,verify = False, data=auth_json, headers={'Content-Type': 'application/json'})# 检查响应状态码,如果成功则返回认证令牌,否则抛出异常if response.status_code == 200:#print(response.json())return response.json()['data']['x_auth_token']else:connect = 0raise Exception('Authentication failed')def queryStoragePool(token,url):# 构建认证数据headers = {'Connection': 'keep-alive','X-Auth-Token': token}#print(headers)# 发送请求到REST APIresponse = requests.get(url, verify = False, headers=headers)# 检查响应状态码,如果成功则返回认证令牌,否则抛出异常if response.status_code == 200:#print(response.json())return response.json()['storagePools']else:raise Exception('Authentication failed')def queryPoolstatus(token,url,poolId):# 构建认证数据headers = {'Connection': 'keep-alive','X-Auth-Token': token}params = {'poolId':str(poolId),'vbs': 'true'}params_json = json.dumps(params)#print("url:",url,"headers:",headers,"params_json:",params_json)# 发送T请求到REST APIresponse = requests.get(url, verify = False, params=params, headers=headers)# 检查响应状态码,如果成功则返回认证令牌,否则抛出异常if response.status_code == 200:#print(response.json())return response.json()['status']else:raise Exception('Authentication failed')def queryAlarms(token,url):# 构建认证数据headers = {'Connection': 'keep-alive','X-Auth-Token': token}params = {'filter':"alarmStatus::1",}params_json = json.dumps(params)# 发送请求到REST APIresponse = requests.get(url, verify = False, params=params, headers=headers)# 检查响应状态码,如果成功则返回认证令牌,否则抛出异常if response.status_code == 200:#print(response.json())return response.json()['data']else:raise Exception('Authentication failed')# 使用示例  
username = 'zabbix'  
password = 'passwd'
hostip = '192.168.0.1'
port = '8088'
hostport = "https://"+hostip+":"+port
url_auth = hostport+'/api/v2/aa/sessions'
checkstatus = 1
requests.packages.urllib3.disable_warnings()
try:token = authenticate(username, password, url_auth)print('x_auth_token:', token)url_pool = hostport+'/dsware/service/resource/queryStoragePool'pools = queryStoragePool(token,url_pool)url_poolstatus=hostport+'/dsware/service/cluster/storagepool/queryStatus'for pool in pools:poolstatus=queryPoolstatus(token,url_poolstatus,pool['poolId'])url_alarms = hostport+'/api/v2/common/alarm_count'Alarms=queryAlarms(token,url_alarms)print('poolid:',pool['poolId'],'poolname:',pool['poolName'],'poolstatus:',poolstatus,'alarmcount:',Alarms['count'])
except:checkstatus = 0
finally:print("checkstatus:",checkstatus)print("chech over")

四、验证运行

实际运行:

# python3 --version
Python 3.9.9
# python3 healthck.py   
x_auth_token: MzMwMDE0OTkyNkEzMDAxNGUwOTcwMzE2
poolid: 0 poolname: Pool01 poolstatus: 0 alarmcount: 0
checkstatus: 1
chech over

本段代码实现了存储REST接口认证登录,查询全部的pool并列出poolid,poolname,pool状态,输出当前未恢复告警数并返回本次的检查状态(1为检查成功)。

http://www.lryc.cn/news/240147.html

相关文章:

  • 2022-4-11 南科大现代控制与最优估计
  • 【注册Huggingface】获取token
  • 【蓝桥杯软件赛 零基础备赛20周】第4周——简单模拟1
  • 使用OpenCV将图像转换为NV12格式并加载NV12数据
  • 【Lodash】 Filter 与Map 的结合使用
  • python命令行 引导用户填写可用的ip地址和端口号
  • 【小黑送书—第九期】>>重磅!这本30w人都在看的Python数据分析畅销书:更新了!
  • 关于APP备案的通知以及APP备案的常见问题
  • iOS 17.0 YYText 崩溃处理
  • 微信小程序面试题【100道】
  • 【nlp】2.8 注意力机制拓展
  • mysql 存储引擎ROWS与实际行数不一致
  • 软考小记-软件工程
  • 【开源】基于Vue和SpringBoot的创意工坊双创管理系统
  • COBOL排序问题
  • 数字化转型过程中面临最大的问题是什么?如何借助数字化工具实现快速转型?
  • 视频剪辑有妙招:批量置入封面,轻松提升视频效果
  • Java查询多条数据放入word模板 多个word文件处理成zip压缩包并在前端下载.zip文件
  • PC8223(CC/CV控制)高耐压输入5V/3.4A同步降压电路内建补偿带恒流恒压输出
  • 【webrtc】ModuleRtpRtcpImpl2: RtpRtcp DEPRECATED_Create 废弃了
  • 八股文面试day5
  • 数据处理生产环境_获取当前日期的前一天日期
  • 5.过滤敏感词 + 发布帖子 + 帖子详情
  • 大数据基础设施搭建 - Flume
  • 华为OD机试 - 找朋友(Java 2023 B卷 100分)
  • ESP32 MicroPython 颜色及二维码识别⑫
  • 数据结构与算法编程题15
  • 基于Mapmost Alpha工具快速搭建3D场景可视化大屏
  • OpenAI再次与Sam Altman谈判;ChatGPT Voice正式上线
  • 技术是增长关键驱动!传音控股新专利亮相,看未来手机趋势