import asyncio
import json
import random
import time

import jsonpath
import websockets

from TestData.inbox.livechat_test_data import livechatData
from Utils.global_variate import Global


def _get_event_loop():
    """Return the current event loop, creating and installing one if none is set."""
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions raise instead of implicitly creating a loop.
        new_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(new_loop)
        return new_loop


loop = _get_event_loop()  # shared main event loop


class ws:
    """Small asyncio WebSocket client that remembers the server-assigned client id.

    Attributes:
        url: Endpoint handed to ``websockets.connect``.
        websocket: Active connection object, or ``None`` when disconnected.
        loop: Event loop captured when the instance was created.
        client_id: Client id read from the first server message ('' until known).
        attr: Caller-supplied tag; ``ws_send_handler`` builds a different
            payload when it equals ``'willdesk'``.
    """

    def __init__(self, url, attr):
        self.url = url
        self.websocket = None
        self.loop = _get_event_loop()
        self.client_id = ''
        self.attr = attr

    def _is_open(self):
        """Return True when a connection exists and reports itself as open."""
        conn = self.websocket
        if conn is None:
            return False
        # Older websockets connection objects expose a boolean ``open``.
        is_open = getattr(conn, 'open', None)
        if is_open is not None:
            return bool(is_open)
        # Newer connection objects expose a ``state`` enum instead.
        state = getattr(conn, 'state', None)
        return getattr(state, 'name', None) == 'OPEN'

    async def get_client_id(self):
        """Read one server message and store the client id it carries.

        Opens the connection first when there is none. The location of the
        id inside the message depends on the message's ``action`` field.
        """
        if not self.websocket:
            self.websocket = await websockets.connect(self.url)
        enter_res = json.loads(await self.websocket.recv())
        payload = enter_res['data']
        action = enter_res.get('action')
        if not action or action in ('readMsg', 'tipmsg'):
            # No action at all is the "enter" message.
            self.client_id = payload['clientId']
        elif action == 'sendmsg':
            self.client_id = payload['response']['clientId']
        else:
            # Heartbeat message.
            # NOTE(review): the original read only the key 'cliend', which
            # looks like a typo; 'clientId' is accepted as a fallback.
            if 'cliend' in payload:
                self.client_id = payload['cliend']
            else:
                self.client_id = payload['clientId']

    async def connect(self):
        """Open a new connection and read the client id from the first message."""
        try:
            self.websocket = await websockets.connect(self.url)
            await self.get_client_id()
        except websockets.ConnectionClosed:
            await self.close()

    async def send_message(self, message, retries=3):
        """Send *message*, reconnecting first when the socket is not open.

        :param message: text frame to send
        :param retries: how many reconnect attempts are allowed
        :raises ConnectionError: when the socket is still not open after
            ``retries`` reconnect attempts
        """
        if not self._is_open():
            if retries <= 0:
                raise ConnectionError(
                    f'unable to open a WebSocket connection to {self.url}'
                )
            print('send重新链接')
            await self.connect()
            await self.send_message(message, retries=retries - 1)
            return
        try:
            print('开始发送消息')
            await self.websocket.send(message)
            print(f"Sent message: {message}")
        except websockets.ConnectionClosed:
            await self.close()
        except Exception as exc:
            # Best effort: report the failure but let the caller continue.
            print('error', repr(exc))

    async def receive_message(self, retries=3):
        """Receive one message, reconnecting first when the socket is not open.

        :param retries: how many reconnect attempts are allowed
        :return: the received message, or ``None`` when receiving failed
        :raises ConnectionError: when the socket is still not open after
            ``retries`` reconnect attempts
        """
        if not self._is_open():
            if retries <= 0:
                raise ConnectionError(
                    f'unable to open a WebSocket connection to {self.url}'
                )
            print('receive重新链接')
            await self.connect()
            return await self.receive_message(retries=retries - 1)
        try:
            message = await self.websocket.recv()
        except websockets.ConnectionClosed:
            print('receive关闭链接')
            await self.close()
        except Exception as exc:
            print(exc)
        else:
            print(f"Received message: {message}")
            return message
        return None

    async def close(self):
        """Close the connection, if any, and forget it."""
        if self.websocket:
            try:
                await self.websocket.close()
            finally:
                # Drop the handle even when close() itself fails.
                self.websocket = None
            print("WebSocket connection closed")


async def ws_action(ws, case):
    """Run one test case against the WebSocket service.

    :param ws: WebSocket client instance
    :param case: test-case data; ``case['action']`` selects the operation
        (``'sendmsg'`` or ``'receive'``)
    """
    action = case['action']
    if action == 'sendmsg':
        response = await ws_send_handler(case=case, ws=ws)
        assert response['response']['code'] == 200
        assert response['action'] == 'sendmsg'
    elif action == 'receive':
        expected = case['expected']  # assertions to check
        if ws.client_id == '':
            await ws.get_client_id()
        msg = json.loads(await ws.receive_message())
        ws_assert(expected=expected, msg=msg)


def _build_content(data):
    """Return ``(content, msg_type)`` for the message described by *data*.

    :raises ValueError: when ``data['type']`` is not a supported message type
    """
    kind = data['type']
    msg_type = 'newstext'
    if kind in ('text', 'emoji'):
        content = r'{"contentList":"%s","attachmentList":[]}' % data['content']
    elif kind == 'image':
        # NOTE(review): the markup inside this template was missing from the
        # source, leaving no placeholder for imgSrc (formatting raised
        # TypeError). The <img> tag is a reconstruction - confirm against
        # the original template.
        content = (
            '{"contentList":"\n'
            "<img src='%s'>"
            '\n","attachmentList":[]}'
        ) % livechatData.imgSrc
    elif kind == 'file':
        content = (
            r'{"contentList":"","attachmentList":[{"src":"%s","blobSrc":"%s",'
            r'"name":"新建 文本文档.txt","size":0,"type":"text/plain"}]}'
        ) % (livechatData.fileSrc, livechatData.fileSrc)
    elif kind == 'article':
        # NOTE(review): same problem as the image template - there was no
        # placeholder for data['content']. It is assumed to be the target of
        # the "View all" link; confirm against the original template.
        content = (
            '{"contentList":"\narticle title\n\narticle_detail\n\n'
            "<a href='%s'>View all →</a>"
            '\n","attachmentList":[]}'
        ) % data['content']
    elif kind == 'rate':
        content = (
            r'[&*{"reviewContent":"%s",'
            r'"thumbsReviews":{"good":"","bad":"FeedbackURL"},'
            r'"startReviews":{"one_start":"FeedbackURL","two_start":"FeedbackURL",'
            r'"three_start":"FeedbackURL","four_start":"","five_start":"www.baidu.com"},'
            r'"reviewTitle":"%s","reviewChoose":"thumbs","askSendReview":"true",'
            r'"autoSendReview":"false","primaryColor":"#2C23E5"}]'
        ) % (livechatData.rateContent, livechatData.rateTitle)
        msg_type = 'rate'
    elif kind == 'link':
        # NOTE(review): the source had one placeholder for two arguments
        # (formatting raised TypeError). The anchor tag is a reconstruction -
        # confirm against the original template.
        content = (
            '{"contentList":"'
            "<a href='%s'>%s</a>"
            '","attachmentList":[]}'
        ) % (livechatData.linkHref, livechatData.linkTitle)
    elif kind == 'product':
        # NOTE(review): any markup around the product name appears to be
        # missing from the source; the visible text is kept as-is.
        content = '{"contentList":"\nSelling Plans Ski Wax\n","attachmentList":[]}'
    elif kind == 'coupon':
        content = r'{"contentList":"优惠券","attachmentList":[]}'
    else:
        raise ValueError(f'unsupported message type: {kind!r}')
    return content, msg_type


async def ws_send_handler(case, ws):
    """Build a ``sendmsg`` frame from the test case, send it and return the reply.

    :param case: test-case data; ``case['data']`` describes the message
    :param ws: WebSocket client instance
    :return: the decoded JSON reply received after sending
    :raises ValueError: when ``case['data']['type']`` is not supported
    """
    data = case['data']  # message description
    content, msg_type = _build_content(data)

    # Values missing (or falsy) in the case fall back to the Global store.
    from_id = data.get('fromId') or Global.customerId
    site_id = data.get('siteId') or Global.shopId
    is_note = data.get('isNote') or False
    is_willdesk = ws.attr == 'willdesk'

    if ws.client_id == '':
        await ws.get_client_id()
    client_id = ws.client_id

    if is_willdesk:
        uid = data.get('uid') or Global.userId
        username = data.get('username') or Global.name
        payload = {
            'avatar': 'https://img.willdesk.com/${picUrl}',
            'clientId': client_id,
            'fromId': str(from_id),  # customer-side user id
            'isNote': is_note,
            'message': content,
            'msgType': msg_type,
            'platform': 1,
            'roomChannelType': 'wk',
            'roomId': str(data['roomId']),
            'roomPlatform': 'willdesk',
            'siteId': str(site_id),  # shop id
            'uType': 2,
            'uid': str(uid),  # user id
            'username': username,  # brand name
        }
    else:
        payload = {
            'avator': '',  # key spelling kept exactly as sent on the wire
            'clientId': client_id,
            'isNote': is_note,
            'message': content,
            'msgType': msg_type,
            'pageUrl': 'https://zhs-test.myshopify.com/',
            'platform': 1,
            'roomId': '',
            'sendTime': int(time.time()),
            'siteId': str(site_id),  # shop id
            'uType': 1,
            'uid': str(from_id),  # user id
            'username': Global.customerName,
        }

    frame = {
        'action': 'sendmsg',
        'data': payload,
        'seq': str(random.randint(1000000000000000, 9999999999999999)),
    }
    await ws.send_message(message=json.dumps(frame))
    return json.loads(await ws.receive_message())


def ws_assert(expected, msg):
    """Check a received message against a list of expectations.

    Each entry of *expected* is a dict mapping an assertion kind to a
    single-item dict ``{jsonpath: expected_value}``. Supported kinds:
    ``eq``, ``not_eq``, ``almost`` (equal after rounding to 2 decimals),
    ``in`` (expected value contained in the actual value) and ``ain``
    (actual value contained in the expected value). Other kinds are ignored.

    :param expected: list of expectations
    :param msg: decoded message to inspect
    :raises AssertionError: when a path matches nothing or a check fails
    """
    for scenario in expected:
        for assert_way, spec in scenario.items():
            if not spec:
                raise ValueError(f'empty expectation for {assert_way!r}')
            path, value = next(iter(spec.items()))
            matches = jsonpath.jsonpath(msg, path)
            # jsonpath() returns False (not an empty list) when nothing matches.
            assert matches, f'JSONPath {path!r} matched nothing in {msg!r}'
            actual = matches[0]
            if assert_way == 'eq':
                assert value == actual
            elif assert_way == 'not_eq':
                assert value != actual
            elif assert_way == 'almost':
                assert round(value, 2) == round(actual, 2)
            elif assert_way == 'in':
                assert value in actual
            elif assert_way == 'ain':
                assert actual in value


async def ws_run(ws, case, timeout=10):
    """Run ``ws_action`` for *case*, failing if it takes longer than *timeout*.

    :param ws: WebSocket client instance
    :param case: test-case data
    :param timeout: seconds to wait before ``asyncio.TimeoutError`` is raised
    """
    await asyncio.wait_for(ws_action(ws=ws, case=case), timeout=timeout)


# Usage example (the authorization token is a placeholder; never commit a
# real token):
#
# async def main():
#     data = json.dumps({
#         'action': 'sendmsg',
#         'data': {
#             'avator': '',
#             'clientId': 'cnkm29etjph80j73niq0',
#             'isNote': False,
#             'message': '{"contentList":"testtesttest\n","attachmentList":[]}',
#             'msgType': 'newstext',
#             'platform': 1,
#             'roomId': '',
#             'siteId': '7238',
#             'uType': 1,
#             'sendTime': int(time.time()),
#             'uid': '36202',
#             'username': 'zhs',
#         },
#         'seq': str(random.randint(1000000000000000, 9999999999999999)),
#     })
#     webs = ws(
#         url='wss://atestws.sealapps.com/ws?platform=1'
#             '&authorization=<AUTH_TOKEN>&t=1709802959971',
#         attr='willdesk',
#     )
#     await asyncio.create_task(webs.send_message(message=data))
#     msg = await asyncio.create_task(webs.receive_message())
#     print(msg)
#
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())