Commit 73ecf256 authored by zhanhuasheng's avatar zhanhuasheng

新增log

parent 72f4fc0a
......@@ -4,4 +4,3 @@ class channelData(Global):
pass
# brandId = 6052
# shopId = 7238
......@@ -11,7 +11,7 @@ setattr(channelData, 'channel_id', channel_id)
setattr(channelData, 'channel_name', channel_name)
setattr(channelData, 'channel_resource_id', channel_resource_id)
channel_open_room_select_sql = f'select * from `im_room` where site_id = {channel_rel_id} and status = 1 and is_ignore = 0 '
channel_open_room_select_sql = f'select * from `im_room` where site_id = {channel_rel_id} and status = 1 and is_ignore = 0 and room_type != 2'
channel_open_room_select_result = test_env_conn.select_many_value(sql=channel_open_room_select_sql)
setattr(channelData, 'channel_open_room_num', len(channel_open_room_select_result))
setattr(channelData, 'channel_open_room_num_add', len(channel_open_room_select_result) + 1)
......@@ -29,16 +29,16 @@ setattr(channelData, 'customer_service_name', customer_service_list_select_resul
setattr(channelData, 'customer_service_image', customer_service_list_select_result[0]['profile'])
setattr(channelData, 'customer_service_email', customer_service_list_select_result[0]['email'])
customer_channel_open_room_select_sql = f'select * from `im_room` where site_id = {channel_rel_id} and status = 1 and is_ignore = 0 and allocation_uid = {customer_service_id}'
customer_channel_open_room_select_sql = f'select * from `im_room` where site_id = {channel_rel_id} and status = 1 and room_type != 2 and is_ignore = 0 and allocation_uid = {customer_service_id}'
customer_channel_open_room_select_result = test_env_conn.select_many_value(sql=customer_channel_open_room_select_sql)
setattr(channelData, 'customer_channel_open_room_num', len(customer_channel_open_room_select_result))
customer_open_room_select_sql = f'select * from `im_room` where status = 1 and is_ignore = 0 and allocation_uid = {customer_service_id}'
customer_open_room_select_sql = f'select * from `im_room` where status = 1 and room_type != 2 and is_ignore = 0 and allocation_uid = {customer_service_id}'
customer_open_room_select_result = test_env_conn.select_many_value(sql=customer_open_room_select_sql)
setattr(channelData, 'customer_open_room_num', len(customer_open_room_select_result))
setattr(channelData, 'customer_open_room_num_add', len(customer_open_room_select_result) + 1)
other_room_select_sql = f'select * from `im_room` where allocation_uid != {customer_service_id} and cate_id = {channelData.brandId} and status = 1 limit 10'
other_room_select_sql = f'select * from `im_room` where allocation_uid != {customer_service_id} and room_type != 2 and cate_id = {channelData.brandId} and status = 1 limit 10'
other_room_select_result = test_env_conn.select_one_value(sql=other_room_select_sql)
setattr(channelData, 'other_room_id', other_room_select_result['room_id'])
......@@ -15,21 +15,21 @@ team_room_list_select_result = test_env_conn.select_many_value(sql=team_room_lis
team_room_list = tuple(i['room_id'] for i in team_room_list_select_result)
if team_room_list != ():
open_room_select_sql = f'select * from `im_room` where room_id in {team_room_list} and status = 1'
open_room_select_sql = f'select * from `im_room` where room_id in {team_room_list} and status = 1 and room_type != 2'
open_room_select_result = test_env_conn.select_many_value(sql=open_room_select_sql)
pending_room_select_sql = f'select * from `im_room` where room_id in {team_room_list} and status = 2'
pending_room_select_sql = f'select * from `im_room` where room_id in {team_room_list} and status = 2 and room_type != 2'
pending_room_select_result = test_env_conn.select_many_value(sql=pending_room_select_sql)
close_room_select_sql = f'select * from `im_room` where room_id in {team_room_list} and status = 3'
close_room_select_sql = f'select * from `im_room` where room_id in {team_room_list} and status = 3 and room_type != 2'
close_room_select_result = test_env_conn.select_many_value(sql=close_room_select_sql)
setattr(teamData, 'open_room_num', len(open_room_select_result))
setattr(teamData, 'pending_room_num', len(pending_room_select_result))
setattr(teamData, 'close_room_num', len(close_room_select_result))
other_room_select_sql = f'select * from `im_room` where room_id not in {team_room_list} and site_id = {teamData.shopId} and status = 1 limit 10'
other_room_select_sql = f'select * from `im_room` where room_id not in {team_room_list} and site_id = {teamData.shopId} and room_type != 2 and status = 1 limit 10'
else:
setattr(teamData, 'open_room_num', 0)
setattr(teamData, 'pending_room_num', 0)
setattr(teamData, 'close_room_num', 0)
other_room_select_sql = f'select * from `im_room` where `cate_id` = {teamData.brandId} and site_id = {teamData.shopId} and status = 1 limit 10'
other_room_select_sql = f'select * from `im_room` where `cate_id` = {teamData.brandId} and room_type != 2 and site_id = {teamData.shopId} and status = 1 limit 10'
other_room_select_result = test_env_conn.select_one_value(sql=other_room_select_sql)
setattr(teamData, 'other_room_id', other_room_select_result['room_id'])
......@@ -48,7 +48,7 @@ setattr(teamData, 'team_member_image', team_member_info_select_result['profile']
setattr(teamData, 'team_member_email', team_member_info_select_result['email'])
team_member_open_room_select_sql = f'select * from `im_room` where room_id in {team_room_list} and status = 1 and allocation_uid = {team_member_id}'
team_member_open_room_select_sql = f'select * from `im_room` where room_id in {team_room_list} and room_type != 2 and status = 1 and allocation_uid = {team_member_id}'
team_member_open_room_select_result = test_env_conn.select_many_value(sql=team_member_open_room_select_sql)
setattr(teamData, 'team_member_open_room_num', len(team_member_open_room_select_result))
from Utils.sql_handler import test_env_conn
from TestData.inbox.ticketInfo_test_data import ticketInfoData
all_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 1 and `is_ignore` = 0'
wait_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 2 and `is_ignore` = 0'
close_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 3 and `is_ignore` = 0'
your_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 1 and `allocation_uid` = {ticketInfoData.userId} and `is_ignore` = 0'
your_wait_ticket_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 2 and `allocation_uid` = {ticketInfoData.userId} and `is_ignore` = 0'
your_close_ticket_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 3 and `allocation_uid` = {ticketInfoData.userId} and `is_ignore` = 0'
all_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 1 and `is_ignore` = 0 and room_type != 2'
wait_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 2 and `is_ignore` = 0 and room_type != 2'
close_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 3 and `is_ignore` = 0 and room_type != 2'
your_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 1 and `allocation_uid` = {ticketInfoData.userId} and `is_ignore` = 0 and room_type != 2'
your_wait_ticket_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 2 and `allocation_uid` = {ticketInfoData.userId} and `is_ignore` = 0 and room_type != 2'
your_close_ticket_sql = f'select * from `im_room` where `cate_id` = {ticketInfoData.brandId} and `status` = 3 and `allocation_uid` = {ticketInfoData.userId} and `is_ignore` = 0 and room_type != 2'
all_ticket_num = len(test_env_conn.select_many_value(sql=all_ticket_select_sql))
wait_ticket_num = len(test_env_conn.select_many_value(sql=wait_ticket_select_sql))
......
from Utils.sql_handler import test_env_conn
from TestData.inbox.ticketInfo_test_data import ticketInfoData
ticket_info_select_sql = f'select * from im_room where room_id = {ticketInfoData.roomId}'
ticket_info_select_sql = f'select * from im_room where room_id = {ticketInfoData.roomId} and room_type != 2'
ticket_info_result = test_env_conn.select_one_value(sql=ticket_info_select_sql)
setattr(ticketInfoData, 'ticketCustomerImage', ticket_info_result['first_send_avatar'])
......
from Utils.sql_handler import test_env_conn
from TestData.inbox.ticketStatus_test_data import ticketStatusData
all_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `is_ignore` = 0'
your_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `allocation_uid` = {ticketStatusData.userId} and `is_ignore` = 0'
unread_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `read_status` = 1 and `is_ignore` = 0'
unassigned_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `allocation_uid` = 0 and `is_ignore` = 0'
all_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `is_ignore` = 0 and room_type != 2'
your_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `allocation_uid` = {ticketStatusData.userId} and `is_ignore` = 0 and room_type != 2'
unread_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `read_status` = 1 and `is_ignore` = 0 and room_type != 2'
unassigned_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `allocation_uid` = 0 and `is_ignore` = 0 and room_type != 2'
all_ticket_num = len(test_env_conn.select_many_value(sql=all_ticket_select_sql))
......@@ -23,7 +23,7 @@ setattr(ticketStatusData,'unassigned_ticket_num_minus',len(unassigned_ticket_num
setattr(ticketStatusData,'unassigned_ticket_id',unassigned_ticket_num[0]['room_id'])
read_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `read_status` = 2 and `room_platform` = "willdesk" and `is_ignore` = 0 and `site_id` = {ticketStatusData.shopId}'
read_ticket_select_sql = f'select * from `im_room` where `cate_id` = {ticketStatusData.brandId} and `status` = 1 and `read_status` = 2 and `room_platform` = "willdesk" and `is_ignore` = 0 and `site_id` = {ticketStatusData.shopId} and room_type != 2'
read_ticket_select_result = test_env_conn.select_one_value(sql=read_ticket_select_sql)
setattr(ticketStatusData,'read_ticket_id',str(read_ticket_select_result["room_id"]))
......
......@@ -4,7 +4,7 @@ import os
import inspect
class LogHandler:
def __init__(self, name='autotest_logger', log_dir='../TestLog'):
def __init__(self, name='autotest_logger', log_dir='./TestLog'):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG)
......@@ -13,7 +13,7 @@ class LogHandler:
today = datetime.datetime.now().strftime("%Y-%m-%d")
log_file = os.path.join(log_dir, f"{today}_log.log")
file_handler = logging.FileHandler(log_file)
file_handler = logging.FileHandler(log_file,encoding='utf-8')
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] in %(module)s (line %(lineno)d): %(message)s'
......@@ -37,5 +37,6 @@ class LogHandler:
self.logger.error(formatted_message, *args, **kwargs)
# 使用示例:
log_dir = '../TestLog' # 指定日志存放目录
log_dir = './TestLog' # 指定日志存放目录
logger = LogHandler(name='autotest_logger', log_dir=log_dir)
......@@ -7,6 +7,7 @@ from Utils import config_handler
from Utils import global_variate
from Utils import path_handler
from Utils import websocket_handler
from Utils.log_handler import logger
class ReqHandler:
......@@ -40,19 +41,53 @@ class ReqHandler:
@classmethod
def send_requests(self,case,var_class):
if isinstance(case,dict):
if case.get('before_sql'):
sql = case['before_sql']
while sql[0] == '\\' or sql[0] == '/':
sql = sql[1:]
with open(os.path.join(path_handler.TestFile_dir,sql).replace('\\','/'),mode='r',encoding='utf8') as f:
content = f.read()
exec(content)
case = self.params_handler(self,case=case,var_class=var_class)
title = case['title']
if case.get('ws'):
#识别到ws字段后,直接走ws_request函数,不继续往下面走
self.ws_requests(self,case)
try:
if isinstance(case,dict):
if case.get('before_sql'):
sql = case['before_sql']
while sql[0] == '\\' or sql[0] == '/':
sql = sql[1:]
with open(os.path.join(path_handler.TestFile_dir,sql).replace('\\','/'),mode='r',encoding='utf8') as f:
content = f.read()
exec(content)
case = self.params_handler(self,case=case,var_class=var_class)
title = case['title']
if case.get('ws'):
#识别到ws字段后,直接走ws_request函数,不继续往下面走
self.ws_requests(self,case)
if case.get('after_sql'):
sql = case['after_sql']
while sql[0] == '\\' or sql[0] == '/':
sql = sql[1:]
with open(os.path.join(path_handler.TestFile_dir, sql).replace('\\', '/'), mode='r',
encoding='utf8') as f:
content = f.read()
exec(content)
if case.get('sleep'):
time.sleep(float(case['sleep']))
return 1
url = case['url'] if 'http' in case['url'] else config_handler.base_config.get_value('url','test_address') + case['url'] #判断是否有域名,没有的话给config文件中的默认测试域名
method = case['method']
expected = case['expected']
if case.get('data'):
data = case['data']
else:
data = {}
headers = {'Authorization' : getattr(global_variate.Global,'access_token'),'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'} #如果没有请求头,默认赋予一个带token的head
if case.get('headers'):
headers.update(case['headers'])
if method == 'post':
if headers.get('content-type') == 'application/x-www-form-urlencoded' or headers.get('content-type') == 'text/xml':
res = requests.request(url=url,method=method,data=data,headers=headers)
elif headers.get('content-type') == 'multipart/formdata':
res = requests.request(url=url,method=method,files=data,headers=headers)
else:
res = requests.request(url=url,method=method,json=data,headers=headers)
else:
res = requests.request(url=url,method=method,data=data,headers=headers)
self.assert_handler(self,res=res,expected=expected)
if case.get('set_value'):
self.set_value_handler(self,res=res,item=case['set_value'],var_class=var_class)
if case.get('after_sql'):
sql = case['after_sql']
while sql[0] == '\\' or sql[0] == '/':
......@@ -63,41 +98,14 @@ class ReqHandler:
exec(content)
if case.get('sleep'):
time.sleep(float(case['sleep']))
return 1
url = case['url'] if 'http' in case['url'] else config_handler.base_config.get_value('url','test_address') + case['url'] #判断是否有域名,没有的话给config文件中的默认测试域名
method = case['method']
expected = case['expected']
if case.get('data'):
data = case['data']
logger.info(f'{title}用例执行成功')
else:
data = {}
headers = {'Authorization' : getattr(global_variate.Global,'access_token'),'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'} #如果没有请求头,默认赋予一个带token的head
if case.get('headers'):
headers.update(case['headers'])
if method == 'post':
if headers.get('content-type') == 'application/x-www-form-urlencoded' or headers.get('content-type') == 'text/xml':
res = requests.request(url=url,method=method,data=data,headers=headers)
elif headers.get('content-type') == 'multipart/formdata':
res = requests.request(url=url,method=method,files=data,headers=headers)
else:
res = requests.request(url=url,method=method,json=data,headers=headers)
else:
res = requests.request(url=url,method=method,data=data,headers=headers)
self.assert_handler(self,res=res,expected=expected)
if case.get('set_value'):
self.set_value_handler(self,res=res,item=case['set_value'],var_class=var_class)
if case.get('after_sql'):
sql = case['after_sql']
while sql[0] == '\\' or sql[0] == '/':
sql = sql[1:]
with open(os.path.join(path_handler.TestFile_dir, sql).replace('\\', '/'), mode='r',
encoding='utf8') as f:
content = f.read()
exec(content)
if case.get('sleep'):
time.sleep(float(case['sleep']))
else:
raise TypeError('用例格式有误')
raise TypeError('用例格式有误')
except AssertionError:
logger.error(f'{title}用例执行失败,失败原因:断言不通过\n期望值{expected}\n实际返回值{res.text}')
except Exception as e:
logger.error(f'{title}用例执行失败,失败原因:{e}')
raise Exception
def ws_requests(self,case):
'''
......
......@@ -162,6 +162,7 @@ api17:
title: b端主动发起会话 - 笔记
ws: willdesk
action: sendmsg
sleep: 1
data:
type: text
roomId: "${roomId}"
......
import pytest
import os
from Utils.sql_handler import test_env_conn
if __name__ == '__main__':
try:
pytest.main(['-vs',r'C:\Users\rd71\PycharmProjects\willdesk_api_auto\TestCase',f'--alluredir=./allureReports','--clean-alluredir'])
# pytest.main(['-vs',r'C:\Users\rd71\PycharmProjects\willdesk_api_auto\TestCase\inbox\test_06_channel.py',f'--alluredir=./allureReports/json','--clean-alluredir'])
pytest.main(['-vs',r'C:\Users\rd71\PycharmProjects\willdesk_api_auto\TestCase\inbox\test_06_channel.py',f'--html=./report.html']) #allure报告一直生成不了,改用pytest自带报告
finally:
test_env_conn.close_db() #关闭数据库链接
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment