<dependencies> <!--netty的依賴集合,都整合在一個依賴裡面了--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency></dependencies>
public class NioWebSocketServer { private final Logger logger=Logger.getLogger(this.getClass()); private void init(){ logger.info("正在啟動websocket伺服器"); NioEventLoopGroup boss=new NioEventLoopGroup(); NioEventLoopGroup work=new NioEventLoopGroup(); try { ServerBootstrap bootstrap=new ServerBootstrap(); bootstrap.group(boss,work); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new NioWebSocketChannelInitializer()); Channel channel = bootstrap.bind(8081).sync().channel(); logger.info("webSocket伺服器啟動成功:"+channel); channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); logger.info("運行出錯:"+e); }finally { boss.shutdownGracefully(); work.shutdownGracefully(); logger.info("websocket伺服器已關閉"); } } public static void main(String[] args) { new NioWebSocketServer().init(); }}
netty搭建的伺服器基本上都是差不多的寫法:
public class NioWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));//設置log監聽器,並且日誌級別為debug,方便觀察運行流程 ch.pipeline().addLast("http-codec",new HttpServerCodec());//設置解碼器 ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));//聚合器,使用websocket會用到 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());//用於大數據的分區傳輸 ch.pipeline().addLast("handler",new NioWebSocketHandler());//自定義的業務handler }}
public class NioWebSocketHandler extends SimpleChannelInboundHandler<Object> { private final Logger logger=Logger.getLogger(this.getClass()); private WebSocketServerHandshaker handshaker; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { logger.debug("收到消息:"+msg); if (msg instanceof FullHttpRequest){ //以http請求形式接入,但是走的是websocket handleHttpRequest(ctx, (FullHttpRequest) msg); }else if (msg instanceof WebSocketFrame){ //處理websocket客戶端的消息 handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //添加連接 logger.debug("客戶端加入連接:"+ctx.channel()); ChannelSupervise.addChannel(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //斷開連接 logger.debug("客戶端斷開連接:"+ctx.channel()); ChannelSupervise.removeChannel(ctx.channel()); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){ // 判斷是否關閉鏈路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判斷是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write( new PongWebSocketFrame(frame.content().retain())); return; } // 本例程僅支持文本消息,不支持二進位消息 if (!(frame instanceof TextWebSocketFrame)) { logger.debug("本例程僅支持文本消息,不支持二進位消息"); throw new UnsupportedOperationException(String.format( "%s frame types not supported", frame.getClass().getName())); } // 返回應答消息 String request = ((TextWebSocketFrame) frame).text(); logger.debug("服務端收到:" + request); TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); // 群發 ChannelSupervise.send2All(tws); // 返回【誰發的發給誰】 // ctx.channel().writeAndFlush(tws); } /** * 唯一的一次http請求,用於創建websocket * */ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { //要求Upgrade為websocket,過濾掉get/Post if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { //若不是websocket方式,則創建BAD_REQUEST的req,返回給客戶端 sendHttpResponse(ctx, req, new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://localhost:8081/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory .sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } /** * 拒絕不合法的請求,並返回錯誤信息 * */ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回應答給客戶端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,關閉連接 if (!isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } }}
執行流程是:
當有客戶端連接時候會被channelActive監聽到,當斷開時會被channelInactive監聽到,一般在這兩個方法中去保存/移除客戶端的通道信息,而通道信息保存在ChannelSupervise中:
public class ChannelSupervise { private static ChannelGroup GlobalGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static ConcurrentMap<String, ChannelId> ChannelMap=new ConcurrentHashMap(); public static void addChannel(Channel channel){ GlobalGroup.add(channel); ChannelMap.put(channel.id().asShortText(),channel.id()); } public static void removeChannel(Channel channel){ GlobalGroup.remove(channel); ChannelMap.remove(channel.id().asShortText()); } public static Channel findChannel(String id){ return GlobalGroup.find(ChannelMap.get(id)); } public static void send2All(TextWebSocketFrame tws){ GlobalGroup.writeAndFlush(tws); }}
ChannelGroup是netty提供用於管理web於伺服器建立的通道channel的,其本質是一個高度封裝的set集合,在伺服器廣播消息時,可以直接通過它的writeAndFlush將消息發送給集合中的所有通道中去。但在查找某一個客戶端的通道時候比較坑爹,必須通過channelId對象去查找,而channelId不能人為創建,所有必須通過map將channelId的字符串和channel保存起來。
感謝看到最後的朋友,都看到最後了,點個讚再走啊,如有不對之處還請多多指正。