Commit 85a683b6 authored by zhanhuasheng's avatar zhanhuasheng

接口自动化提交

parents
[url]
#测试环境域名
test_address = https://allplatformtest.sealapps.com
#生产环境域名
formal_address = https://api.willdesk.com
#账号
[account]
#测试环境账号
test_account = zhanhuasheng@channelwill.cn
#测试环境密码
test_password = a313248705
#生产环境账号
formal_account = fkngznzqbn@iubridge.com
#生产环境密码
formal_password = 123456789A
[mysql]
#地址
address = 47.251.1.100
#端口
port = 3306
#账号
account = willdesk
#密码
password = uitSElgrwLp6vFG
\ No newline at end of file
import pytest
from Utils import yaml_handler
from Utils import req_handler
from TestData.account.account_test_data import accountData
class TestAccount:
yaml_path = r'account/account.yaml'
yaml_data = yaml_handler.yaml_handler.get_case(yaml_path)
@pytest.mark.parametrize('case',yaml_data)
def test_account(self,case):
# case = req_handler.ReqHandler.params_handler(case=case,var_class=accountData) #替换变量
req_handler.ReqHandler.send_requests(case=case,var_class=accountData)
print(case)
\ No newline at end of file
import pytest
from Utils import yaml_handler
from Utils import req_handler
from TestData.billing.buyPlan_test_data import buyPlanData
class TestBuyPlan:
yaml_path = r'billing\buyPlan.yaml'
yaml_data = yaml_handler.yaml_handler.get_case(yaml_path)
@pytest.mark.parametrize('case',yaml_data)
def test_buy_plan(self,case):
case = req_handler.ReqHandler.params_handler(case=case,var_class=buyPlanData) #替换变量
req_handler.ReqHandler.send_requests(case=case,var_class=buyPlanData)
print(case)
\ No newline at end of file
from Utils.global_variate import Global
class accountData(Global):
origin_language = 'en' #原始语言
target_language = 'zh-CN' #要翻译的语言
old_name = 'zhs' #旧名字
new_name = 'auto_test' #新名字
trans_key = 'account.account'
\ No newline at end of file
from Utils.global_variate import Global
import datetime
class buyPlanData(Global):
numItem = [1,2,10] #有额度限制的服务
today = str(datetime.datetime.now().date())
next_month = str(datetime.date.today() + datetime.timedelta(days=30))
from Utils.sql_handler import test_env_conn
from TestData.account.account_test_data import accountData
select_sql = f'select * from willdesk.customer_service where id = {accountData.userId} '
select_result = test_env_conn.select_one_value(sql=select_sql)
assert select_result['local_lang'] == accountData.target_language
\ No newline at end of file
from Utils.sql_handler import test_env_conn
from TestData.account.account_test_data import accountData
select_sql = f'select * from willdesk.translate_result where trans_key = "{accountData.trans_key}" and trans_lang = "{accountData.target_language}"'
select_result = test_env_conn.select_one_value(sql=select_sql)
setattr(accountData,'originValue',select_result['original_value'])
setattr(accountData,'transValue',select_result['trans_value'])
from Utils.sql_handler import test_env_conn
from TestData.billing.buyPlan_test_data import buyPlanData
delete_plan_sql = f'delete from willdesk.business_plan_current_used where `comp_id` = {buyPlanData.companyId} and status in (1,2)' #删除当前套餐
delete_item_sql = f'delete from willdesk.business_plan_current_used_item_detail where `comp_id` = {buyPlanData.companyId} and status in (1,2)' #删除当前的套餐
test_env_conn.execute_sql(delete_plan_sql)
test_env_conn.execute_sql(delete_item_sql)
from Utils.sql_handler import test_env_conn
from TestData.billing.buyPlan_test_data import buyPlanData
select_sql = f'select * from willdesk.business_plan_current_used where `comp_id` = {buyPlanData.companyId} and `status` = 2'
result = test_env_conn.select_one_value(sql=select_sql) #检查当前套餐是否为免费
if result['plan_id'] == buyPlanData.FreePlanId:
update_plan_sql = f'update willdesk.business_plan_current_used set `plan_id` = {buyPlanData.ProPlanId} where `comp_id` = {buyPlanData.companyId} and `status` = 2'
test_env_conn.execute_sql(sql=update_plan_sql) #如果是免费套餐,手动update成付费
# now_active_plan = test_env_conn.select_one_value(sql=f'select * from willdesk.business_plan_current_used where `comp_id` = {buyPlanData.companyId} and `status` = 2')
for num in range(1,38):
number = 2000 if num in buyPlanData.numItem else -1
update_item_sql = f'update willdesk.business_plan_current_used_item_detail set amount = {number},plan_id = {buyPlanData.ProPlanId} where `comp_id` = {buyPlanData.companyId} and `status` = 2 and `item_id` = {num};'
test_env_conn.execute_sql(sql=update_item_sql)
\ No newline at end of file
import configparser
import os
from Utils import path_handler
class configHandler:
def __init__(self,file_path):
self.conf = configparser.ConfigParser()
if not os.path.exists(file_path):
self.config_file = open(file_path,mode='w',encoding='utf8')
self.conf.read(file_path,encoding='utf8')
def get_value(self,section,option):
value = self.conf.get(section=section,option=option)
return value
base_config = configHandler(path_handler.Config_dir+'/base_config.ini')
class Global:
'''
公用变量类,所有的子变量类继承该类,存取公用变量,公用变量类默认存token等基础信息
'''
import os
Base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
Utils_dir = os.path.join(Base_dir,'Utils')
TestCase_dir = os.path.join(Base_dir,'TestCase')
Config_dir = os.path.join(Base_dir,'Config')
TestFile_dir = os.path.join(Base_dir,'TestFile')
TestData_dir = os.path.join(Base_dir,'TestData')
YamlCase_dir = os.path.join(Base_dir,'YamlCase')
import asyncio
import json
import jsonpath
import requests
import re
import os
from Utils import config_handler
from Utils import global_variate
from Utils import path_handler
from Utils import websocket_handler
class ReqHandler:
def params_handler(self,case,var_class):
'''
用来处理替换${}变量的方法
:param case: 传入需要替换变量的用例字典
:param var_class: 传入对应的变量类(变量类必须先继承Global公用变量类)
:return: 返回处理好的case字典
'''
case = str(case)
replace_words = re.findall(r'\$\{(.+?)\}',case)
if not replace_words:
return eval(case)
else:
for word in replace_words:
try:
value = getattr(var_class,word) #找到正确的变量值
if not isinstance(value,str):
case = case.replace(r"'${%s}'"%word,str(value)) #替换掉${}变量符
else:
case = case.replace(r"${%s}"%word,str(value)) #替换掉${}变量符
except Exception:
import traceback
print(traceback.print_exc())
print(f'{word}变量未找到')
raise Exception
return eval(case)
@classmethod
def send_requests(self,case,var_class):
if isinstance(case,dict):
if case.get('before_sql'):
sql = case['before_sql']
while sql[0] == '\\' or sql[0] == '/':
sql = sql[1:]
with open(os.path.join(path_handler.TestFile_dir,sql).replace('\\','/'),mode='r',encoding='utf8') as f:
content = f.read()
exec(content)
case = self.params_handler(self,case=case,var_class=var_class)
title = case['title']
if case.get('ws'):
#识别到ws字段后,直接走ws_request函数,不继续往下面走
self.ws_requests(self,case)
if case.get('after_sql'):
sql = case['after_sql']
while sql[0] == '\\' or sql[0] == '/':
sql = sql[1:]
with open(os.path.join(path_handler.TestFile_dir, sql).replace('\\', '/'), mode='r',
encoding='utf8') as f:
content = f.read()
exec(content)
return
url = case['url'] if 'http' in case['url'] else config_handler.base_config.get_value('url','test_address') + case['url'] #判断是否有域名,没有的话给config文件中的默认测试域名
method = case['method']
expected = case['expected']
if case.get('data'):
data = case['data']
else:
data = {}
headers = {'Authorization' : getattr(global_variate.Global,'access_token'),'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'} #如果没有请求头,默认赋予一个带token的head
if case.get('headers'):
for key,value in case['headers'].items():
headers[key] = value
if method == 'post':
if headers.get('content-type') == 'application/x-www-form-urlencoded' or headers.get('content-type') == 'text/xml':
res = requests.request(url=url,method=method,data=data,headers=headers)
elif headers.get('content-type') == 'multipart/formdata':
res = requests.request(url=url,method=method,files=data,headers=headers)
else:
res = requests.request(url=url,method=method,json=data,headers=headers)
else:
res = requests.request(url=url,method=method,data=data,headers=headers)
self.assert_handler(self,res=res,expected=expected)
if case.get('set_value'):
import random
async def test_recv():
data = json.dumps({
'action': "sendmsg",
'data': {
'avatar': "https://img.willdesk.com/${picUrl}",
'clientId': websocket_handler.ws_willdesk.client_id,
'fromId': '35948', #c端userId
'isNote': False,
'message': '{\"contentList\":\"testtesttest<br>\",\"attachmentList\":[]}',
'msgType': 'newstext',
'platform': 1,
'roomChannelType': 'wk',
'roomId': '502856393624136484',
'roomPlatform': 'willdesk',
'siteId': '7238', #店铺id
'uType': 2,
'uid': '5746', #userId
'username': 'zhs' #品牌名称
},
'seq': str(random.randint(1000000000000000, 9999999999999999))
})
await websocket_handler.ws_willdesk.send_message(data)
# await websocket_handler.ws_willdesk.receive_message()
asyncio.run(test_recv())
self.set_value_handler(self,res=res,item=case['set_value'],var_class=var_class)
if case.get('after_sql'):
sql = case['after_sql']
while sql[0] == '\\' or sql[0] == '/':
sql = sql[1:]
with open(os.path.join(path_handler.TestFile_dir, sql).replace('\\', '/'), mode='r',
encoding='utf8') as f:
content = f.read()
exec(content)
else:
raise TypeError('用例格式有误')
def ws_requests(self,case):
'''
调用websocket服务的入口
ws填willdesk或customer,willdesk代表接下来的动作要用willdesk端操作,customer代表c端
'''
ws = case['ws'] # 操作端
if ws not in ['willdesk', 'customer']:
raise Exception('websocket操作端填写有误')
websocket = websocket_handler.ws_willdesk if ws == 'willdesk' else websocket_handler.ws_customer
websocket_handler.ws_run(ws=websocket,case=case)
def assert_handler(self,res,expected):
'''
处理断言的函数
:param res: 传入返回体
:param expected: 传入期望结果
:return:
'''
assert res.status_code == 200 #先断言状态码是正确的
if isinstance(expected,list):
'''
穿进来的expected一定是 {"eq":{$.code : value}} 的形式,要断言的值一定是通过jsonpath去检索的
'''
for item in expected: #循环每一个断言场景
for item_key,item_value in item.items():
assert_way = item_key
path = list(item_value.keys())[0] #拿到path路径
value = list(item_value.values())[0]
res_path_value = jsonpath.jsonpath(res.json(),path)[0]
if assert_way == 'eq': #eq代表完全相同
assert value == res_path_value
elif assert_way == 'not_eq': #not_eq代表不相同
assert value != res_path_value
elif assert_way == 'almost': #almonst代表四舍五入后相同就判断为一致
assert round(value,2) == round(res_path_value,2)
elif assert_way == 'in': #in代表断言值在返回值中
assert value in res_path_value
elif assert_way == 'ain':
assert res_path_value in value
def set_value_handler(self,res,item,var_class):
'''
:param res: 传response请求体
:param item: 以{key名 : jsonpath路径,key名2 : jsonpath路径} 的格式传入要设置的值
:return:
'''
for k,v in item.items():
value = jsonpath.jsonpath(res.json(),v)
if value:
setattr(var_class, k, value[0])
else:
raise Exception(f'没有找到{k}值,对应jsonpath{v}')
\ No newline at end of file
import pymysql
from Utils.config_handler import base_config
class SqlHandler:
def __init__(self,address,port,password,account,database = 'willdesk'):
self.address = address
self.port = port
self.password = password
self.account = account
self.database = database
self.conn = pymysql.connect(host=address,port=port,user=account,password=password,database=database,cursorclass=pymysql.cursors.DictCursor)
self.cursor = self.conn.cursor()
def select_one_value(self,sql):
self.cursor.execute(sql)
result = self.cursor.fetchone()
return result
def select_many_value(self, sql):
self.cursor.execute(sql)
result = self.cursor.fetchall()
return result
def execute_sql(self,sql):
try:
self.cursor.execute(sql)
self.conn.commit()
except Exception:
self.conn.rollback()
def close_db(self):
print('已关闭')
self.cursor.close()
self.conn.close()
test_env_conn = SqlHandler(address=base_config.get_value('mysql','address'),port=int(base_config.get_value('mysql','port')),account=base_config.get_value('mysql','account'),password=base_config.get_value('mysql','password'))
import asyncio
import websockets
import time,random,json,jsonpath
from Utils.global_variate import Global
class ws:
def __init__(self, url, attr):
self.url = url
self.websocket = None
self.loop = asyncio.get_event_loop()
self.client_id = ''
async def connect(self):
try:
self.websocket = await websockets.connect(self.url)
enter_res = eval(await self.websocket.recv())
if enter_res.get('action'):
if enter_res['action'] in ['readMsg','tipmsg']:
self.client_id = enter_res['data']['clientId']
elif enter_res['action'] =='sendmsg':
self.client_id = enter_res['data']['response']['clientId']
else: #action是heartbeat的情况
self.client_id = enter_res['data']['cliend']
else: #action是enter的情况
self.client_id = enter_res['data']['clientId']
# print(f"Connected to {self.url}")
except websockets.ConnectionClosed as e:
await self.close()
async def send_message(self, message):
if self.websocket and self.websocket.open:
try:
print('开始发送消息')
await self.websocket.send(message)
print(f"Sent message: {message}")
except websockets.ConnectionClosed as e:
await self.close()
except Exception:
print('error')
else:
# print('send重新链接')
await self.connect()
await self.send_message(message)
async def receive_message(self):
if self.websocket and self.websocket.open:
try:
message = await self.websocket.recv()
print(f"Received message: {message}")
return message
except websockets.ConnectionClosed as e:
print('receive关闭链接')
await self.close()
except NameError:
return message
else:
print('receive重新链接')
await self.connect()
return await self.receive_message()
def ws_sendmsg(self,case):
pass
async def close(self):
if self.websocket:
await self.websocket.close()
self.websocket = None
print("WebSocket connection closed")
async def ws_action(ws,case):
'''
:param ws: 传入websocket服务
:param case: 传入用例数据
'''
action = case['action'] #操作
expected = case['expected'] #断言
if action == 'sendmsg':
response = ws_send_handler(case=case,ws=ws)
assert response['response']['code'] == 200
assert response['action'] == 'sendmsg'
elif action == 'receive':
msg = await ws.receive_message()
while msg['action'] == 'heartbeat':
time.sleep(0.1)
msg = await ws.receive_message()
result = ws_assert(expected=expected,msg=msg)
assert result
def ws_send_handler(case,ws):
if case.get('file'):
'''预留文件处理区'''
data = case['data'] # 消息数据
msgType = 'newstext'
if data['type'] in ['text', 'emoji']:
content = r'{"contentList":"%s","attachmentList":[]}' % data['content']
elif data['type'] == 'picture':
content = r'{"contentList":"<div data-type="willdesk-image" class="willdesk-chat-image" contenteditable="false"><img src="https://img.willdesk.com/test/chat/5973/2024/03/06/078ee3ce18c91ab608c4949705d1cfef/png图片_1024×1024.png" data-width="1024" data-height="1024" data-size="219148" data-chattagtype="wd-img" style="max-height: 200px; max-width: 200px; vertical-align: bottom; margin: 2px 0px; border-radius: 4px; width: 200px; height: 200px;"></div> <br>","attachmentList":[]}'
elif data['type'] == 'file':
content = r'{"contentList":"","attachmentList":[{"src":"https://img.willdesk.com/test/chat/5973/2024/03/06/6e7d5240ef4f8ecec7430851f764b2ee/新建_文本文档.txt","blobSrc":"https://img.willdesk.com/test/chat/5973/2024/03/06/6e7d5240ef4f8ecec7430851f764b2ee/新建_文本文档.txt","name":"新建 文本文档.txt","size":0,"type":"text/plain"}]}'
elif data['type'] == 'article':
content = r'{"contentList":"<div data-type="willdesk-faq" style="background: rgb(255, 255, 255); border-radius: 8px; min-width: 230px; max-width: 100%; width: 300px; box-sizing: border-box; position: relative; border: 0.5px solid rgb(229, 229, 235);"><div style="padding: 12px 16px 0px; color: rgb(18, 17, 39); font-size: 16px; font-weight: 600; line-height: 24px; overflow: hidden; display: -webkit-box; text-overflow: ellipsis; -webkit-box-orient: vertical; -webkit-line-clamp: 2;">article title</div><div style="padding: 4px 16px 12px; color: rgb(108, 107, 128); font-size: 14px; max-height: 48px; font-weight: 400; line-height: 24px; overflow: hidden; display: -webkit-box; text-overflow: ellipsis; -webkit-box-orient: vertical; -webkit-line-clamp: 2;"><p>article_detail</p></div><a href="%s" title="faq" target="_blank" style="box-sizing: border-box; margin-top: 8px; text-decoration: none; padding: 8px 16px; background: rgb(243, 243, 246); color: rgb(44, 35, 229); font-size: 14px; font-family: Inter; line-height: 22px; width: 100%; border-radius: 0px 0px 8px 8px; display: inline-block;">View all →</a></div>","attachmentList":[]}' % \
data['content']
elif data['type'] == 'rate':
content = r'[&*{"reviewContent":"<p>Would you mind sharing your feedback on your experience so fa222r?</p>","thumbsReviews":{"good":"","bad":"FeedbackURL"},"startReviews":{"one_start":"FeedbackURL","two_start":"FeedbackURL","three_start":"FeedbackURL","four_start":"","five_start":"www.baidu.com"},"reviewTitle":"How was your chat with {AgentName}222?","reviewChoose":"thumbs","askSendReview":"true","autoSendReview":"false","primaryColor":"#2C23E5"}]'
msgType = 'rate'
elif data['type'] == 'link':
content = r'{"contentList":"<a class="wd-chat-link" data-type="willdesk-link" href="https://www.baidu.com" contenteditable="false" target="_blank">test</a>","attachmentList":[]}'
elif data['type'] == 'product':
content = r'{"contentList":"<a href="https://zhs-test.myshopify.com/products/selling-plans-ski-wax" title="https://zhs-test.myshopify.com/products/selling-plans-ski-wax" data-type="willdesk-product" target="_blank" style="display: flex; background: rgb(255, 255, 255); border-radius: 8px; min-height: 80px; max-height: 150px; min-width: 100px; max-width: 100%; width: 300px; box-sizing: border-box; position: relative; text-decoration: none; padding: 12px 16px; border: 1px solid rgb(231, 233, 236);"><img src="https://cdn.shopify.com/s/files/1/0690/4551/8586/products/snowboard_wax.png?v=1708331992" style="width: 110px; height: 110px; max-height: 150px; object-fit: cover; border-radius: 8px; margin-right: 4%; flex-shrink: 0; border: 0.5px solid rgb(231, 233, 236);"><div style="width: calc(100% - 118px); display: flex; flex-direction: column; position: relative; justify-content: center;"><div style="width: 100%; max-height: 48px; line-height: 24px; color: rgb(18, 17, 39); font-size: 14px; font-weight: bold; overflow: hidden; display: -webkit-box; text-overflow: ellipsis; -webkit-box-orient: vertical; -webkit-line-clamp: 2;">Selling Plans Ski Wax</div></div></a>","attachmentList":[]}'
elif data['type'] == 'coupon':
content = r'{"contentList":"<a class="wd-chat-link" data-type="willdesk-link" href="https://tt-zldev003.myshopify.com/discount/V-0QE73BP5" contenteditable="false" style="margin-right: 8px" target="_blank">优惠券</a>","attachmentList":[]}'
fromId = data['fromId'] if data.get('fromId') else getattr(Global, 'customerId')
siteId = data['siteId'] if data.get('siteId') else getattr(Global, 'shopId')
uid = data['uid'] if data.get('uid') else getattr(Global, 'userId')
username = data['username'] if data.get('username') else getattr(Global, 'name')
isNote = data['isNote'] if data.get('isNote') else False
roomId = data['roomId']
messageData = {
'action': 'sendmsg',
'data': {
'avatar': "https://img.willdesk.com/${picUrl}",
'clientId': ws.client_id,
'fromId': fromId, # c端userId
'isNote': isNote,
'message': content,
'msgType': msgType,
'platform': 1,
'roomChannelType': 'wk',
'roomId': roomId,
'roomPlatform': 'willdesk',
'siteId': siteId, # 店铺id
'uType': 2,
'uid': uid, # userId
'username': username # 品牌名称
},
'seq': str(random.randint(1000000000000000, 9999999999999999))
}
await ws.send_message(message=json.dumps(messageData))
response = eval(await ws.receive_message())
return response
def ws_assert(expected,msg):
'''
:param expected: 断言结果
:param res: 返回的信息
:return:
'''
try:
msg = eval(msg)
except Exception:
return 0
for item in expected: # 循环每一个断言场景
for item_key, item_value in item.items():
assert_way = item_key
path = list(item_value.keys())[0] # 拿到path路径
value = list(item_value.values())[0]
res_path_value = jsonpath.jsonpath(msg, path)[0]
if assert_way == 'eq': # eq代表完全相同
assert value == res_path_value
elif assert_way == 'not_eq': # not_eq代表不相同
assert value != res_path_value
elif assert_way == 'almost': # almonst代表四舍五入后相同就判断为一致
assert round(value, 2) == round(res_path_value, 2)
elif assert_way == 'in': # in代表断言值在返回值中
assert value in res_path_value
elif assert_way == 'ain':
assert res_path_value in value
def ws_run(ws,case):
'''
:param ws: 传入websocket服务
:param case: 传入用例数据
'''
loop = asyncio.get_event_loop()
loop.run_until_complete(ws_action(ws=ws,case=case))
# 使用示例
# async def main():
# data = json.dumps({
# 'action': "sendmsg",
# 'data': {
# 'avatar': "https://img.willdesk.com/${picUrl}",
# 'clientId': 'cnk14g7fot40rbaaqa20',
# 'fromId': '35948',
# 'isNote': False,
# 'message': '{"contentList":"testtesttest<br>","attachmentList":[]}',
# 'msgType': 'newstext',
# 'platform': 1,
# 'roomChannelType': 'wk',
# 'roomId': '502856393624136484',
# 'roomPlatform': 'willdesk',
# 'siteId': '7238',
# 'uType': 2,
# 'uid': '5746',
# 'username': 'zhs'
# },
# 'seq' : str(random.randint(1000000000000000, 9999999999999999))
# })
# webs = ws(url=f'wss://atestws.sealapps.com/ws?platform=1&authorization=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MDk3MDc4NDAsImlzcyI6IndpbGxkZXNrLmNvbSIsInN1YiI6IkxvZ2luQXV0aFRva2VuIiwidXNlcl9pbmZvIjp7ImNvbXBhbnlJZCI6NjA1OSwiYnJhbmRJZCI6NjA1MiwiY3VzdG9tZXJTZXJ2aWNlSWQiOjU3NDYsImxvZ2luUm9sZSI6MSwiY2xpZW50SWQiOiJjbmsxNGc3Zm90NDByYmFhcWEyMCIsImxvZ2luVGltZSI6MTcwOTcwNzg0MH0sImtleSI6ImFsbHBsYXRmb3JtLXdpbGxkZXNrLXVzZXIifQ.XcTxOOmKaSQDGSiZIHfx6AB83c0qlo8kzs2zQpV-XDw&t=1709710166112',attr='willdesk')
# await asyncio.create_task(webs.send_message(message=data))
# msg = await asyncio.create_task(webs.receive_message())
#
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
import yaml
import os
from Utils import path_handler
class yamlHandler:
def get_case(self,file_path):
try:
while file_path[0] == '/' or file_path[0] == '\\':
file_path = file_path[1:]
file_path = os.path.join(path_handler.YamlCase_dir,file_path)
with open(file_path,'rb') as f:
data = yaml.load(f,yaml.Loader)
return [data[case] for case in data] #以数组套字典的格式返回
except FileNotFoundError:
print(f'yaml用例文件未找到,传入路径:{file_path}')
raise Exception
yaml_handler = yamlHandler()
\ No newline at end of file
api1:
title: 获取用户信息
url: /api/v1/customerService/listCustomerService
method: post
data: {"isActivation" : -1,"isDelete" : -1,"v" : "${v}"}
set_value: {"picUrl" : "$.data.list[?(@.id == '${userId}')].profile"}
expected:
- eq: {"$.code" : 0}
#
#api2:
# title: 更改账户名
# url: /api/v1/customerService/updateCustomerServiceAccount
# method: post
# data: {"id" : "${userId}" , "localLang" : "zh-CN" ,"name" : "${new_name}" , "profile" : "${picUrl}", "v" : "${v}"}
# expected:
# - eq: {"$.code" : 0}
# - eq: {"$.msg" : "success"}
#
#api3:
# title: 检查用户名
# url: /api/v1/customerService/listCustomerService
# method: post
# data: {"isActivation" : -1,"isDelete" : -1,"v" : "${v}"}
# expected:
# - eq: {"$.code" : 0}
# - eq: {"$.data.list[?(@.id == '${userId}')].name" : "${new_name}"}
#
#api4:
# title: 检查用户名 - 首页查询账户信息接口
# url: /api/v1/customerService/getCustomerServiceInfo?v=${v}
# method: get
# expected:
# - eq: { "$.code": 0 }
# - eq: { "$.data.name": "${new_name}" }
#
#api5:
# title: 检查用户名 - inbox页面检查资源列表接口
# url: /api/v1/resource/getResourceItemList
# method: post
# data: {"resourceType" : "teammates" , "v" : "${v}"}
# expected:
# - eq: {"$.code" : 0}
# - eq: {"$.data.list[?(@.relId == '${userId}')].name" : "${new_name}"}
#
#api6:
# title: 更改用户的设置语言
# url: /api/v1/customerService/updateCustomerServiceAccount
# method: post
# data: {"id" : "${userId}" , "localLang" : "${target_language}" , "profile" : "${picUrl}", "v" : "${v}"}
# after_sql: /account/account_selectLanguage.py
# expected:
# - eq: {"$.code" : 0}
# - eq: {"$.msg" : "success"}
#
#api8:
# title: 检查翻译文本接口
# url: /api/v1/translate/transKeyValue
# before_sql: /account/account_selectTransResult.py
# method: post
# data: {"keyValue" : {"${trans_key}" : "${originValue}"},"targetLang" : "${target_language}","transType" : 1,"v" : "${v}"}
# expected:
# - eq: {"$.code": 0 }
# - eq: {"$.data.translated" : {"${trans_key}" : "${transValue}"}}
#
#api9:
# title: 检查翻译文本接口 - 2
# url: /api/v1/translate/transText
# method: post
# data: {"targetLang" : "English","text" : ["${originValue}"]}
# expected:
# - eq: {"$.code": 0 }
# - eq: {"$.data.translated" : ["${originValue}"]}
#
#
#api10:
# title: 还原设置
# url: /api/v1/customerService/updateCustomerServiceAccount
# method: post
# data: {"id" : "${userId}" , "localLang" : "${origin_language}" ,"name" : "${old_name}", "profile" : "${picUrl}", "v" : "${v}"}
# expected:
# - eq: {"$.code" : 0}
# - eq: {"$.msg" : "success"}
api1:
title: 查询当前月度套餐种类
url: /api/v1/businessPlan/getBasicPlan?planType=1&v=${v}
method: get
set_value: {"ProPlanId" : "$.data.basicPlanList[?(@.name == 'Pro')].id" , "FreePlanId" : "$.data.basicPlanList[?(@.name == 'Free')].id"}
expected:
- eq: {"$.code": 0}
api2:
title: 查询当前年度套餐种类
url: /api/v1/businessPlan/getBasicPlan?planType=3&v=${v}
method: get
expected:
- eq: {"$.code": 0}
api3:
title: 订阅免费套餐
url: /api/v1/businessPlan/buyBasicPlan
method: post
before_sql: billing/buyPlan_deleteNowPlan.py
data: {"shopId":"${shopId}","planId":"${FreePlanId}","v":"${v}"}
expected:
- eq: {"$.code": 0}
api4:
title: 检查是否切为免费套餐
url: /api/v1/businessPlan/getCompanyCurrentPlan?v=${v}
method: get
expected:
- eq: {"$.code": 0}
- eq: {"$.data.planId": "${FreePlanId}"}
- eq: {"$.data.price": "0.00"}
- eq: {"$.data.companyBasicPlanItem[?(@.itemId == 1)].amount": 20}
- eq: {"$.data.companyBasicPlanItem[?(@.itemId == 2)].amount": 20}
- eq: {"$.data.companyBasicPlanItem[?(@.itemId == 10)].amount": 1}
- eq: {"$.data.companyBasicPlanItem[?(@.itemId == 3)].amount": -1}
- eq: {"$.data.createAtFormat": "${today}"}
- eq: {"$.data.expireAtFormat": "${next_month}"}
api5:
title: 获取折扣信息
url: /api/v1/businessPlan/getPlanActive
method: post
data: {"v":"${v}"}
expected:
- eq: {"$.code": 0}
- eq: {"$.data.discount": 5.1}
api6:
title: 购买付费套餐 - 检查是否返回shopify付款链接
url: /api/v1/businessPlan/buyBasicPlan
method: post
data: { "shopId": "${shopId}","planId": "${ProPlanId}","v": "${v}" }
expected:
- eq: { "$.code": 0 }
- in: { "$.data.confirmationUrl" : "https://zhs-test.myshopify.com/admin/charges/4794771/"}
api7:
title: 检查是否切为付费套餐(仅校验了额度,没有校验planid)
url: /api/v1/businessPlan/getCompanyCurrentPlan?v=${v}
method: get
before_sql: billing/buyPlan_updateToPro.py
expected:
- eq: { "$.code": 0 }
# - eq: { "$.data.planId": "${ProPlanId}" }
- eq: { "$.data.companyBasicPlanItem[?(@.itemId == 1)].amount": 2000 }
- eq: { "$.data.companyBasicPlanItem[?(@.itemId == 2)].amount": 2000 }
- eq: { "$.data.companyBasicPlanItem[?(@.itemId == 10)].amount": 2000 }
- eq: { "$.data.companyBasicPlanItem[?(@.itemId == 3)].amount": -1 }
- eq: { "$.data.createAtFormat": "${today}" }
- eq: { "$.data.expireAtFormat": "${next_month}" }
api1:
title: 获取用户信息
url: /api/v1/customerService/getCustomerServiceInfo?v=${v}
method: get
expected:
- eq: {"$.code": 0}
- eq: {"$.data.brandId": "${brand_id}"}
- eq: {"$.data.brandName": "${brand_name}"}
- eq: {"$.data.email": "${email}"}
- eq: {"$.data.id": "${id}"}
- eq: {"$.data.name": "${name}"}
\ No newline at end of file
import pytest
import requests
import time
import re
from Utils import config_handler
from Utils import global_variate
from Utils import websocket_handler
session = requests.session()
def willdesk_login():
'''
执行用例前获取登录态的接口,全局只执行一次,通过env参数控制是否测试环境
'''
env = 'test'
email = 'test_account' if env == 'test' else 'formal_account'
password = 'test_password' if env == 'test' else 'formal_password'
address = 'test_address' if env == 'test' else 'formal_address'
account = config_handler.base_config.get_value('account',email)
password = config_handler.base_config.get_value('account',password)
login_url = f'{config_handler.base_config.get_value("url",address)}/api/v1/customerService/loginV2' #登录的api
token_url = f'{config_handler.base_config.get_value("url",address)}/api/v1/customerService/genToken' #登录后获取token的api
version_url = 'https://hstest.sealapps.com/login' if env == 'test' else 'https://app.willdesk.com/login'
headers = {
'Content-Type' : 'application/json',
'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0'
}
version_res = session.get(url=version_url,headers=headers).text
v = re.findall(r'<script src="/(.*?)/assets',version_res)[0]
login_data = {
'email' : account,
'password' : password,
'v' : v
}
login_res = session.post(url=login_url,headers=headers,json=login_data).json()
code = login_res['data']['code']
brandName = login_res['data']['list'][0]['brandName']
brandId = login_res['data']['list'][0]['id']
token_data = {
'code' : code,
'brandId' : brandId,
'email' : account,
'v' : v
}
token_res = session.post(url=token_url,headers=headers,json=token_data).json()
token = token_res['data']['accessToken']
email = token_res['data']['email']
userId = token_res['data']['id']
name = token_res['data']['name']
shopList = token_res['data']['shopList']
companyId = token_res['data']['shopList'][0]['companyId']
setattr(global_variate.Global,'access_token',token)
setattr(global_variate.Global,'brandId',brandId)
setattr(global_variate.Global,'brandName',brandName)
setattr(global_variate.Global,'code',code)
setattr(global_variate.Global,'v',v)
setattr(global_variate.Global,'email',email)
setattr(global_variate.Global,'userId',userId)
setattr(global_variate.Global,'name',name)
setattr(global_variate.Global,'shopList',shopList)
setattr(global_variate.Global,'shopId',shopList[0]['id'])
setattr(global_variate.Global,'companyId',companyId)
def customer_login():
env = 'test'
address = 'test_address' if env == 'test' else 'formal_address'
headers = {
'Content-Type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0'
}
version_url = 'https://allplatformtest.sealapps.com/api/v1/shop/local/getCustomerHtmlLinks?customerDomain=https://utest.sealapps.com&shopDomain=zhs-test.myshopify.com' if env == 'test' else 'https://api.willdesk.com/api/v1/shop/local/getCustomerHtmlLinks?customerDomain=https://u.willdesk.com&shopDomain=test-2023013104.myshopify.com'
shop_url = 'https://allplatformtest.sealapps.com/api/v1/setting/getCShopSetting?domain=zhs-test.myshopify.com' if env == 'test' else 'https://api.willdesk.com/api/v1/setting/getCShopSetting?domain=test-2023013104.myshopify.com'
version_res = session.get(url=version_url,headers=headers).text
c_token = re.findall(r'utest.sealapps.com/(.*?)/assets',version_res)[0]
shop_res = session.get(url=shop_url,headers=headers).json()
login_url = f'{config_handler.base_config.get_value("url",address)}/api/v1/customer/login' #登录的api
data = {
'email' : 'autotest_customer@qq.com',
'isVisitor' : 0,
'location' : 'Shanghai,China',
'phone' : '',
'shopId' : shop_res['data']['shopInfo']['shopId'],
'timeZone' : '8',
'userName' : 'autotest_customer',
'v' : c_token
}
login_res = session.post(url=login_url,json=data,headers=headers).json()
setattr(global_variate.Global,'clientToken',login_res['data']['accessToken'])
setattr(global_variate.Global,'customerId',login_res['data']['customerId'])
address = 'wss://atestws.sealapps.com/ws' if env == 'test' else 'wss://ws.willdesk.com/ws'
print(f'{address}?platform=1&authorization={global_variate.Global.access_token}&t={int(time.time())}')
websocket_handler.ws_willdesk = websocket_handler.ws(
url=f'{address}?platform=1&authorization={global_variate.Global.access_token}&t={int(time.time())}',
attr='willdesk') # willdesk账号websocket链接
websocket_handler.ws_client = websocket_handler.ws(
url=f'{address}?platform=1&authorization={global_variate.Global.clientToken}&t={int(time.time())}',
attr='customer') # c端websocket链接
@pytest.fixture(scope='session',autouse=True)
def call_fixture():
#自定义调用顺序
willdesk_login()
customer_login()
\ No newline at end of file
import pytest
import time
import os
from Utils.sql_handler import test_env_conn
if __name__ == '__main__':
tims_str = int(time.time())
pytest.main(['-vs','TestCase/account',f'--alluredir=./allureReports','--clean-alluredir'])
os.system(f'allure generate ./allureReports -c')
# os.system('allure open ./allureReports')
test_env_conn.close_db()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment