"""Async WebSocket client and helpers for driving WebSocket test cases.

The module offers:

* ``ws`` - a small reconnecting WebSocket client.
* ``ws_action`` / ``ws_send_handler`` / ``ws_assert`` - coroutines and helpers
  that send a test-case message or verify a received one.
* ``ws_run`` - a synchronous entry point that runs one case on the event loop.
"""
import ast
import asyncio
import json
import random
import time

import jsonpath
import websockets

from Utils.global_variate import Global

# Upper bound on automatic reconnects performed by send/receive.  The original
# code retried through unbounded recursion, which could never terminate
# cleanly when the server kept refusing the connection.
_MAX_CONNECT_ATTEMPTS = 3

# NOTE(review): in the source these message bodies contain bare line breaks
# where markup appears to have been stripped.  The visible text is reproduced
# with "\n" for each line break; restore the real markup if it is known.
_PICTURE_BODY = '\n\n'
_PRODUCT_BODY = '\nSelling Plans Ski Wax\n'

# NOTE(review): the original applied `% data['content']` to the article body,
# but no `%s` placeholder survives in the source.  The placeholder position
# below is a guess - confirm it against the real message format.
_ARTICLE_BODY_TEMPLATE = '\narticle title\n\narticle_detail\n\nView all →%s\n'

# Fixed attachment payload sent for the 'file' message type.
_FILE_PAYLOAD = r'{"contentList":"","attachmentList":[{"src":"https://img.willdesk.com/test/chat/5973/2024/03/06/6e7d5240ef4f8ecec7430851f764b2ee/新建_文本文档.txt","blobSrc":"https://img.willdesk.com/test/chat/5973/2024/03/06/6e7d5240ef4f8ecec7430851f764b2ee/新建_文本文档.txt","name":"新建 文本文档.txt","size":0,"type":"text/plain"}]}'

# Review-request settings sent for the 'rate' message type.
_RATE_REVIEW = {
    "reviewContent": "\n\nWould you mind sharing your feedback on your experience so fa222r?\n\n",
    "thumbsReviews": {"good": "", "bad": "FeedbackURL"},
    "startReviews": {
        "one_start": "FeedbackURL",
        "two_start": "FeedbackURL",
        "three_start": "FeedbackURL",
        "four_start": "",
        "five_start": "www.baidu.com",
    },
    "reviewTitle": "How was your chat with {AgentName}222?",
    "reviewChoose": "thumbs",
    "askSendReview": "true",
    "autoSendReview": "false",
    "primaryColor": "#2C23E5",
}

# Comparison used for each assertion mode: callable(expected_value, actual).
_ASSERT_CHECKS = {
    'eq': lambda expected, actual: expected == actual,        # exactly equal
    'not_eq': lambda expected, actual: expected != actual,    # must differ
    # equal after rounding both sides to two decimals
    'almost': lambda expected, actual: round(expected, 2) == round(actual, 2),
    'in': lambda expected, actual: expected in actual,        # expected inside actual
    'ain': lambda expected, actual: actual in expected,       # actual inside expected
}


def _compact_json(obj):
    """Serialize *obj* as compact JSON (no spaces, non-ASCII kept verbatim)."""
    return json.dumps(obj, ensure_ascii=False, separators=(',', ':'))


def _text_payload(body):
    """Return the ``contentList``/``attachmentList`` JSON string for *body*.

    ``json.dumps`` escapes quotes, backslashes and line breaks in *body*, which
    the original ``'...%s...' % body`` interpolation did not.
    """
    return _compact_json({"contentList": body, "attachmentList": []})


# The rate payload is wrapped in the literal "[&*" ... "]" markers used by the
# original template.
_RATE_PAYLOAD = '[&*' + _compact_json(_RATE_REVIEW) + ']'

# Message types whose content does not depend on the test case.
_STATIC_PAYLOADS = {
    'picture': _text_payload(_PICTURE_BODY),
    'file': _FILE_PAYLOAD,
    'link': _text_payload('test'),
    'product': _text_payload(_PRODUCT_BODY),
    'coupon': _text_payload('优惠券'),
}


def _decode_message(raw):
    """Turn a received frame into Python data.

    SECURITY NOTE: the original code called ``eval()`` on frames received from
    the server.  That executes arbitrary expressions and also fails on the JSON
    literals ``true``/``false``/``null``.  Frames are now parsed as JSON, with
    ``ast.literal_eval`` as a fallback for Python-literal payloads.

    :param raw: frame text (``str``/``bytes``) or already-decoded data
    :return: the decoded object
    :raises ValueError: if *raw* is neither valid JSON nor a Python literal
    """
    if isinstance(raw, (dict, list)):
        return raw
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        pass
    try:
        return ast.literal_eval(raw)
    except (TypeError, ValueError, SyntaxError) as exc:
        raise ValueError(f'undecodable websocket payload: {raw!r}') from exc


def _extract_client_id(frame):
    """Pick the client id out of the first frame received after connecting."""
    action = frame.get('action')
    data = frame['data']
    if not action:
        # No action field: the 'enter' frame (per the original comment).
        return data['clientId']
    if action in ('readMsg', 'tipmsg'):
        return data['clientId']
    if action == 'sendmsg':
        return data['response']['clientId']
    # Remaining case is the heartbeat frame (per the original comment).
    # NOTE(review): the key 'cliend' is kept from the original; it may be a
    # typo for 'clientId' - confirm against the server's heartbeat frame.
    return data['cliend']


class ws:
    """Minimal reconnecting WebSocket client.

    :param url: WebSocket endpoint to connect to
    :param attr: accepted for caller compatibility; not used by this class
    """

    def __init__(self, url, attr):
        self.url = url
        self.websocket = None
        self.loop = asyncio.get_event_loop()
        self.client_id = ''

    def _is_open(self):
        """Return True when a connection exists and reports itself open."""
        sock = self.websocket
        if sock is None:
            return False
        is_open = getattr(sock, 'open', None)
        if is_open is not None:
            return bool(is_open)
        # Connection objects without an `open` attribute are expected to
        # expose a `state` enum instead (newer websockets releases).
        return getattr(getattr(sock, 'state', None), 'name', None) == 'OPEN'

    async def _ensure_open(self):
        """Connect if necessary, giving up after a bounded number of tries.

        :raises ConnectionError: if no open connection could be established
        """
        for _ in range(_MAX_CONNECT_ATTEMPTS):
            if self._is_open():
                return
            await self.connect()
        if not self._is_open():
            raise ConnectionError(
                f'could not open websocket {self.url!r} after '
                f'{_MAX_CONNECT_ATTEMPTS} attempts'
            )

    async def connect(self):
        """Open the connection and record the client id from the first frame.

        If the server closes the connection during the handshake frame, the
        client is closed and left disconnected.
        """
        try:
            self.websocket = await websockets.connect(self.url)
            first_frame = _decode_message(await self.websocket.recv())
            self.client_id = _extract_client_id(first_frame)
        except websockets.ConnectionClosed:
            await self.close()

    async def send_message(self, message):
        """Send *message*, reconnecting first when the connection is not open.

        Send failures are reported on stdout and not re-raised (best effort,
        as in the original).

        :raises ConnectionError: if the connection cannot be (re)established
        """
        await self._ensure_open()
        try:
            print('开始发送消息')
            await self.websocket.send(message)
            print(f"Sent message: {message}")
        except websockets.ConnectionClosed:
            await self.close()
        except Exception as exc:
            # Still best effort, but report what actually went wrong.
            print(f'error: {exc!r}')

    async def receive_message(self):
        """Receive one frame, reconnecting first when necessary.

        :return: the raw frame, or ``None`` if the connection was closed
            while waiting
        :raises ConnectionError: if the connection cannot be (re)established
        """
        if not self._is_open():
            print('receive重新链接')
            await self._ensure_open()
        try:
            message = await self.websocket.recv()
        except websockets.ConnectionClosed:
            print('receive关闭链接')
            await self.close()
            return None
        print(f"Received message: {message}")
        return message

    def ws_sendmsg(self, case):
        """Placeholder kept from the original; intentionally does nothing."""
        pass

    async def close(self):
        """Close the connection, if any, and forget it."""
        sock, self.websocket = self.websocket, None
        if sock is not None:
            await sock.close()
            print("WebSocket connection closed")


async def _receive_non_heartbeat(ws):
    """Return the next frame whose action is not 'heartbeat'.

    Decoded data is returned when the frame can be parsed; otherwise the raw
    frame is returned so the caller's assertion step can reject it.
    """
    while True:
        raw = await ws.receive_message()
        try:
            msg = _decode_message(raw)
        except ValueError:
            return raw
        if not (isinstance(msg, dict) and msg.get('action') == 'heartbeat'):
            return msg
        # Yield to the event loop instead of blocking it with time.sleep().
        await asyncio.sleep(0.1)


async def ws_action(ws, case):
    """Execute one test case against the WebSocket service.

    :param ws: the WebSocket client to use
    :param case: test-case data; ``case['action']`` selects the operation and
        ``case['expected']`` holds the assertions for a 'receive' case
    :raises AssertionError: if the response does not match the expectation
    """
    action = case['action']  # operation to perform
    if action == 'sendmsg':
        response = await ws_send_handler(case=case, ws=ws)
        # NOTE(review): connect() reads 'sendmsg' frames as
        # data -> response -> clientId, so the path used here may not match
        # the real frame layout - confirm against the server.
        assert response['response']['code'] == 200
        assert response['action'] == 'sendmsg'
    elif action == 'receive':
        expected = case['expected']  # assertions to verify
        msg = await _receive_non_heartbeat(ws)
        result = ws_assert(expected=expected, msg=msg)
        assert result


def _build_content(data):
    """Return ``(msgType, message)`` for the message described by *data*.

    :raises ValueError: if ``data['type']`` is not a supported message type
    """
    kind = data['type']
    if kind in ('text', 'emoji'):
        return 'newstext', _text_payload(data['content'])
    if kind == 'article':
        return 'newstext', _text_payload(_ARTICLE_BODY_TEMPLATE % (data['content'],))
    if kind == 'rate':
        return 'rate', _RATE_PAYLOAD
    if kind in _STATIC_PAYLOADS:
        return 'newstext', _STATIC_PAYLOADS[kind]
    # The original fell through with `content` unbound for unknown types.
    raise ValueError(f'unsupported message type: {kind!r}')


async def ws_send_handler(case, ws):
    """Build and send the 'sendmsg' frame for *case*, then read one reply.

    Optional fields missing from ``case['data']`` (``fromId``, ``siteId``,
    ``uid``, ``username``) fall back to attributes of ``Global``.

    :param case: test-case data; ``case['data']`` describes the message
    :param ws: the WebSocket client to use
    :return: the decoded frame received after sending
    :raises ValueError: on an unsupported message type or undecodable reply
    """
    # TODO: case['file'] is reserved for future file handling (the original
    # had an empty placeholder branch here).
    data = case['data']  # message data
    msg_type, content = _build_content(data)

    message_data = {
        'action': 'sendmsg',
        'data': {
            'avatar': "https://img.willdesk.com/${picUrl}",
            'clientId': ws.client_id,
            'fromId': data.get('fromId') or Global.customerId,  # customer-side user id
            'isNote': data.get('isNote') or False,
            'message': content,
            'msgType': msg_type,
            'platform': 1,
            'roomChannelType': 'wk',
            'roomId': data['roomId'],
            'roomPlatform': 'willdesk',
            'siteId': data.get('siteId') or Global.shopId,  # shop id
            'uType': 2,
            'uid': data.get('uid') or Global.userId,  # user id
            'username': data.get('username') or Global.name,  # brand name
        },
        'seq': str(random.randint(1000000000000000, 9999999999999999)),
    }
    await ws.send_message(message=json.dumps(message_data))
    return _decode_message(await ws.receive_message())


def ws_assert(expected, msg):
    """Check a received message against the expected assertions.

    :param expected: iterable of ``{mode: {jsonpath: value}}`` dicts, where
        mode is one of ``eq``, ``not_eq``, ``almost``, ``in``, ``ain``; only
        the first path/value pair of each mode is used
    :param msg: the received message, raw text or already decoded
    :return: ``True`` when every assertion holds, ``False`` when *msg* cannot
        be decoded
    :raises AssertionError: if a path is missing or a comparison fails
    :raises ValueError: if an assertion has no path/value pair
    """
    try:
        msg = _decode_message(msg)
    except ValueError:
        return False

    for scenario in expected:  # each assertion scenario
        for mode, spec in scenario.items():
            if not spec:
                raise ValueError(f'assertion {mode!r} has no path/value pair')
            path, value = next(iter(spec.items()))
            # jsonpath.jsonpath() returns False when nothing matches.
            matches = jsonpath.jsonpath(msg, path)
            if not matches:
                raise AssertionError(f'path {path!r} not found in message: {msg!r}')
            actual = matches[0]
            check = _ASSERT_CHECKS.get(mode)
            if check is None:
                # Unknown modes were silently ignored by the original; keep
                # skipping them but make the skip visible.
                print(f'unknown assertion mode skipped: {mode!r}')
                continue
            if not check(value, actual):
                raise AssertionError(
                    f'{mode} assertion failed at {path!r}: '
                    f'expected {value!r}, got {actual!r}'
                )
    return True


def ws_run(ws, case):
    """Run one test case to completion on the current event loop.

    :param ws: the WebSocket client to use
    :param case: test-case data, passed through to ``ws_action``
    """
    loop = asyncio.get_event_loop()
    loop.run_until_complete(ws_action(ws=ws, case=case))


# Usage example (the authorization token has been redacted; supply a valid one)
# async def main():
#     data = json.dumps({
#         'action': "sendmsg",
#         'data': {
#             'avatar': "https://img.willdesk.com/${picUrl}",
#             'clientId': 'cnk14g7fot40rbaaqa20',
#             'fromId': '35948',
#             'isNote': False,
#             'message': '{"contentList":"testtesttest\\n","attachmentList":[]}',
#             'msgType': 'newstext',
#             'platform': 1,
#             'roomChannelType': 'wk',
#             'roomId': '502856393624136484',
#             'roomPlatform': 'willdesk',
#             'siteId': '7238',
#             'uType': 2,
#             'uid': '5746',
#             'username': 'zhs'
#         },
#         'seq': str(random.randint(1000000000000000, 9999999999999999))
#     })
#     webs = ws(url='wss://atestws.sealapps.com/ws?platform=1&authorization=<auth-token>&t=1709710166112', attr='willdesk')
#     await asyncio.create_task(webs.send_message(message=data))
#     msg = await asyncio.create_task(webs.receive_message())
#
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())