1.準備
使用 IDEA 創建 Netty 項目
說明: 創建 Maven 項目,並引入 Netty 包
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.22.Final</version> </dependency>
碼代碼之前,首先看看我的目錄結構,有個心理準備,很簡單就幾個類O(∩_∩)O哈哈~
2.創建NettyServer.java類,上面有注釋幫助理解,代碼如下:
public class NettyServer { public static void main(String[] args) throws Exception { //創建BossGroup 和 WorkerGroup //說明 //1. 創建兩個線程組 bossGroup 和 workerGroup //2. bossGroup 只是處理連接請求 , 真正的和客戶端業務處理,會交給 workerGroup完成 //3. 兩個都是無限循環 //4. bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個數 // 默認實際 cpu核數 * 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8 try { //創建伺服器端的啟動對象,配置參數 Serverbootstrap bootstrap = new ServerBootstrap(); //使用鏈式編程來進行設置 bootstrap.group(bossGroup, workerGroup) //設置兩個線程組 .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作為伺服器的通道實現 .option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列得到連接個數 .childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動連接狀態// .handler(null) // 該 handler對應 bossGroup , childHandler 對應 workerGroup .childHandler(new ChannelInitializer<SocketChannel>() {//創建一個通道初始化對象(匿名對象) //給pipeline 設置處理器 @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("客戶socketchannel hashcode=" + ch.hashCode()); //可以使用一個集合管理 SocketChannel, 再推送消息時,可以將業務加入到各個channel 對應的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue ch.pipeline().addLast(new NettyServerHandler()); } }); // 給我們的workerGroup 的 EventLoop 對應的管道設置處理器 System.out.println(".....伺服器 is ready..."); //綁定一個埠並且同步, 生成了一個 ChannelFuture 對象 //啟動伺服器(並綁定埠) ChannelFuture cf = bootstrap.bind(6668).sync(); //給cf 註冊監聽器,監控我們關心的事件 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("監聽埠 6668 成功"); } else { System.out.println("監聽埠 6668 失敗"); } } }); //對關閉通道進行監聽 cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
3.創建名為NettyServerHandler的一個Handler,可以理解為用於處理實際業務的實際執行者
註:
1. 我們自定義一個Handler 需要繼續netty 規定好的某個HandlerAdapter(規範)
2. 這時我們自定義一個Handler , 才能稱為一個handler
代碼如下:
public class NettyServerHandler extends ChannelInboundHandlerAdapter { //讀取數據實際(這裡我們可以讀取客戶端發送的消息) /* 1. ChannelHandlerContext ctx:上下文對象, 含有 管道pipeline , 通道channel, 地址 2. Object msg: 就是客戶端發送的數據 默認Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {/* //比如這裡我們有一個非常耗時長的業務-> 異步執行 -> 提交該channel 對應的 //NIOEventLoop 的 taskQueue中, //解決方案1 用戶程序自定義的普通任務 ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵2", CharsetUtil.UTF_8)); System.out.println("channel code=" + ctx.channel().hashCode()); } catch (Exception ex) { System.out.println("發生異常" + ex.getMessage()); } } }); ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵3", CharsetUtil.UTF_8)); System.out.println("channel code=" + ctx.channel().hashCode()); } catch (Exception ex) { System.out.println("發生異常" + ex.getMessage()); } } }); //解決方案2 : 用戶自定義定時任務 -》 該任務是提交到 scheduleTaskQueue中 ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵4", CharsetUtil.UTF_8)); System.out.println("channel code=" + ctx.channel().hashCode()); } catch (Exception ex) { System.out.println("發生異常" + ex.getMessage()); } } }, 5, TimeUnit.SECONDS); System.out.println("go on ...");*/ System.out.println("伺服器讀取線程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); System.out.println("server ctx =" + ctx); System.out.println("看看channel 和 pipeline的關係"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); //本質是一個雙向連結, 出站入站 //將 msg 轉成一個 ByteBuf //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer. ByteBuf buf = (ByteBuf) msg; System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客戶端地址:" + channel.remoteAddress()); } //數據讀取完畢 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //將數據寫入到緩存,並刷新 //一般講,我們對這個發送的數據進行編碼 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8)); } //處理異常, 一般是需要關閉通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}
4.Netty客戶端NettyClient
public class NettyClient { public static void main(String[] args) throws Exception { //客戶端需要一個事件循環組 EventLoopGroup group = new NioEventLoopGroup(); try { //創建客戶端啟動對象 //注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap Bootstrap bootstrap = new Bootstrap(); //設置相關參數 bootstrap.group(group) //設置線程組 .channel(NioSocketChannel.class) // 設置客戶端通道的實現類(反射) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器 } }); System.out.println("客戶端 ok.."); //啟動客戶端去連接伺服器端 //關於 ChannelFuture 要分析,涉及到netty的異步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); //給關閉通道進行監聽 channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } }
5.創建NettyClientHandler,這裡和前面NettyServerHandler作用一樣,也需要繼承ChannelInboundHandlerAdapter ,不然它就不能被netty認為是一個Handler
public class NettyClientHandler extends ChannelInboundHandlerAdapter { //當通道就緒就會觸發該方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8)); } //當通道有讀取事件時,會觸發 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("伺服器回復的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("伺服器的地址: "+ ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
到這裡Netty的TCP服務就結束了,有了這個案例,然後大家可以結合我上一篇發的