import time import jsonpath import pymysql.err import warnings import pytest import requests import re import os import urllib3.exceptions from Utils import config_handler from Utils import global_variate from Utils import path_handler from Utils import websocket_handler from Utils.log_handler import logger class ReqHandler: def params_handler(self,case,var_class,needReplace=False): ''' 用来处理替换${}变量的方法 :param case: 传入需要替换变量的用例字典 :param var_class: 传入对应的变量类(变量类必须先继承Global公用变量类) :return: 返回处理好的case字典 ''' try: if case.get('replace'): needReplace = True wordList = case['replace'] newWordList = [] case = str(case) replace_words = re.findall(r'\$\{(.+?)\}',case) if not replace_words: return eval(case) else: for word in replace_words: try: value = getattr(var_class,word) #找到正确的变量值 if not isinstance(value,str): case = case.replace(r"'${%s}'"%word,str(value)) #替换掉${}变量符 else: case = case.replace(r"${%s}"%word,str(value)) #替换掉${}变量符 if re.findall(r'\$\{(.+?)\}',case): #保底 case = case.replace(r'${%s}'%word,str(value)) if needReplace: for word in wordList: case = case.replace(value,'') newWordList.append({word:f'''{value}'''}) except Exception: import traceback print(traceback.print_exc()) print(f'{word}变量未找到') raise Exception if needReplace: case = eval(case) for newWord in newWordList: for key,value in newWord.items(): case['data'][key] = value return case else: return eval(case) except SyntaxError: print('内容可能含有单引号或者换行导致报错,用例:',case) raise Exception @classmethod def send_requests(self,case,var_class): try: if isinstance(case,dict): title = case['title'] if case.get('before_sql'): sql = case['before_sql'] while sql[0] == '\\' or sql[0] == '/': sql = sql[1:] with open(os.path.join(path_handler.TestFile_dir,sql).replace('\\','/'),mode='r',encoding='utf8') as f: content = f.read() exec(content) case = self.params_handler(self,case=case,var_class=var_class) if case.get('ws'): #识别到ws字段后,直接走ws_request函数,不继续往下面走 self.ws_requests(self,case) if case.get('after_sql'): sql = case['after_sql'] while sql[0] == '\\' or sql[0] == '/': sql = sql[1:] with open(os.path.join(path_handler.TestFile_dir, sql).replace('\\', '/'), mode='r', encoding='utf8') as f: content = f.read() exec(content) if case.get('sleep'): time.sleep(float(case['sleep'])) return 1 time.sleep(1) #https请求延时1秒,避免打满连接池 if 'http' in case['url']: url = case['url'] else: if case['url'][0] == '/': url = config_handler.base_config.get_value('url','test_address') + case['url'] else: url = config_handler.base_config.get_value('url','test_address') + '/' + case['url'] # url = case['url'] if 'http' in case['url'] else config_handler.base_config.get_value('url','test_address') + case['url'] #判断是否有域名,没有的话给config文件中的默认测试域名 method = case['method'] expected = case['expected'] if case.get('data'): data = case['data'] else: data = {} auth = getattr(global_variate.Global,'access_token') if 'c端' not in title else getattr(global_variate.Global,'customerToken') headers = {'Authorization' : auth,'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'} #如果没有请求头,默认赋予一个带token的head if case.get('headers'): headers.update(case['headers']) if method == 'post': if headers.get('content-type') == 'application/x-www-form-urlencoded' or headers.get('content-type') == 'text/xml': res = requests.request(url=url,method=method,data=data,headers=headers) elif headers.get('content-type') == 'multipart/formdata': res = requests.request(url=url,method=method,files=data,headers=headers) else: res = requests.request(url=url,method=method,json=data,headers=headers) else: res = requests.request(url=url,method=method,data=data,headers=headers) self.assert_handler(self,res=res,expected=expected) if case.get('set_value'): self.set_value_handler(self,res=res,item=case['set_value'],var_class=var_class) if case.get('after_sql'): sql = case['after_sql'] while sql[0] == '\\' or sql[0] == '/': sql = sql[1:] with open(os.path.join(path_handler.TestFile_dir, sql).replace('\\', '/'), mode='r', encoding='utf8') as f: content = f.read() exec(content) if case.get('sleep'): time.sleep(float(case['sleep'])) logger.info(f'{title}用例执行成功') else: raise TypeError('用例格式有误') except AssertionError: logger.error(f'{title}用例执行失败,失败原因:断言不通过\n用例{case}\n期望值{expected}\n实际返回值{res.text}') # print(f'{title}用例执行失败,失败原因:断言不通过\n用例{case}\n期望值{expected}\n实际返回值{jsonpath.jsonpath(res.json())}') raise AssertionError except pymysql.err.InterfaceError: print('数据库连接失败') raise pymysql.err.InterfaceError except urllib3.exceptions.MaxRetryError: time.sleep(30) #超限时,暂停30秒 print('接口超限1') self.send_requests(case,var_class) except requests.exceptions.ConnectionError: time.sleep(30) #超限时,暂停30秒 print('接口超限2') self.send_requests(case,var_class) except requests.exceptions.SSLError: time.sleep(10) # 超限时,暂停30秒 print('接口超限3') self.send_requests(case, var_class) except Exception as e: import traceback print(traceback.print_exc()) logger.error(f'{title}用例执行失败,失败原因:{e}') print(f'{title}用例执行失败,失败原因:{e}\n用例{case}') raise Exception def ws_requests(self,case): ''' 调用websocket服务的入口 ws填willdesk或customer,willdesk代表接下来的动作要用willdesk端操作,customer代表c端 ''' ws = case['ws'] # 操作端 if ws not in ['willdesk', 'customer']: raise Exception('websocket操作端填写有误') websocket = websocket_handler.ws_willdesk if ws == 'willdesk' else websocket_handler.ws_client # websocket_handler.ws_run(ws=websocket,case=case) websocket_handler.loop.run_until_complete(websocket_handler.ws_run(ws=websocket, case=case)) def assert_handler(self,res,expected): ''' 处理断言的函数 :param res: 传入返回体 :param expected: 传入期望结果 :return: ''' try: assert res.status_code == 200 #先断言状态码是正确的 time = res.elapsed.total_seconds() if time >= 5: warnings.warn(f'接口响应时间过长,耗时{str(time)}秒,接口地址:{res.url}',RuntimeWarning) #告警接口耗时过长 logger.error(f'接口响应时间过长,耗时{str(time)}秒,接口地址:{res.url}') if isinstance(expected,list): ''' 穿进来的expected一定是 {"eq":{$.code : value}} 的形式,要断言的值一定是通过jsonpath去检索的 ''' for item in expected: #循环每一个断言场景 for item_key,item_value in item.items(): assert_way = item_key path = list(item_value.keys())[0] #拿到path路径 value = list(item_value.values())[0] if 'in_list' not in assert_way: res_path_value = jsonpath.jsonpath(res.json(),path)[0] if str(path)[0:2] == '$.' else path else: res_path_value = jsonpath.jsonpath(res.json(),path) if 'in' in assert_way and isinstance(value,dict): value = str(value) res_path_value = str(value) if isinstance(res_path_value,str) and '{"contentList":' in res_path_value: res_path_value = res_path_value.replace('\\','') #兼容websocket,当遇到包含{"contentList":内容的值,且类型是字符串时,会自动去除所有\号 if assert_way == 'eq': #eq代表完全相同 assert value == res_path_value elif assert_way == 'like': #代表值相同但是type可能不同 assert str(value) == str(res_path_value) elif assert_way == 'not_eq': #not_eq代表不相同 assert value != res_path_value elif assert_way == 'almost': #almonst代表四舍五入后相同就判断为一致 assert round(value,2) == round(res_path_value,2) elif assert_way in ['in','in_list']: #in代表断言值在返回值中 assert value in res_path_value elif assert_way in ['ain','ain_list']:#ain代表返回值在断言值中 assert res_path_value in value elif assert_way in ['not_in','not_in_list']: assert value not in res_path_value elif assert_way in ['a_not_in','a_not_in_list']: assert res_path_value not in value elif assert_way == 'exec': exec(value) except AssertionError: print(f'期望值为{value}({type(value)}),实际返回值为{res_path_value}({type(res_path_value)}),完整的expected体为{expected}') raise AssertionError except Exception: print(f'断言值:{item},返回值:{res.text}') raise Exception def set_value_handler(self,res,item,var_class): ''' :param res: 传response请求体 :param item: 以{key名 : jsonpath路径,key名2 : jsonpath路径} 的格式传入要设置的值 :return: ''' for k,v in item.items(): value = jsonpath.jsonpath(res.json(),v) if value: setattr(var_class, k, value[0]) else: raise Exception(f'没有找到{k}值,对应jsonpath{v}')