我們在實際的工作開發中,必然會遇到一些需要網頁與伺服器端保持連接(起碼看上去是保持連接)的需求,比如類似微信網頁版的聊天類應用,比如需要頻繁更新頁面數據(實時數據例如天氣,電流電壓,pm2.5等等這樣類似的數據)的監控系統頁面或股票看盤頁面,比如伺服器讀取MySQL或者redis或者第三方的數據主動推送給瀏覽器客戶端等等業務場景。我們通常採用如下幾種技術:
(1)短輪詢:前端老師利用ajax定期向伺服器發起http請求,無論數據是否更新立馬返回數據。這樣存在的缺點就是,一方面如果後端數據木有更新,那麼這一次http請求就是無用的,另一方面高並發情況下,短連結的頻繁創建銷毀,以及客戶端數量過大造成過多無用的http請求,都會對伺服器和帶寬造成壓力,短輪詢只適用於客戶端連接少,並發量不高的場景; (2)長輪詢:利用comet不斷向伺服器發起請求,伺服器將請求暫時掛起,直到有新的數據的時候才返回,相對短輪詢減少了請求次數得到了一定的優化,但是在高並發的場景下依然不適用; (3)SSE:服務端推送(Server Send Event),在客戶端發起一次請求後會保持該連接,伺服器端基於該連接持續向客戶端發送數據,從HTML5開始加入。 (4)Websocket:這是也是一種保持長連接的技術,並且是雙向的,從HTML5開始加入,並非完全基於HTTP,適合於頻繁和較大流量的雙向通訊場景,是伺服器推送消息功能的最佳實踐。而實現websocket的最佳方式,就是netty。網上的很多netty搭建websocket的博文都不夠全面,有很多問題都木有解決方式,我通過實際工作中的經驗,把常遇到的問題總結了方法,下文會說到!
什麼是websocekt呢?
websocket是一種在單個TCP連接上進行全雙工通信的協議。也就是一種保持長連接的技術,並且是雙向的。
websocket協議本身是構建在http協議之上的升級協議,客戶端首先向伺服器端去建立連接,這個連接本身就是http協議只是在頭信息中包含了一些websocket協議的相關信息,一旦http連接建立之後,伺服器端讀到這些websocket協議的相關信息就將此協議升級成websocket協議。WebSocket使得客戶端和伺服器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。在WebSocket API中,瀏覽器和伺服器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,並進行雙向數據傳輸。
簡單理解,就是一種通訊協議,重點是websocket的實現方式–netty。
什麼是netty呢?
3.1 socket(1)網絡編程本質就是說兩個設備之間信息的發送與接收,通過操作相應API調度計算機硬體資源,並且利用管道(網線)進行數據交互的過程。相關技術點像ISO七層模型,TCP三次握手/四次揮手等網絡編程的基礎不再贅述。
(2)而socket是對TCP/IP協議的封裝,Socket本身並不是協議,而是一個調用接口(API),通過Socket發起系統調用作業系統內核,我們才能使用TCP/IP協議。
(3)我們經常說的I/O,在計算機中指Input/Output,即輸入輸出,實質上IO分為兩種,一種是磁碟IO,磁碟上的數據讀取到用戶空間,那麼這次數據轉移操作其實就是一次I/O操作,也就是一次文件I/O。一種是網絡IO,當一個客戶端和服務端之間相互通信,交互我們稱之為網絡io(網絡通訊)
3.2 Java IO模型有BIO(同步阻塞IO)、NIO(同步非阻塞IO)、AIO(異步IO),netty就是一個NIO的高性能的框架。
(1)BIO:同步阻塞IO模型,適用於連接數目比較少且固定的架構,這種方式對伺服器資源要求比較高,並發局限於應用中,伺服器實現模式為一個連接一個線程,即客戶端有連 接請求時伺服器端就需要啟動一個線程進行處理,如果這個連接不做任何事情會造 成不必要的線程開銷,可以通過線程池機制改善(實現多個客戶連接伺服器)。
(2)NIO:同步非阻塞IO模型,適用於連接數據多且連接比較短的架構,如聊天伺服器,彈幕系統,伺服器間通訊等;伺服器實現模式為一個線程處理多個連接(一個請求一個線程),包含Selector 、Channel 、Buffer三大組件。
①selector選擇器:用一個線程處理多個客戶端的連接,就會使用到selector選擇器,selector用於監聽多個通道上是否有事件發生(比如連接請求,數據到達等),如果有事件發生,便獲取事件然後針對於每個事件進行相應的處理,因此可以使用單個線程就可以監聽多個客戶端通道。
②channel通道:Channel管道和Java IO中的Stream(流)是差不多一個等級的。只不過Stream是單向的,譬如:InputStream, OutputStream.而Channel是雙向的,同時進行讀寫數據,而流只能讀或者寫。可以實現異步讀寫數據,可以從緩衝區讀數據,也可以寫數據到緩衝區。
③buffer緩衝區:Buffer本質上是一個可以讀寫的內存塊,可以理解成容器對象,底層是有一個數組,通過buffer實現非阻塞機制,該對象提供了一組方法,可以輕鬆的使用內存塊,緩衝區對象內置了一些機制,能夠跟蹤和記錄緩衝區的狀態變化情況。Channel提供了從文件,網絡讀取數據的通道,但是讀取和寫入的數據必須經過buffer。
(3)AIO:異步IO模型,使用於連接數目多且連接比較長(重操作)的架構,比如相冊伺服器,充分 調用OS參與並發操作,編程比較複雜。在Linux底層用epoll(一種輪詢模型),aio多包了一層封裝,aio的api更好用。Windows上的aio是自己實現的,不是輪詢模型是事件模型,完成埠實現的,要比linxu上的aio效率高。
3.3 netty3.3.1 概念:netty是一個開源異步的事件驅動的網絡應用程式框架,用於快速開發可維護的高性能協議伺服器和客戶端。3.3.2 三大特點:①高並發:Netty 是一款基於 NIO(Nonblocking IO,非阻塞IO)開發異步事件驅動的高性能網絡通信框架,nio使用了select模型(多路復用器技術),從而使得系統在單線程的情況下可以同時處理多個客戶端請求。Netty使用了Reactor模型,Reactor模型有三種多線程模型,netty是在主從 Reactor 多線程模型上做了一定的改進。
Netty有兩個線程組,一個作為bossGroup線程組,負責客戶端接收,一個workerGroup線程組負責工作線程的工作(與客戶端的IO操作和任務操作等等),Netty 的所有 IO 操作都是異步非阻塞的,通過 Future-Listener 機制,用戶可以方便的主動獲取或者通過通知機制獲得 IO 操作結果。他的並發性能得到了很大提高。
②傳輸快:Netty 的傳輸依賴於零拷貝特性,實現了更高效率的傳輸。零拷貝要求內核(kernel)直接將數據從磁碟文件拷貝到Socket緩衝區(套接字),而無須通過應用程式。零拷貝減少不必要的內存拷貝,不僅提高了應用程式的性能,而且減少了內核態和用戶態上下文切換。
③封裝好:Netty 封裝了 NIO 操作的很多細節,提供了易於使用調用接口。
3.3.3 主從Reactor架構圖
說明:①Reactor響應式編程(事件驅動模型):一般有一個主循環和一個任務隊列,所有事件只管往隊列裡塞,主循環則從隊列裡取出並處理。
如果不依賴於多路復用處理多個任務就會需要多線程(與連接數對等) ,但是依賴於多路復用,這個循環就可以在單線程的情況下處理多個連接。無論是哪個連接發生了什麼事件,都會被主循環從隊列取出並處理(可能用回調函數處理等) ,也就是說程序的走向由事件驅動.
②mainReactor:主Reactor負責 單線程就可以接受所有客戶端連接
③subReactor:子Reactor負責 多線程處理客戶端的讀寫IO事件
④ThreadPool:線程池負責 處理業務耗時的操作
3.3.4 應用場景①現在物聯網的應用無處不在,大量的項目都牽涉到應用傳感器和伺服器端的數據通信,Netty作為基礎通信組件進行網絡編程。
②現在網際網路系統講究的都是高並發、分布式、微服務,各類消息滿天飛,Netty在這類架構裡面的應用可謂是如魚得水,如果你對當前的各種應用伺服器不爽,那麼完全可以基於Netty來實現自己的HTTP伺服器,FTP伺服器,UDP伺服器,RPC伺服器,WebSocket伺服器,Redis的Proxy伺服器,MySQL的Proxy伺服器等等。
現在非常多的開源軟體都是基於netty開發的,例如阿里分布式服務框架 Dubbo 的 RPC 框架,淘寶的消息中間件 RocketMQ;
③遊戲行業:無論是手遊服務端還是大型的網路遊戲,Java 語言得到了越來越廣泛的應用。Netty 作為高性能的基礎通信組件,它本身提供了 TCP/UDP 和 HTTP 協議棧。地圖伺服器之間可以方便的通過 Netty 進行高性能的通信。
④大數據:開源集群運算框架 Spark;分布式計算框架 Storm;
4.1 系統設計架構圖4.2 架構中存在的六大經典問題第一個問題:客戶端和服務端單獨通信,怎麼實現?
第二個問題:單機中websocekt主動向所有客戶端推送消息如何實現?在集群中如何實現?
第三個問題:單機如何統計同時在線的客戶數量?websocket集群如何統計在線的客戶數量呢?
第四個問題:由於客戶端和websocket伺服器集群中的某個節點建立長連接是隨機的,如何解決服務端向某個或某些部分客戶端推送消息?
第五個問題:websocket服務端周期性向客戶端推送消息,單機或集群中如何實現?
第六個問題:websocket集群中一個客戶端向其他客戶端主動發送消息,如何實現?
福利來啦!!以上所有問題,已在代碼中全部解決並實踐!!!
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
</dependencies>
(2)yml配置
websocket:
port: 7000 #埠
url: /msg #訪問url
(3) 客戶端和服務端交互的消息體
package com.wander.netty.websocket.yeelight;
import lombok.Data;
import java.io.Serializable;
@Data
public class MessageRequest implements Serializable {
private Long unionId;
private Integer current = 1;
private Integer size = 10;
}
/**
* @Author WDYin
* @Date 2021/6/10
* @Description websocket初始化器
**/
@Slf4j
@Component
public class WebsocketInitialization {
@Resource
private WebsocketChannelInitializer websocketChannelInitializer;
@Value("${websocket.port}")
private Integer port;
@Async
public void init() throws InterruptedException {
//bossGroup連接線程組,主要負責接受客戶端連接,一般一個線程足矣
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//workerGroup工作線程組,主要負責網絡IO讀寫
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//啟動輔助類
ServerBootstrap serverBootstrap = new ServerBootstrap();
//bootstrap綁定兩個線程組
serverBootstrap.group(bossGroup, workerGroup);
//設置通道為NioChannel
serverBootstrap.channel(NioServerSocketChannel.class);
//可以對入站\出站事件進行日誌記錄,從而方便我們進行問題排查。
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
//設置自定義的通道初始化器,用於入站操作
serverBootstrap.childHandler(websocketChannelInitializer);
//啟動伺服器,本質是Java程序發起系統調用,然後內核底層起了一個處於監聽狀態的服務,生成一個文件描述符FD
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//異步
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* @Author WDYin
* @Date 2021/6/10
* @Description websocket通道初始化器
**/
@Component
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private WebSocketHandler webSocketHandler;
@Value("${websocket.url}")
private String websocketUrl;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//獲取pipeline通道
ChannelPipeline pipeline = socketChannel.pipeline();
//因為基於http協議,使用http的編碼和解碼器
pipeline.addLast(new HttpServerCodec());
//是以塊方式寫,添加ChunkedWriteHandler處理器
pipeline.addLast(new ChunkedWriteHandler());
/*
說明
1. http數據在傳輸過程中是分段, HttpObjectAggregator ,就是可以將多個段聚合
2. 這就就是為什麼,當瀏覽器發送大量數據時,就會發出多次http請求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/* 說明
1. 對應websocket ,它的數據是以 幀(frame) 形式傳遞
2. 可以看到WebSocketFrame 下面有六個子類
3. 瀏覽器請求時 ws://localhost:7000/msg 表示請求的uri
4. WebSocketServerProtocolHandler 核心功能是將 http協議升級為 ws協議 , 保持長連接
5. 是通過一個 狀態碼 101
*/
pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl));
//自定義的handler ,處理業務邏輯
pipeline.addLast(webSocketHandler);
}
}
/**
* @Author WDYin
* @Date 2021/6/10
* @Description websocket處理器
**/
@Slf4j
@Component
@ChannelHandler.Sharable//保證處理器,在整個生命周期中就是以單例的形式存在,方便統計客戶端的在線數量
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//通道map,存儲channel,用於群發消息,以及統計客戶端的在線數量,解決問題問題三,如果是集群環境使用redis的hash數據類型存儲即可
private static Map<String, Channel> channelMap = new ConcurrentHashMap<>();
//任務map,存儲future,用於停止隊列任務
private static Map<String, Future> futureMap = new ConcurrentHashMap<>();
//存儲channel的id和用戶主鍵的映射,客戶端保證用戶主鍵傳入的是唯一值,解決問題四,如果是集群中需要換成redis的hash數據類型存儲即可
private static Map<String, Long> clientMap = new ConcurrentHashMap<>();
/**
* 客戶端發送給服務端的消息
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
try {
//接受客戶端發送的消息
MessageRequest messageRequest = JSON.parseObject(msg.text(), MessageRequest.class);
//每個channel都有id,asLongText是全局channel唯一id
String key = ctx.channel().id().asLongText();
//存儲channel的id和用戶的主鍵
clientMap.put(key, messageRequest.getUnionId());
log.info("接受客戶端的消息." + ctx.channel().remoteAddress() + "-參數[" + messageRequest.getUnionId() + "]");
if (!channelMap.containsKey(key)) {
//使用channel中的任務隊列,做周期循環推送客戶端消息,解決問題二和問題五
Future future = ctx.channel().eventLoop().scheduleAtFixedRate(new WebsocketRunnable(ctx, messageRequest), 0, 10, TimeUnit.SECONDS);
//存儲客戶端和服務的通信的Chanel
channelMap.put(key, ctx.channel());
//存儲每個channel中的future,保證每個channel中有一個定時任務在執行
futureMap.put(key, future);
} else {
//每次客戶端和服務的主動通信,和服務端周期向客戶端推送消息互不影響 解決問題一
ctx.channel().writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "伺服器時間" + LocalDateTime.now() + "wdy"));
}
} catch (Exception e) {
log.error("websocket伺服器推送消息發生錯誤:", e);
}
}
/**
* 客戶端連接時候的操作
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("一個客戶端連接." + ctx.channel().remoteAddress() + Thread.currentThread().getName());
}
/**
* 客戶端掉線時的操作
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String key = ctx.channel().id().asLongText();
//移除通信過的channel
channelMap.remove(key);
//移除和用戶綁定的channel
clientMap.remove(key);
//關閉掉線客戶端的future
Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
future.cancel(true);
futureMap.remove(key);
});
log.info("一個客戶端移除." + ctx.channel().remoteAddress());
ctx.close(); //關閉連接
}
/**
* 發生異常時執行的操作
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
String key = ctx.channel().id().asLongText();
//移除通信過的channel
channelMap.remove(key);
//移除和用戶綁定的channel
clientMap.remove(key);
//移除定時任務
Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
future.cancel(true);
futureMap.remove(key);
});
//關閉長連接
ctx.close();
log.info("異常發生 " + cause.getMessage());
}
public static Map<String, Channel> getChannelMap() {
return channelMap;
}
public static Map<String, Future> getFutureMap() {
return futureMap;
}
public static Map<String, Long> getClientMap() {
return clientMap;
}
}
/**
* @Author WDYin
* @Date 2021/8/10
* @Description websocket程序
**/
@Slf4j
@Component
public class WebsocketApplication {
@Resource
private WebsocketInitialization websocketInitialization;
@PostConstruct
public void start() {
try {
log.info(Thread.currentThread().getName() + ":websocket啟動中.");
websocketInitialization.init();
log.info(Thread.currentThread().getName() + ":websocket啟動成功!!!");
} catch (Exception e) {
log.error("websocket發生錯誤:",e);
}
}
}
/**
* @Author WDYin
* @Date 2021/8/10
* @Description websocket程序
**/
@Slf4j
@Component
public class WebsocketApplication {
@Resource
private WebsocketInitialization websocketInitialization;
@PostConstruct
public void start() {
try {
log.info(Thread.currentThread().getName() + ":websocket啟動中.");
websocketInitialization.init();
log.info(Thread.currentThread().getName() + ":websocket啟動成功!!!");
} catch (Exception e) {
log.error("websocket發生錯誤:",e);
}
}
}
/**
* @Author WDYin
* @Date 2021/9/12
* @Description
**/
@RequestMapping("index")
@Controller
public class WebsocketController {
/**
*
* @param id 用戶主鍵
* @param idList 要把消息發送給其他用戶的主鍵
*/
@RequestMapping("hello1")
private void hello(Long id, List<Long> idList){
//獲取所有連接的客戶端,如果是集群環境使用redis的hash數據類型存儲即可
Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
//獲取與用戶主鍵綁定的channel,如果是集群環境使用redis的hash數據類型存儲即可
Map<String, Long> clientMap = WebSocketHandler.getClientMap();
//解決問題六,websocket集群中一個客戶端向其他客戶端主動發送消息,如何實現?
clientMap.forEach((k,v)->{
if (idList.contains(v)){
Channel channel = channelMap.get(k);
channel.eventLoop().execute(() -> channel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName()+"伺服器時間" + LocalDateTime.now() + "wdy")));
}
});
}
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
//判斷當前瀏覽器是否支持websocket
if(window.WebSocket) {
socket = new WebSocket("ws://localhost:7000/msg");
//相當於channelReado, ev 收到伺服器端回送的消息
socket.onmessage = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
//相當於連接開啟(感知到連接開啟)
socket.onopen = function (ev) {
var rt = document.getElementById("responseText");
rt.value = "連接開啟了.."
}
//相當於連接關閉(感知到連接關閉)
socket.onclose = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "連接關閉了.."
}
} else {
alert("當前瀏覽器不支持websocket")
}
//發送消息到伺服器
function send(websocketMessage) {
if(!window.socket) { //先判斷socket是否創建好
return;
}
if(socket.readyState == WebSocket.OPEN) {
//通過socket 發送消息
socket.send(websocketMessage)
} else {
alert("連接沒有開啟");
}
}
</script>
<form onsubmit="return false">
<textarea name="websocketMessage" style="height: 300px; width: 300px"></textarea>
<input type="button" value="發生消息" onclick="send(this.form.websocketMessage.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空內容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
#第一步:
upstream websocket-router {
server 192.168.1.31:7000 max_fails=10 weight=1 fail_timeout=5s;
keepalive 1000;
}
#第二步:
server {
listen 80; #監聽80埠
server_name websocket.wdy.com; #域名配置
ssl_session_cache shared:SSL:1m;
ssl_session_timeout 5m;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
location / {
client_max_body_size 100M;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header Upgrade $http_upgrade; #支持wss
proxy_set_header Connection "upgrade"; #支持wssi
proxy_pass http://websocket-router; #代理路由
root html;
index index.html index.htm;
}
}
連結:https://blog.csdn.net/qq_41889508/article/details/105953114
如果看到這裡,說明你喜歡這篇文章,請 轉發、點讚。微信搜索「web_resource」,關注後回復「進群」或者掃描下方二維碼即可進入無廣告交流群。1. GitHub 上有什麼好玩的項目?
2. Linux 運維必備 150 個命令匯總
3. SpringSecurity + JWT 實現單點登錄