import asyncio
import json
import random
import time

import jsonpath
import websockets

from Utils.global_variate import Global
from TestData.inbox.livechat_test_data import livechatData

loop = asyncio.get_event_loop()  # main asyncio event loop
loop.set_debug(True)


class ws:
    """Small async WebSocket client used by the live-chat test cases.

    Attributes:
        url: WebSocket endpoint passed to ``websockets.connect``.
        websocket: The active connection, or ``None`` when disconnected.
        loop: Event loop obtained at construction time.
        client_id: Client id taken from a server message; ``''`` until known.
        attr: Label of the side this client plays (the code compares it with
            ``'willdesk'``); also the prefix of the ``Global`` attribute in
            which the client id is stored.
    """

    def __init__(self, url, attr):
        self.url = url
        self.websocket = None
        self.loop = asyncio.get_event_loop()
        self.client_id = ''
        self.attr = attr

    async def get_client_id(self):
        """Read one server message and record the client id it carries.

        Connects first (60 s open timeout) when there is no connection yet.
        The id is stored on ``self.client_id`` and mirrored to
        ``Global.<attr>_clientId``.
        """
        if not self.websocket:
            self.websocket = await websockets.connect(self.url, open_timeout=60)
        enter_res = json.loads(await self.websocket.recv())
        action = enter_res.get('action')
        data = enter_res['data']
        if not action or action in ('readMsg', 'tipmsg'):
            # No action: per the original author's note this is the "enter"
            # response.
            self.client_id = data['clientId']
        elif action == 'sendmsg':
            self.client_id = data['response']['clientId']
        else:
            # Per the original author's note this is the heartbeat message.
            # NOTE(review): the key 'cliend' looks like a typo of 'clientId';
            # kept as-is because it may match the server payload -- confirm.
            self.client_id = data['cliend']
        setattr(Global, self.attr + '_clientId', self.client_id)

    async def connect(self):
        """Open the connection and fetch the client id.

        A ``ConnectionClosed`` raised during the handshake read is handled by
        closing the socket; other exceptions propagate to the caller.
        """
        try:
            self.websocket = await websockets.connect(self.url)
            await self.get_client_id()
        except websockets.ConnectionClosed:
            await self.close()

    async def send_message(self, message):
        """Send ``message``, (re)connecting first when the socket is not open.

        Args:
            message: Payload handed to ``websocket.send``.
        """
        # NOTE(review): ``.open`` is not available on every websockets
        # connection class/version -- confirm against the pinned version.
        if self.websocket and self.websocket.open:
            try:
                await self.websocket.send(message)
                # Short pause after sending. Must be the asyncio sleep:
                # time.sleep() would block the whole event loop.
                await asyncio.sleep(0.5)
            except websockets.ConnectionClosed:
                await self.close()
            except Exception as e:
                print('error:', e)
        else:
            await self.connect()
            await self.send_message(message)

    async def receive_message(self):
        """Return the next message, (re)connecting first when needed.

        Returns:
            The received message, or ``None`` when the connection was closed
            or another error occurred while receiving (the error is printed).
        """
        if self.websocket and self.websocket.open:
            try:
                return await self.websocket.recv()
            except websockets.ConnectionClosed:
                await self.close()
            except Exception as e:
                print(e)
            return None
        await self.connect()
        return await self.receive_message()

    async def close(self):
        """Close the connection, if any, and forget it."""
        if self.websocket:
            await self.websocket.close()
            self.websocket = None
            print("WebSocket connection closed")


async def ws_action(ws, case):
    """Run one test-case step against a websocket client.

    :param ws: websocket client (instance of the ``ws`` class)
    :param case: test-case data; ``case['action']`` selects the step:
        ``'sendmsg'`` / ``'readMsg'`` send a message and check the reply,
        ``'receive'`` reads one message and checks it against
        ``case['expected']``. Any other action is ignored.
    """
    action = case['action']
    if action in ('sendmsg', 'readMsg'):
        response = await ws_send_handler(case=case, ws=ws, action=action)
        if response.get('code'):
            assert response['code'] == 200
        elif response.get('response'):
            assert response['response']['code'] == 200
        elif response['action'] == 'roomTips':
            assert response['brandId'] == getattr(Global, 'brandId')
        # NOTE: the reply's own 'action' is deliberately not asserted. Per the
        # original author's note, for type 'file' the backend answers with
        # 'sendmsg' on the willdesk side but 'readMsg' on the customer side.
    elif action == 'receive':
        expected = case['expected']
        if ws.client_id == '':
            await ws.get_client_id()
        msg = json.loads(await ws.receive_message())
        ws_assert(expected=expected, msg=msg)


async def ws_send_handler(case, ws, action):
    """Build the payload for ``action``, send it and return the decoded reply.

    :param case: test-case data; ``case['data']`` holds the message fields
    :param ws: websocket client (instance of the ``ws`` class)
    :param action: ``'sendmsg'`` or ``'readMsg'``; for any other value the raw
        ``case['data']`` is sent unchanged
    :return: the next message received after sending, parsed from JSON
    """
    data = case['data']
    is_willdesk = ws.attr == 'willdesk'
    roomId = str(data['roomId']) if is_willdesk else ''
    # Fall back to values stored on Global when the case does not give them.
    uid = data.get('uid') or getattr(Global, 'userId')
    siteId = data.get('siteId') or getattr(Global, 'shopId')
    fromId = data.get('fromId') or getattr(Global, 'customerId')
    if ws.client_id == '':
        await ws.get_client_id()
    client_id = ws.client_id

    if action == 'sendmsg':
        # Pick the message content template matching the message type.
        content = getattr(livechatData, data['type'])
        msgType = 'rate' if data['type'] == 'rate' else 'newstext'
        username = data.get('username') or getattr(Global, 'name')
        isNote = data.get('isNote') or False
        avatar = 'https://img.willdesk.com/${picUrl}' if is_willdesk else ''
        if is_willdesk:
            data = {
                'avatar': avatar,
                'clientId': client_id,
                'fromId': str(fromId),
                'isNote': isNote,
                'message': content,
                'msgType': msgType,
                'platform': 1,
                'roomChannelType': 'wk',
                'roomId': roomId,
                'roomPlatform': 'willdesk',
                'siteId': str(siteId),
                'uType': 2,
                'uid': str(uid),
                'username': username,
            }
        else:
            data = {
                # NOTE(review): 'avator' (sic) is kept as in the original; it
                # may be the key the server expects -- confirm.
                'avator': avatar,
                'clientId': client_id,
                'isNote': isNote,
                'message': content,
                'msgType': msgType,
                'pageUrl': 'https://zhs-test.myshopify.com/',
                'platform': 1,
                'roomId': roomId,
                'sendTime': int(time.time()),
                'siteId': str(siteId),
                'uType': 1,
                'uid': str(fromId),
                'username': Global.customerName,
            }
    elif action == 'readMsg':
        # No trailing comma here: it would turn msgId into a 1-tuple and
        # str(msgId) would then send "('<id>',)" instead of the id.
        msgId = data['msgId']
        msgTime = data['msgTime']
        data = {
            'clientId': client_id,
            'msgId': str(msgId),
            'msgTime': str(msgTime),
            'platform': 1,
            'roomId': roomId,
            'siteId': str(siteId),
            # NOTE(review): lower-case 'utype' here vs 'uType' in sendmsg;
            # kept as in the original -- confirm against the server.
            'utype': 2 if is_willdesk else 1,
            'uid': str(uid) if is_willdesk else str(fromId),
        }

    messageData = {
        'action': action,
        'data': data,
        'seq': str(random.randint(1000000000000000, 9999999999999999)),
    }
    await ws.send_message(message=json.dumps(messageData))
    return json.loads(await ws.receive_message())


# Comparison used by each supported assertion keyword.
_ASSERT_WAYS = {
    'eq': lambda value, actual: value == actual,            # exactly equal
    'not_eq': lambda value, actual: value != actual,        # not equal
    'like': lambda value, actual: str(value) == str(actual),  # equal as text
    'almost': lambda value, actual: round(value, 2) == round(actual, 2),
    'in': lambda value, actual: value in actual,            # expected in actual
    'ain': lambda value, actual: actual in value,           # actual in expected
}


def ws_assert(expected, msg):
    """Check a received message against a list of expectations.

    :param expected: list of dicts shaped ``{assert_way: {jsonpath: value}}``;
        only the first path/value pair of each inner dict is used. Supported
        ``assert_way`` values are the keys of ``_ASSERT_WAYS``; unknown ones
        are skipped, as in the original implementation.
    :param msg: decoded message to inspect
    :raises AssertionError: when a path matches nothing in ``msg`` or a
        comparison fails; the exception carries the explanatory text
    """
    for scenario in expected:
        for assert_way, spec in scenario.items():
            path, value = list(spec.items())[0]
            matches = jsonpath.jsonpath(msg, path)
            if not matches:
                # jsonpath returns False when nothing matches; indexing that
                # would raise an unhelpful TypeError.
                raise AssertionError(
                    f'jsonpath {path!r} matched nothing in message {msg}'
                )
            res_path_value = matches[0]
            check = _ASSERT_WAYS.get(assert_way)
            if check is None:
                continue
            if not check(value, res_path_value):
                failure = f'期望值为{value}({type(value)}),实际返回值为{res_path_value}({type(res_path_value)}),完整的expected体为{expected}'
                print(failure)
                raise AssertionError(failure)


async def ws_run(ws, case):
    """Run ``ws_action`` for one case with a 10-second timeout.

    :param ws: websocket client (instance of the ``ws`` class)
    :param case: test-case data
    :raises asyncio.TimeoutError: when the step does not finish in 10 seconds
    """
    task = asyncio.create_task(ws_action(ws=ws, case=case))
    await asyncio.wait_for(task, timeout=10)