今天的內容不涉及算法,只講講如何使用tornado構建websocket服務。Python的web框架眾多,tornado在其中不如Django和Flask應用廣泛,也不如新出現的框架速度快,但是對我來說是最順手的,tornado功能上不複雜也不簡陋,性能也能完全滿足我的需求,是我十分鐘意的。
tornado自帶websocket伺服器和客戶端的功能,可以拿來即用,十分方便。我曾經還使用過websockets這個包,本以為專門做websocket的工具會更強大一些,但是速度比較慢,不知道是我代碼的問題還是其他什麼原因。本文就簡單給出一個tornado實現的範例,供讀者參考。
進入正題,直接上代碼
import jsonimport loggingimport signalfrom typing import Any, Dict, Union
import tornado.ioloopimport tornado.optionsimport tornado.webfrom tornado.concurrent import Futurefrom tornado.websocket import WebSocketHandler
class EchoWebSocket(WebSocketHandler): def write_message(self, message: Union[bytes, str, Dict[str, Any]], binary: bool = False) -> "Future[None]": if isinstance(message, (dict, )): message = json.dumps(message, ensure_ascii=False) return super().write_message(message, binary=binary)
def open(self): self.write_message({"msg": "WebSocket opened"})
def on_message(self, message): self.write_message({"msg": u"You said: " + message})
def on_close(self): logging.warning("WebSocket closed")
def check_origin(self, origin): return True
class MyApplication(tornado.web.Application): is_closing = False
def signal_handler(self, signum, frame): logging.info('exiting...') self.is_closing = True
def try_exit(self): if self.is_closing: tornado.ioloop.IOLoop.instance().stop() logging.info('exit success')
application = MyApplication([ (r"/", EchoWebSocket),])
if __name__ == "__main__": tornado.options.parse_command_line() signal.signal(signal.SIGINT, application.signal_handler) application.listen(8888) tornado.ioloop.PeriodicCallback(application.try_exit, 100).start() tornado.ioloop.IOLoop.instance().start()這部分是伺服器代碼。
17-22行覆寫了WebSocketHandler類的write_message方法,因為這裡使用json做服務端和客戶端之間通信的格式,如果通信內容帶中文等非ascii字符,可能會被處理成全部ascii字符,所以此處手動指定ensure_ascii=False。
33行這個check_origin方法,跟安全有關,此處直接返回True是為了演示方便,實際項目中不要這樣寫。如果你想要允許來自你站點子域名的連結的話,可以按照這個示例來寫:
def check_origin(self, origin): parsed_origin = urllib.parse.urlparse(origin) return parsed_origin.netloc.endswith(".mydomain.com")與服務端代碼配合的客戶端代碼如下:
import asyncioimport json
from tornado.websocket import websocket_connect
async def echo1(): conn = await websocket_connect("ws://127.0.0.1:8888") write_data = {"msg": "有錢大曬啊"} while True: await asyncio.sleep(1) conn.write_message(json.dumps(write_data, ensure_ascii=False)) msg = await conn.read_message() write_data = {"msg": "Sorry,有錢真系大曬嘅"} if msg is None: break print(json.loads(msg))
async def echo2(): def msg_callback(msg): print(msg)
conn = await websocket_connect("ws://127.0.0.1:8888", on_message_callback=msg_callback) write_data = {"msg": "有錢大曬啊"} while True: await asyncio.sleep(1) conn.write_message(json.dumps(write_data, ensure_ascii=False)) write_data = {"msg": "Sorry,有錢真系大曬嘅"}
if __name__ == "__main__": asyncio.run(echo2())代碼運行效果如下:
服務端
客戶端
這裡的服務端和客戶端代碼組合起來是一個複讀機的功能,只為拋磚引玉,讀者可以魔改出一個有趣的東西。
最後,感謝讀者能夠讀到這裡。
另一種websocket實現:GitHub - aaugustin/websockets: Library for building WebSocket servers and clients in Python
通過ctrl + c來關閉tornado的實現代碼:https://stackoverflow.com/questions/17101502/how-to-stop-the-tornado-web-server-with-ctrlc