Netty從入門到禿頭:websocket

2020-10-22 大鵝啊

1. 核心依賴

<dependencies> <!--netty的依賴集合,都整合在一個依賴裡面了--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency></dependencies>

2. 代碼

2.1 啟動項

public class NioWebSocketServer { private final Logger logger=Logger.getLogger(this.getClass()); private void init(){ logger.info("正在啟動websocket伺服器"); NioEventLoopGroup boss=new NioEventLoopGroup(); NioEventLoopGroup work=new NioEventLoopGroup(); try { ServerBootstrap bootstrap=new ServerBootstrap(); bootstrap.group(boss,work); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new NioWebSocketChannelInitializer()); Channel channel = bootstrap.bind(8081).sync().channel(); logger.info("webSocket伺服器啟動成功:"+channel); channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); logger.info("運行出錯:"+e); }finally { boss.shutdownGracefully(); work.shutdownGracefully(); logger.info("websocket伺服器已關閉"); } } public static void main(String[] args) { new NioWebSocketServer().init(); }}

netty搭建的伺服器基本上都是差不多的寫法:

  • 綁定主線程組和工作線程組,這部分對應架構圖中的事件循環組
  • 只有伺服器才需要綁定埠,客戶端是綁定一個地址
  • 配置channel(數據通道)參數,重點就是ChannelInitializer的配置
  • 以異步的方式啟動,最後是結束關閉兩個線程組

2.2 ChannelInitializer寫法

public class NioWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));//設置log監聽器,並且日誌級別為debug,方便觀察運行流程 ch.pipeline().addLast("http-codec",new HttpServerCodec());//設置解碼器 ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));//聚合器,使用websocket會用到 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());//用於大數據的分區傳輸 ch.pipeline().addLast("handler",new NioWebSocketHandler());//自定義的業務handler }}

2.3 自定義的處理器NioWebSocketHandler

public class NioWebSocketHandler extends SimpleChannelInboundHandler<Object> { private final Logger logger=Logger.getLogger(this.getClass()); private WebSocketServerHandshaker handshaker; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { logger.debug("收到消息:"+msg); if (msg instanceof FullHttpRequest){ //以http請求形式接入,但是走的是websocket handleHttpRequest(ctx, (FullHttpRequest) msg); }else if (msg instanceof WebSocketFrame){ //處理websocket客戶端的消息 handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //添加連接 logger.debug("客戶端加入連接:"+ctx.channel()); ChannelSupervise.addChannel(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //斷開連接 logger.debug("客戶端斷開連接:"+ctx.channel()); ChannelSupervise.removeChannel(ctx.channel()); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){ // 判斷是否關閉鏈路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判斷是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write( new PongWebSocketFrame(frame.content().retain())); return; } // 本例程僅支持文本消息,不支持二進位消息 if (!(frame instanceof TextWebSocketFrame)) { logger.debug("本例程僅支持文本消息,不支持二進位消息"); throw new UnsupportedOperationException(String.format( "%s frame types not supported", frame.getClass().getName())); } // 返回應答消息 String request = ((TextWebSocketFrame) frame).text(); logger.debug("服務端收到:" + request); TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); // 群發 ChannelSupervise.send2All(tws); // 返回【誰發的發給誰】 // ctx.channel().writeAndFlush(tws); } /** * 唯一的一次http請求,用於創建websocket * */ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { //要求Upgrade為websocket,過濾掉get/Post if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { //若不是websocket方式,則創建BAD_REQUEST的req,返回給客戶端 sendHttpResponse(ctx, req, new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://localhost:8081/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory .sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } /** * 拒絕不合法的請求,並返回錯誤信息 * */ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回應答給客戶端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,關閉連接 if (!isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } }}

執行流程是:

  • web發起一次類似是http的請求,並在channelRead0方法中進行處理,並通過instanceof去判斷幀對象是FullHttpRequest還是WebSocketFrame,建立連接是時候會是FullHttpRequest
  • 在handleHttpRequest方法中去創建websocket,首先是判斷Upgrade是不是websocket協議,若不是則通過sendHttpResponse將錯誤信息返回給客戶端,緊接著通過WebSocketServerHandshakerFactory創建socket對象並通過handshaker握手創建連接
  • 在連接創建好後的所以消息流動都是以WebSocketFrame來體現
  • 在handlerWebSocketFrame去處理消息,也可能是客戶端發起的關閉指令,ping指令等等

2.4 保存客戶端的信息

當有客戶端連接時候會被channelActive監聽到,當斷開時會被channelInactive監聽到,一般在這兩個方法中去保存/移除客戶端的通道信息,而通道信息保存在ChannelSupervise中:

public class ChannelSupervise { private static ChannelGroup GlobalGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static ConcurrentMap<String, ChannelId> ChannelMap=new ConcurrentHashMap(); public static void addChannel(Channel channel){ GlobalGroup.add(channel); ChannelMap.put(channel.id().asShortText(),channel.id()); } public static void removeChannel(Channel channel){ GlobalGroup.remove(channel); ChannelMap.remove(channel.id().asShortText()); } public static Channel findChannel(String id){ return GlobalGroup.find(ChannelMap.get(id)); } public static void send2All(TextWebSocketFrame tws){ GlobalGroup.writeAndFlush(tws); }}

ChannelGroup是netty提供用於管理web於伺服器建立的通道channel的,其本質是一個高度封裝的set集合,在伺服器廣播消息時,可以直接通過它的writeAndFlush將消息發送給集合中的所有通道中去。但在查找某一個客戶端的通道時候比較坑爹,必須通過channelId對象去查找,而channelId不能人為創建,所有必須通過map將channelId的字符串和channel保存起來。

結尾

感謝看到最後的朋友,都看到最後了,點個讚再走啊,如有不對之處還請多多指正。

相關焦點

  • Netty核心14-簡易實現websocket
    前面三篇講解了如何用netty來實現http伺服器。本篇來講解下如何用netty來做websocket伺服器。關於websocket的知識點我這裡過多不講解,大家可以自行百度。Websocket之間的通信有兩個環節。
  • 解決websocket和netty中無法注入service
    首先,目前我的項目是springboot+netty,在netty-client中注入了service,但是在調用service的時候一直報null空指針異常。剛開始實驗了N次還是無法解決這個問題,以為是自己的寫法問題,並沒有想到是service無法實例化的問題,後來通過度娘,才找到了解決方法。
  • SpringBoot+Netty+Websocket整合案例(實現基本的聊天功能)
    之前使用Springboot整合了websocket,實現了一個後端向前端推送信息的基本小案例,這篇文章主要是增加了一個新的框架就是Netty,實現一個高性能的websocket伺服器,並結合前端代碼,實現一個基本的聊天功能。你可以根據自己的業務需求進行更改。
  • netty-socketio+vue實現服務端推送消息到前端
    市場上也有很多的消息推送技術,比如websocket、socketio、netty等。本文介紹下Java springboot下的netty-socketio的實現方式。--服務端--> <dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.18&
  • 八、Netty入門服務端代碼
    今天我們先通過入門的角度從Idea中導入Netty包,並且使用Netty構建一個可用於接受數據的服務端。IDEA的maven項目的netty包的導入(其他jar同) 在這之前要有搭建好maven的環境,如何搭建maven環境請自行百度。1.
  • MqttWk 1.0.0-netty 發布,Java MQTT服務及消息代理
    MqttWk by netty基於 nutzboot + netty + redis + kafka 實現的MQTT服務broker(另有t-io版本分支,暫未發布)本項目代碼主要來源於
  • netty快速入門教程
    讓我們開始吧本章圍繞 Netty 的核心架構,通過簡單的示例帶你快速入門。當你讀完本章節,你馬上就可以用 Netty 寫出一個客戶端和伺服器。開始之前在開始之前我們先說明下開發環境,我們使用netty-4.1.30這個版本,jdk使用1.8及以上版本。
  • WebSocket從入門到精通,半小時就夠
    本文也是一篇關於WebSocket從入門到精通的文章,內容由淺入深,比較適合想要在短時間內較深入的了解WebSocket協議的開發者學習。當收到到來自客戶端的消息時,同樣列印日誌。接收到來自服務端的消息後,同樣列印日誌。
  • 「源碼學習」基於Netty的大文件斷點續傳開源解決方案
    在實際應用中我們經常使用到網盤服務或公司內部的文件傳輸系統,用來高效的上傳下載較大文件。:chunked、websocket 實現的大文件分塊上傳斷點續傳處理器,同時具備上傳/下載進度和上傳/下載速度的推送功能。
  • WebSocket硬核入門:徒手擼WebSocket伺服器
    到網上搜羅了一番資料後用 Node.js 實現了一個WebSocket協議伺服器,倒也沒有想像中那麼複雜,除去注釋語句和 console 語句後,大約 200 行代碼左右。本文分享了自已開發一個WebSocket服務端實現過程中需要的知識儲備,以及具體的代碼實現含義等,非常適合想在短時間內對WebSocket協議從入門到精通的
  • workerman搭建websocket入門教程,簡單實用
    試想一下,一個聯網電燈,控制端命令發出去,電燈要2到3秒鐘以後才能開關。這種體驗實在太差勁了。那麼我們該如何實現實時通訊呢?最常用的一種就是Websocket。後端能實現websocket的語方很多,例如Node.js,Go,Python,dotnet core,java。
  • 八問WebSocket協議:為你快速解答WebSocket熱門疑問
    另外,如果您對Web端的即時通訊技術還完全不了解,那麼《新手入門貼:詳解Web端即時通訊技術的原理》、《Web端即時通訊技術盤點:短輪詢、Comet、Websocket、SSE》這兩篇文章請您務必抽時間讀一讀。
  • Netty學習-Netty 快速入門實例-TCP 服務
    伺服器可以回復消息給客戶端 "hello, 客戶端~"目的:對 Netty 線程模型 有一個初步認識, 便於理解 Netty 模型理論說明: 創建 Maven 項目,並引入 Netty 包<dependency> <groupId>io.netty
  • Netty核心13-HttpResponseEncoder
    /netty/issues/2983 for more information.到此HttpResponseEncoder分析完畢。其實HttpResponseEncoder與HttpRequestDecoder,就是處理Http響應報文和請求報文的過程。我們知道了報文的格式,解析起來並不難。
  • 輕量級、高性能的WebSocket框架 源碼分享
    簡介本項目幫助你在spring-boot中使用Netty來開發WebSocket伺服器,並像spring-websocket的註解開發一樣簡單源碼獲取方式:關注轉發之後私信回復【源碼】即可免費獲取要求jdk版本為1.8或1.8+快速開始添加依賴: <dependency> <groupId>org.yeauty</groupId> <artifactId>netty-websocket-spring-boot-starter
  • 你不要和我說Netty快速入門會很難
    Netty快速入門實例-TCP服務1、實例要求:使用IDEA創建Netty項目;2、Netty伺服器在6668埠監聽,客戶端能發送消息給伺服器「hello,伺服器」;3、伺服器可以回復消息給客戶端「hello,客戶端」;4、目的:對Netty線程模型有一個初步認識,便於理解Netty模型理論;
  • SpringBoot+WebSocket實現服務端、客戶端
    作者 | 47號Gamer丶 來源 | urlify.cn/jUR3Af66套java從入門到精通實戰課程分享 一、引言本人最近一直在使用springboot框架開發項目,畢竟現在很多公司都在採用此框架
  • Netty 4.1.35.Final 發布,經典開源 Java 網絡服務框架
    to offload / customize key signing operations when using BoringSSL (#8943) Allow to offload certificate validation when using BoringSSL (#8974) Add user possibility to skip the evaluation of a certain websocket
  • 「網絡通信」Netty 入門實戰
    它不是什麼實實在在的東西,但是當你閱讀本指南和玩 Netty 的時候,你會意識到這種哲學會讓你的生活變得更加輕鬆。依賴dependencies { implementation "io.netty:netty-all:4.1.56.Final"}實戰世界上最簡單的協議實現不是發送Hello World消息,被伺服器接受到返回相應的響應結果。