接口自动化--commons内容详解-02
上篇文章主要讲解了接口自动化主要架构框架,这篇文庄主要讲解commons中的内容
1. requests_utils.py
首先讲解这个工具类,主要是因为在接口自动化中,基本都有的接口都是发送请求,获取响应结果,唯一不同的是,每个接口的内容不一样,发送接口的方式不一样,针对这方面就做接口自动化的封装。
思路: 使用requests.session()发送请求,发送请求的内容是字典格式,并且里面的内容是不一样的所以我们会使用关键字参数,发送内容有两种形式,有可能是json格式,也有可能是文件上传格式,要对这部分的数据进行处理,具体代码如下
#生成日志对象
logger = logging.getLogger(__name__)class RequestUtil:sess= requests.session() #使用requests.session()比requests.request()函数对于接口中有cookies的def send_all_request(self,**kwargs):for key,value in kwargs.items():if key=="files":for file_key, file_value in value.items():value[file_key] = open(file_value, "rb")elif key=='data':new_data=json.dumps(kwargs['data']) #对于data的值是json格式要转换成字典格式kwargs['data']=new_datares=RequestUtil.sess.request(**kwargs)logger.info(res.text)return res
2. yaml_util.py
在运行接口自动化用例的时候,要有对应的测试用例去运动,也会有一些配置要读取,因为他们都是使用yaml进行维护的,所以读取yaml中的数据要进行处理
思路: 首先要读取yaml中的内容,针对有接口关联的要借助第三方文件讲提取的字段写入到第三方文件中,这个时候要写入yaml文件,在每次执行完测试用例以后要讲yaml文件中的数据清除,所以也要有清除yaml文件的操作,还有一些特殊的操作,比如我们在做流程测试用例的时候,一个流程测试用例中可能有多个接口,其中某一个接口要执行多次,并且每次执行的内容页不一样,这个时候就要对这个接口做参数化,这个时候也要对写入yaml做具体的处理,这部分后面会专门讲解
常用的yaml处理内容,代码如下:
import yaml
#读取yaml
def read_yaml(key):with open("../extract.yaml",encoding='utf-8') as f:value=yaml.safe_load(f)return value[key]
#读取extract.yaml里面所有的值
def read_all():try:with open("../extract.yaml", encoding='utf-8') as f:return yaml.safe_load(f) or {} # 如果文件为空或不存在,则返回空字典except FileNotFoundError:return {}#写入yaml
def write_yaml(data):with open("../extract.yaml", 'w', encoding='utf-8') as f:yaml.safe_dump(data, f, allow_unicode=True, sort_keys=False, default_flow_style=False)
def handle_yaml(key, value):s = read_all()if key in s:if isinstance(s[key], list):s[key].append(value)else:s[key] = [s[key], value]else:s[key] = valuewrite_yaml(s)
#清空
def clean_yaml():with open("../extract.yaml", encoding='utf-8', mode="w") as f:pass#读取yaml测试用例
def read_testcase(yaml_path):with open(yaml_path) as f:value = yaml.safe_load(f)return value
3.path_util.py
路径的处理,因为在接口自动化中一个代码可能在不同的环境中,所以使用绝对路径就会导致换个地址就无法运行,针对这种情况这边可以使用相对路径,对项目路径做统一处理,先获取当前项目的路径,然后其他路径都是在项目路径进行拼接,具体代码如下:
import os
from pathlib import Path
#项目路径
project_path=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
#测试报告临时存放位置
tempath=os.path.join(project_path,r'reports\temp')
#测试报告存放位置
reports_path=os.path.join(project_path,r'reports')
#测试用例路径
testcases_path=os.path.join(project_path,'testcases')
testcase_path = Path(testcases_path)
#日志存储路径
logs_path=os.path.join(project_path,r'1ogs')
4. model_util.py
测试用例因为都是使用yaml格式进行校验的,所以针对yaml格式也要做一些基本的校验,如果不符合测试用例指定的格式,就直接报错
from dataclasses import dataclass"""
作用:用来校验测试用例是否符合规则
@dataclass:表明CaseInfo是数据类目的:不用写init()初始方法,创建对象并且传入参数就可以自动赋值
"""
@dataclass
class CaseInfo:# 必填feature: strstory: strtitle: strrequest: dictvalidate: dict# 选填extract: dict = Noneparamtrize: list = None
"""
作用:校验测试用例
caseinfo :接收测试用例字典
**caseinfo: 将测试用例字典,解包为键值对,例如: 'name': 'HYY'
new_caseinfo:接收创建的CaseInfo()对象
"""
def verify_yaml(caseinfo: dict):try:new_caseinfo = CaseInfo(**caseinfo)return new_caseinfoexcept Exception:raise Exception("YAML测试用例不符合框架的规范!")
5. loging_util.py
接口自动化中测试用例的执行的日志也需要记录,这样就可以在执行报错的时候找出到底哪里报错,所以对日志的封装代码如下:
#对统一的日志进行处理
from loguru import logger
from datetime import datetimefrom commons.path_util import logs_pathclass ApiAutoLog():#利用loguru封装接口自动化项目日志记录器def __new__(cls, *args, **kwargs):log_name = datetime.now().strftime("%Y-%m-%d") # 以时间命名日志文件,格式为"年-月-日"sink = f'{logs_path}/{log_name}' # 日志记录文件路径level = "DEBUG" # 记录的最低日志级别为DEBUGencoding = "utf-8" # 写入日志文件时编码格式为utf-8enqueue = True # 多线程多进程时保证线程安全rotation = "500MB" # 日志文件最大为500MB,超过则新建文件记录日志retention = "1 week" # 日志保留时长为一星期,超时则清除logger.remove(handler_id=None) #控制日志是否在控制台打印,这个代表不在控制台打印logger.add(sink=sink, level=level, encoding=encoding,enqueue=enqueue, rotation=rotation, retention=retention)return logger
log=ApiAutoLog()
def caselog(title,inBoby,res):log.info(f'【{title}】请求参数为:{inBoby}')log.info(f'【{title}】返回结果为:{res}')
6. assert_util.py
针对每个接口都有个断言需要进行处理,这边就针对断言进行统一封装
import copy
import pymysqlclass AssertUtil:def assert_all_case(self, res, assert_type, value):# 1.深拷贝个resnew_res = copy.deepcopy(res)# 2.把json()方法改成json属性try:new_res.json = new_res.json()except Exception as e:new_res.json = {"msg": 'response not json data'}raise e# 3.循环判断断言for msg, yq_and_sj_data in value.items():yq, sj = yq_and_sj_data[0], yq_and_sj_data[1]# 根据属性获取到属性的值try:sj_value = getattr(new_res, sj)# print("++++", assert_type, msg, yq, sj_value)except Exception:sj_value = sjif assert_type == "equals":# print("进入了equals")assert yq == sj_value, msgif assert_type == "contains":# print("进入了contains")assert yq in sj_value, msgif assert_type == "db_equals":yq_value = self.execute_sql(yq)assert yq_value[0] in sj_value,msgif assert_type == "db_contains":yq_value = self.execute_sql(yq)assert yq_value[0] in sj_value,msg
7.ddt_util.py(后续单独一篇文章详解)
测试用例中单个测试用例会出现正例,反例或者一个接口会执行多次,每次执行的接口一样但是内容不一样,这个时候就会用到参数化,针对参数化处理,代码如下:
import yaml
#读取yaml测试用例
from commons.model_util import verify_yamldef read_testcase(yaml_path):with open(yaml_path) as f:case_list = yaml.safe_load(f)if len(case_list)>=2: #如果列表的长度大于2就不止一个接口,此时就判断是流程测试用例return [case_list]else:if "paramtrize" in dict(*case_list).keys(): #判断yaml文件中是否有parametrize参数,如果有就走数据驱动流程new_caseinfo=ddts(*case_list)return new_caseinfoelse:return case_list #如果yaml文件中没有parametrtze参数,就是一个单接口的用例读取
#对ddt数据的处理
def ddts(caseinfo:dict):# 数据驱动:返回[{正例},{反例},{反例}]# parametrize:# -["username","password"]# -["baili","6ai11123"]# - ["baili", "baili"]# -["bai1i123","bai1i123"]data_list=caseinfo["paramtrize"]# print(data_list) # [['username','password'],['baili','baili123'],['baili',"baili']]#先对这个参数化的列表的每一行的长度进行校验,如果长度校验通过就是走参数化,如果长度校验不通过就不是参数化len_flag=Truename_len=len(data_list[0]) #获取[['username','password'],['baili','baili123'],['baili',"baili']]这个列表的第一个值的长度,第一个值就是需要参数化的变量名,列表的剩下的值都要跟这个值保持一直for data in data_list:if len(data)!=name_len:len_flag=Falseprint("paramtize的长度不一致请检查")break#如果长度没有问题str_caseinfo= yaml.dump(caseinfo)new_caseinfo=[]if len_flag:for x in range(1,len(data_list)): #读取值先从第二行进行读取也就是下标是1的进行读取因为第一行是参数的名字raw_caseinfo=str_caseinfofor y in range(0,name_len):# print(data_list[0][y],data_list[x][y]) #打印的第一个参数是第一行的第一列username,baili;password,baili123#针对参数化中有数字但是是字符串的进行处理if isinstance(data_list[x][y],str) and data_list[x][y].isdigit():data_list[x][y]="'"+data_list[x][y]+"'"raw_caseinfo = raw_caseinfo.replace("$ddt{" + data_list[0][y] + "}", str(data_list[x][y]))case_dict=yaml.safe_load(raw_caseinfo)case_dict.pop("paramtrize")new_caseinfo.append(case_dict)return new_caseinfo
8. extract_util.py(后续单独讲解)
import copy
import json
import reimport jsonpath
import yamlfrom commons.yaml_util import write_yaml, read_all
from hotload.debug_talk import DebugTalkclass ExtractUtil:# 解析提取变最def extract(self,res,var_name,attr_name,expr:str,index=None):#深拷贝,将res内容拷贝到new_res对象中去【注意:深拷贝出来的 是一个对象Object】# print(f"res打印{res}")new_res = copy.deepcopy(res)# 把json()改成json属性try:# 将json格式转化为字典格式,new_res.json 是 自己创建的变量new_res.json = new_res.json()except Exception:new_res.json = {"msg": 'response not json data'}# 通过反射获取属性的值data = getattr(new_res, attr_name)# print(f"data===============:{data}")# 判断通过什么提取方式提取数据if expr.startswith("$"):lis = jsonpath.jsonpath(dict(data), expr)# print("\nJSONPath提取返回来的值:", lis)else:lis = re.findall(expr, data)# print("\n正则表达式返回来的值:",lis)# 通过下标取值if lis:s=read_all()if index == None:if var_name in s:if isinstance(s[var_name], list):s[var_name].append(lis)else:s[var_name] = [s[var_name], lis]else:s[var_name]=liselse:if var_name in s:if isinstance(s[var_name], list):s[var_name].append(lis[index])else:s[var_name] = [s[var_name], lis[index]]else:s[var_name]=lis[index]write_yaml(s)# if index==None:# write_yaml({var_name: lis})# #更新yaml文件中的token值,list是你指定text或者json属性获取的内容# else:# write_yaml({var_name:lis[index]})#解析使用变量,把¥{access_token}替换从extract.yaml里面提取具体的值def change(self,request_data:dict):# 把字典转换成字符串data_str= yaml.safe_dump(request_data)# 字符串替换new_request_dat= self.hotload_replace(data_str)# 3.把字符串还原成字典data_dict = yaml.safe_load(new_request_dat)return data_dict#使用热加载方法,让yaml文件中可以使用python的一些方法def hotload_replace(self, data_str: str):# 定义一个正则匹配这种表达式# regexp ="\\$\V{(.*?)\\}"regexp = "\\$\\{(.*?)\\((.*?)\\)\\}" # ${函数名(参数)}# 通过正则表达式在data_str字符串中去四配,得到所有的表达式list"${name} is a ${token}"fun_list = re.findall(regexp, data_str)# print(fun_list) # [('number',"),('token','1,2')]for f in fun_list:# print(f)if f[1] == "": # 没有参数new_value = getattr(DebugTalk(), f[0])() #到DebugTalk()中找到以f[0]命名的函数进行调用获取得到的值# print("value1:%s" % new_value)else: # 有参数,1-N个参数new_value = getattr(DebugTalk(), f[0])(*f[1].split(","))# 如果value是一个数字格式的字符串if isinstance(new_value, str) and new_value.isdigit():new_value = "'" + new_value + "'"# 拼接旧的值old_value = "${" + f[0] + "(" + f[1] + ")}"# 把旧的表达式替换成函数返回的新的值data_str = data_str.replace(str(old_value), str(new_value))return data_str
9.main_util.py
一个测试用例执行流程:如果入参里面是上一个接口的响应,第一步数据替换,第二步发送请求,第三步获取响应,第四步响应中数据提取,第五步断言判断
from commons.assert_util import AssertUtil
from commons.extract_util import ExtractUtil
from commons.loging_util import caselog
from commons.requests_utils import RequestUtileu = ExtractUtil()
ru = RequestUtil()
au = AssertUtil()
def stand_case_flow(case_obj):new_request = eu.change(case_obj.request)# 发送请求res = RequestUtil().send_all_request(**new_request)caselog(case_obj.title, new_request, res.json())if case_obj.extract:for key, value in case_obj.extract.items():eu.extract(res, key, *value)if case_obj.validate:for assert_type, value in eu.change(case_obj.validate).items():au.assert_all_case(res, assert_type, value)else:print("此用例没有围言!")