import asyncio
import websockets
import time,random,json,jsonpath
from Utils.global_variate import Global
from TestData.inbox.livechat_test_data import livechatData
loop = asyncio.get_event_loop() #异步主线
class ws:
def __init__(self, url, attr):
self.url = url
self.websocket = None
self.loop = asyncio.get_event_loop()
self.client_id = ''
self.attr = attr
async def get_client_id(self):
if not self.websocket:
self.websocket = await websockets.connect(self.url)
enter_res = json.loads(await self.websocket.recv())
if enter_res.get('action'):
if enter_res['action'] in ['readMsg', 'tipmsg']:
self.client_id = enter_res['data']['clientId']
elif enter_res['action'] == 'sendmsg':
self.client_id = enter_res['data']['response']['clientId']
else: # action是heartbeat的情况
self.client_id = enter_res['data']['cliend']
else: # action是enter的情况
self.client_id = enter_res['data']['clientId']
async def connect(self):
try:
self.websocket = await websockets.connect(self.url)
await self.get_client_id()
# print(f"Connected to {self.url}")
except websockets.ConnectionClosed as e:
await self.close()
async def send_message(self, message):
if self.websocket and self.websocket.open:
try:
print('开始发送消息')
await self.websocket.send(message)
print(f"Sent message: {message}")
except websockets.ConnectionClosed as e:
await self.close()
except Exception:
print('error')
else:
print('send重新链接')
await self.connect()
await self.send_message(message)
async def receive_message(self):
if self.websocket and self.websocket.open:
try:
message = await self.websocket.recv()
print(f"Received message: {message}")
return message
except websockets.ConnectionClosed as e:
print('receive关闭链接')
await self.close()
except NameError:
return message
except Exception as e:
print(e)
else:
print('receive重新链接')
await self.connect()
return await self.receive_message()
async def close(self):
if self.websocket:
await self.websocket.close()
self.websocket = None
print("WebSocket connection closed")
async def ws_action(ws,case):
'''
:param ws: 传入websocket服务
:param case: 传入用例数据
'''
action = case['action'] #操作
if action == 'sendmsg':
response = await ws_send_handler(case=case,ws=ws)
assert response['response']['code'] == 200
assert response['action'] == 'sendmsg'
elif action == 'receive':
expected = case['expected'] # 断言
if ws.client_id == '':
await ws.get_client_id()
msg = json.loads(await ws.receive_message())
ws_assert(expected=expected,msg=msg)
async def ws_send_handler(case,ws):
data = case['data'] # 消息数据
msgType = 'newstext'
if data['type'] in ['text', 'emoji']:
content = r'{"contentList":"%s","attachmentList":[]}' % data['content']
elif data['type'] == 'image':
content = "{\"contentList\":\"

\",\"attachmentList\":[]}"%livechatData.imgSrc
elif data['type'] == 'file':
content = r'{"contentList":"","attachmentList":[{"src":"%s","blobSrc":"%s","name":"新建 文本文档.txt","size":0,"type":"text/plain"}]}'%(livechatData.fileSrc,livechatData.fileSrc)
elif data['type'] == 'article':
content = r'{"contentList":"","attachmentList":[]}' %data['content']
elif data['type'] == 'rate':
content = r'[&*{"reviewContent":"%s","thumbsReviews":{"good":"","bad":"FeedbackURL"},"startReviews":{"one_start":"FeedbackURL","two_start":"FeedbackURL","three_start":"FeedbackURL","four_start":"","five_start":"www.baidu.com"},"reviewTitle":"%s","reviewChoose":"thumbs","askSendReview":"true","autoSendReview":"false","primaryColor":"#2C23E5"}]'%(livechatData.rateContent,livechatData.rateTitle)
msgType = 'rate'
elif data['type'] == 'link':
content = "{\"contentList\":\"%s\",\"attachmentList\":[]}"%(livechatData.linkHref,livechatData.linkTitle)
elif data['type'] == 'product':
content = "{\"contentList\":\"
\",\"attachmentList\":[]}"
elif data['type'] == 'coupon':
content = r'{"contentList":"优惠券","attachmentList":[]}'
fromId = data['fromId'] if data.get('fromId') else getattr(Global, 'customerId')
siteId = data['siteId'] if data.get('siteId') else getattr(Global, 'shopId')
uid = data['uid'] if data.get('uid') else getattr(Global, 'userId')
username = data['username'] if data.get('username') else getattr(Global, 'name')
isNote = data['isNote'] if data.get('isNote') else False
roomId = str(data['roomId']) if ws.attr == 'willdesk' else ''
avatar = 'https://img.willdesk.com/${picUrl}' if ws.attr == 'willdesk' else ''
if ws.client_id == '':
await ws.get_client_id()
client_id = ws.client_id
if ws.attr == 'willdesk':
data = {
'avatar': avatar,
'clientId': client_id,
'fromId': str(fromId), # c端userId
'isNote': isNote,
'message': content,
'msgType': msgType,
'platform': 1,
'roomChannelType': 'wk',
'roomId': roomId,
'roomPlatform': 'willdesk',
'siteId': str(siteId), # 店铺id
'uType': 2,
'uid': str(uid), # userId
'username': username # 品牌名称
}
else:
data = {
'avator': avatar,
'clientId': client_id,
'isNote': isNote,
'message': content,
'msgType': msgType,
'pageUrl': 'https://zhs-test.myshopify.com/',
'platform': 1,
'roomId': roomId,
'sendTime': int(time.time()),
'siteId': str(siteId), # 店铺id
'uType': 1,
'uid': str(fromId), # userId
'username': Global.customerName # 品牌名称
}
messageData = {
'action': 'sendmsg',
'data': data,
'seq': str(random.randint(1000000000000000, 9999999999999999))
}
await ws.send_message(message=json.dumps(messageData))
response = json.loads(await ws.receive_message())
return response
def ws_assert(expected,msg):
'''
:param expected: 断言结果
:param res: 返回的信息
:return:
'''
for item in expected: # 循环每一个断言场景
for item_key, item_value in item.items():
assert_way = item_key
path = list(item_value.keys())[0] # 拿到path路径
value = list(item_value.values())[0]
res_path_value = jsonpath.jsonpath(msg, path)[0]
if assert_way == 'eq': # eq代表完全相同
assert value == res_path_value
elif assert_way == 'not_eq': # not_eq代表不相同
assert value != res_path_value
elif assert_way == 'almost': # almonst代表四舍五入后相同就判断为一致
assert round(value, 2) == round(res_path_value, 2)
elif assert_way == 'in': # in代表断言值在返回值中
assert value in res_path_value
elif assert_way == 'ain':
assert res_path_value in value
async def ws_run(ws,case):
'''
:param ws: 传入websocket服务
:param case: 传入用例数据
'''
# asyncio.run(ws_action(ws=ws,case=case))
task = asyncio.ensure_future(ws_action(ws=ws,case=case))
await asyncio.wait_for(task,timeout=10)
# loop.run_until_complete(ws_action(ws=ws,case=case))
# 使用示例
# async def main():
# data = json.dumps({
# 'action': "sendmsg",
# 'data': {
# 'avator': "",
# 'clientId': 'cnkm29etjph80j73niq0',
# 'isNote': False,
# 'message': '{"contentList":"testtesttest
","attachmentList":[]}',
# 'msgType': 'newstext',
# 'platform': 1,
# 'roomId': '',
# 'siteId': '7238',
# 'uType': 1,
# 'sendTime': int(time.time()),
# 'uid': '36202',
# 'username': 'zhs'
# },
# 'seq' : str(random.randint(1000000000000000, 9999999999999999))
# })
# webs = ws(url=f'wss://atestws.sealapps.com/ws?platform=1&authorization=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MDk3NzY4OTgsImlzcyI6IndpbGxkZXNrLmNvbSIsInN1YiI6IkxvZ2luQXV0aFRva2VuIiwidXNlcl9pbmZvIjp7ImNvbXBhbnlJZCI6NjA1OSwiYnJhbmRJZCI6NjA1MiwiY3VzdG9tZXJTZXJ2aWNlSWQiOjU3NDYsImxvZ2luUm9sZSI6MSwiY2xpZW50SWQiOiJjbmtpMDBnNmxnY2M3MWNxZHM1ZyIsImxvZ2luVGltZSI6MTcwOTc3Njg5OH0sImtleSI6ImFsbHBsYXRmb3JtLXdpbGxkZXNrLXVzZXIifQ.EllFwfirKFTB88BzBigz64tBz1SJQFFwFp0tWlQzWVU&t=1709802959971',attr='willdesk')
# await asyncio.create_task(webs.send_message(message=data))
# msg = await asyncio.create_task(webs.receive_message())
# print(msg)
#
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())