利用python对websocket进行并发压测
简述
产品经理鉴于运营反馈并对程序的websocket长连接保持怀疑的态度,让我对websocket服务器进行压力测试,我内心是拒绝的。
开发思路
查阅websocket的相关资料,查到python的websocket有两个连接方式,长连接:WebSocketApp,短连接:create_connection。
使用 WebSocketApp 的话,我没办法获取websocket服务器端返回的数据,这个我还在研究,这里使用 create_connection 来进行压测。
连接:
1 ws = create_connection("websocket的服务器地址")
获取连接状态:
1 print("获取连接状态:", ws.connected) # True为连接成功,False为连接失败
发送消息至服务器并接收返回结果:
1 initialize_data = '{"code":2001, "msg": "","data": ""}' 2 ws.send(initialize_data) 3 result_initialize = ws.recv() # 获取返回结果 4 print("接收结果:", result_initialize)
因为create_connection为短连接,所以这一系列下来后连接会断开,所以这个时候需要一个东西来维持连接,长连接叫心跳,那短连接这里我愿成为:心脏起搏器(while True)
完整代码:
1 import threading 2 from websocket import create_connection 3 import time 4 5 thread = [] 6 def socket(): 7 while True: 8 ws = create_connection("websocket服务器地址") 9 # 2、获取连接状态 10 print("获取连接状态:", ws.connected) 11 print("获取服务器返回的连接结果", ws.recv()) 12 if ws.connected == True: 13 initialize = '{"code":2001, "msg": "","data": {}}' 14 ws.send(initialize) 15 result_initialize = ws.recv() # 获取返回结果 16 print("接收结果:", result_initialize) 17 if __name__ == "__main__": 18 for i in range(25): 19 t = threading.Thread(target=socket, args=()) 20 t.start()
利用python对websocket进行并发压测的相关教程结束。
本站发布的内容若侵犯到您的权益,请邮件联系站长删除,我们将及时处理!
从您进入本站开始,已表示您已同意接受本站【免责声明】中的一切条款!
本站大部分下载资源收集于网络,不保证其完整性以及安全性,请下载后自行研究。
本站资源仅供学习和交流使用,版权归原作者所有,请勿商业运营、违法使用和传播!请在下载后24小时之内自觉删除。
若作商业用途,请购买正版,由于未及时购买和付费发生的侵权行为,使用者自行承担,概与本站无关。
本文链接:https://www.10zhan.com/biancheng/10108.html