高能預警,本文是我一個月前就開始寫的,所以內容會非常長,當然也非常硬核,dubbo源碼系列結束之後我就想著寫一下netty系列的,但是netty的源碼概念又非常多,所以才寫到了現在。
我相信90%的讀者都不會一口氣看完的,因為實在太長了,長到我現在頂配的mbp打字編輯框都是卡的,但是我希望大家日後想看netty或者在面試前需要了解的朋友回頭翻一下就夠了,那我寫這個文章的意義也就有了。
也不多BB,直接開整。
NIO 基本概念阻塞(Block)與非阻塞(Non-Block)阻塞和非阻塞是進程在訪問數據的時候,數據是否準備就緒的一種處理方式,當數據沒有準備的時候。
阻塞:往往需要等待緩衝區中的數據準備好過後才處理其他的事情,否則一直等待在那裡。
非阻塞:當我們的進程訪問我們的數據緩衝區的時候,如果數據沒有準備好則直接返回,不會等待。如果數據已經準備好,也直接返回。
阻塞 IO :
非阻塞 IO :
同步(Synchronous)與異步(Asynchronous)同步和異步都是基於應用程式和作業系統處理 IO 事件所採用的方式。比如
**同步:**是應用程式要直接參與 IO 讀寫的操作。
**異步:**所有的 IO 讀寫交給作業系統去處理,應用程式只需要等待通知。
同步方式在處理 IO 事件的時候,必須阻塞在某個方法上面等待我們的 IO 事件完成(阻塞 IO 事件或者通過輪詢 IO事件的方式),對於異步來說,所有的 IO 讀寫都交給了作業系統。這個時候,我們可以去做其他的事情,並不需要去完成真正的 IO 操作,當操作完成 IO 後,會給我們的應用程式一個通知。
所以異步相比較於同步帶來的直接好處就是在我們處理IO數據的時候,異步的方式我們可以把這部分等待所消耗的資源用於處理其他事務,提升我們服務自身的性能。
同步 IO :
異步 IO :
Java BIO與NIO對比BIO(傳統IO):BIO是一個同步並阻塞的IO模式,傳統的 java.io 包,它基於流模型實現,提供了我們最熟知的一些 IO 功能,比如File抽象、輸入輸出流等。交互方式是同步、阻塞的方式,也就是說,在讀取輸入流或者寫入輸出流時,在讀、寫動作完成之前,線程會一直阻塞在那裡,它們之間的調用是可靠的線性順序。
NIO(Non-blocking/New I/O)NIO 是一種同步非阻塞的 I/O 模型,於 Java 1.4 中引入,對應 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 可以理解為 Non-blocking,不單純是 New。它支持面向緩衝的,基於通道的 I/O 操作方法。NIO 提供了與傳統 BIO 模型中的 Socket 和 ServerSocket 相對應的 SocketChannel 和 ServerSocketChannel 兩種不同的套接字通道實現,兩種通道都支持阻塞和非阻塞兩種模式。對於高負載、高並發的(網絡)應用,應使用 NIO 的非阻塞模式來開發
BIO與NIO的對比IO模型BIONIO通信面向流面向緩衝處理阻塞 IO非阻塞 IO觸發無選擇器NIO 的 Server 通信的簡單模型:BIO 的 Server 通信的簡單模型:NIO的特點:讀寫非阻塞,節約資源:沒有可讀/可寫數據時,不會發生阻塞導致線程資源的浪費Reactor 模型單線程的 Reactor 模型多線程的 Reactor 模型多線程主從 Reactor 模型Netty 基礎概念Netty 簡介Netty 是一個 NIO 客戶端伺服器框架,可快速輕鬆地開發網絡應用程式,例如協議伺服器和客戶端。它極大地簡化和簡化了網絡編程,例如 TCP 和 UDP 套接字伺服器。
「快速簡便」並不意味著最終的應用程式將遭受可維護性或性能問題的困擾。Netty 經過精心設計,結合了許多協議(例如FTP,SMTP,HTTP 以及各種基於二進位和文本的舊式協議)的實施經驗。結果,Netty 成功地找到了一種無需妥協即可輕鬆實現開發,性能,穩定性和靈活性的方法。
Netty 執行流程Netty 核心組件ChannelChannel是 Java NIO 的一個基本構造。可以看作是傳入或傳出數據的載體。因此,它可以被打開或關閉,連接或者斷開連接。
EventLoop 與 EventLoopGroupEventLoop 定義了Netty的核心抽象,用來處理連接的生命周期中所發生的事件,在內部,將會為每個Channel分配一個EventLoop。
EventLoopGroup 是一個 EventLoop 池,包含很多的 EventLoop。
Netty 為每個 Channel 分配了一個 EventLoop,用於處理用戶連接請求、對用戶請求的處理等所有事件。EventLoop 本身只是一個線程驅動,在其生命周期內只會綁定一個線程,讓該線程處理一個 Channel 的所有 IO 事件。
一個 Channel 一旦與一個 EventLoop 相綁定,那麼在 Channel 的整個生命周期內是不能改變的。一個 EventLoop 可以與多個 Channel 綁定。即 Channel 與 EventLoop 的關係是 n:1,而 EventLoop 與線程的關係是 1:1。
ServerBootstrap 與 BootstrapBootstarp 和 ServerBootstrap 被稱為引導類,指對應用程式進行配置,並使他運行起來的過程。Netty處理引導的方式是使你的應用程式和網絡層相隔離。
Bootstrap 是客戶端的引導類,Bootstrap 在調用 bind()(連接UDP)和 connect()(連接TCP)方法時,會新創建一個 Channel,僅創建一個單獨的、沒有父 Channel 的 Channel 來實現所有的網絡交換。
ServerBootstrap 是服務端的引導類,ServerBootstarp 在調用 bind() 方法時會創建一個 ServerChannel 來接受來自客戶端的連接,並且該 ServerChannel 管理了多個子 Channel 用於同客戶端之間的通信。
ChannelHandler 與 ChannelPipelineChannelHandler 是對 Channel 中數據的處理器,這些處理器可以是系統本身定義好的編解碼器,也可以是用戶自定義的。這些處理器會被統一添加到一個 ChannelPipeline 的對象中,然後按照添加的順序對 Channel 中的數據進行依次處理。
ChannelFutureNetty 中所有的 I/O 操作都是異步的,即操作不會立即得到返回結果,所以 Netty 中定義了一個 ChannelFuture 對象作為這個異步操作的「代言人」,表示異步操作本身。如果想獲取到該異步操作的返回值,可以通過該異步操作對象的addListener() 方法為該異步操作添加監 NIO 網絡編程框架 Netty 聽器,為其註冊回調:當結果出來後馬上調用執行。
Netty 的異步編程模型都是建立在 Future 與回調概念之上的。
Netty 源碼閱讀源碼閱讀,最好可以再 Debug 的情況下進行,這樣更容易幫助理解,因此在分析 Netty 前的我準備一個客戶端和服務端的代碼。
Netty - Server 代碼public class NettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new SomeSocketServerHandler());
}
});
ChannelFuture future = bootstrap.bind(8888).sync();
System.out.println("伺服器已啟動。。。");
future.channel().closeFuture().sync();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
Server 端 Handler:public class DemoSocketServerHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("Client Address ====== " + ctx.channel().remoteAddress());
ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
ctx.fireChannelActive();
TimeUnit.MILLISECONDS.sleep(500);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Netty - Client 代碼public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new DemoSocketClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
future.channel().closeFuture().sync();
} finally {
if(eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
}
}
}
Client 端 Handler :public class DemoSocketClientHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println(msg);
ctx.channel().writeAndFlush("from client: " + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(5000);
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
ctx.channel().writeAndFlush("from client:begin talking");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
NioEventLoopGroup 初始化分析首先根據 Server 服務端代碼,分析 NioEventLoopGroup 的初始化過程。而在分析 NioEventLoopGroup 之前,有必要簡單的說一說 NioEventLoopGroup 與 NioEventLoop ,方便後續源碼的理解。
NioEventLoop 源碼分析前了解NioEventLoop 的繼承體系
從 NioEventLoop 的繼承體系中可以看到,NioEventLoop 本身就是一個 Executor,並且還是一個 單線程的 Executor。Executor 必然擁有一個 execute(Runnable command) 的實現方法,而 NioEventLoop 的 execute() 實現方法在其父類 SingleThreadEventExecutor 中,找到具體代碼:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}這裡不細說,但是貼出這段代碼主要為了引出 startThread(); 這句代碼,在跟這句代碼會發現,它最終調用了 NioEventLoop 的一個成員 Executor 執行了當前成員的 execute() 方法。對應的成員 io.netty.util.concurrent.SingleThreadEventExecutor#executor
而 executor 成員的初始化也是在當前代碼執行時創建的匿名 Executor ,也就是執行到即新建並且執行當前 匿名 executr() 方法。
總結:
NioEventLoop 本身就是一個 Executor。NioEventLoop 內部封裝這一個新的線程 Executor 成員。NioEventLoop 有兩個 execute 方法,除了本身的 execute() 方法對應的還有成員屬性 Executor 對應的 execute() 方法。備註: 因為這裡出現了四個 Executor,為了區分,我們給其新的名稱:
NioEventLoop 本身 Executor:NioEventLoop
NioEventLoop 的成員 Executor:子 Executor
NioEventLoopGroup 本身 Executor :NioEventLoopGroup
NioEventLoopGroup 的構造參數 Executor :總Executor
NioEventLoopGroup 的繼承體系
看到繼承體系可以直接知道 NioEventLoopGroup 也是一個 Executor,並且是一個線程池的 Executor,所以他也有 execute() 方法。對應的實現再其父類之中:io.netty.util.concurrent.AbstractEventExecutorGroup#execute
而這裡還需要說到的一點是:在 NioEventLoopGroup 的構造中,再其父類 MultithreadEventExecutorGroup 的構造再次引入了一個新的 Executor,
之所以這裡提到這個 Executor,是因為這個 Executor 是對應的 execute() 就是在 NioEventLoop 中的成員 Executor 的 execute() 執行時調用的。也就是下面對應的代碼調用。io.netty.util.internal.ThreadExecutorMap#apply(java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutor)
到這如果不明白,沒關係,因為只是為了引入 NioEventLoopGroup 和 NioEventLoop 的對應的兩個 Executor,和兩個 Executor 對應的兩個 execute() 方法。這個後面還會有詳細分析。
總結:
NioEventLoopGroup 是一個線程池線程 Executor。NioEventLoopGroup 也封裝了一個線程 Executor。NioEventLoopGroup 也有兩個 execute()方法。NioEventLoopGroup 初始化代碼分析上面說了基本的了解內容,下面具體分析,從 NioEventLoopGroup 的初始化進入源碼分析。
入口我們直接找 NioEventLoopGroup 的無參構造。
public NioEventLoopGroup() {
this(0);
}public NioEventLoopGroup(int nThreads) {
// 第二個參數是這個group所包含的executor
this(nThreads, (Executor) null);
}public NioEventLoopGroup(int nThreads, Executor executor) {
// 第三個參數是provider,其用於提供selector及selectable的channel,
// 這個provider是當前JVM中唯一的一個單例的provider
this(nThreads, executor, SelectorProvider.provider());
}public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// 第四個參數是一個選擇策略工廠實例
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// 第三個參數是選擇器工廠實例
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}跟到此,可以發現無參構造的基本參數被初始化, nThreads :DEFAULT_EVENT_LOOP_THREADS//默認當前CPU邏輯核心數的兩倍,selectorProvide:SelectorProvider.provider()//當前JVM中唯一的一個單例的provider,SelectStrategyFactory:DefaultSelectStrategyFactory.INSTANCE//默認選擇策略工廠實例,chooserFactory:DefaultEventExecutorChooserFactory.INSTANCE//選擇器工廠實例。到這裡只是基本的初始化參數,重點方法為MultithreadEventExecutorGroup 的構造方法。下面重點分析:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// 這個executor是group所包含的executor,其將來會為其所包含的每個eventLoop創建一個線程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 創建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在創建這些eventLoop過程中,只要有一個創建失敗,則關閉之前所有已經創建好的eventLoop
if (!success) {
// 關閉之前所有已經創建好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 終止所有eventLoop上所執行的任務
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 創建一個選擇器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}根據無參構造直接往下跟,可以看到核心部分在最後一個父類的構造裡。也就是 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)。
再這裡完成整個 NioEventLoopGroup 的實例初始化,這裡分析下,然後再畫個圖回顧下。
初始化構造參數中的 Executor 參數,當其為空時,將其初始化
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());首先 newDefaultThreadFactory()) 創建默認的線程工廠,有興趣可以跟進去看看。然後再創建ThreadPerTaskExecutor線程 Executor 對象。(PS:這裡創建的 Executor 就是 NioEventLoopGroup 內的 Executor 對象,並不是當前 NioEventLoopGroup 自身,可以稱其為 總 Executor)。
然後可以看到這裡創建了一個 children 數組,根據需要創建的線程數創建對應數量的數組。
children = new EventExecutor[nThreads];因為每個 NioEventLoopGroup 都是 NioEventLoop 的集合,所以這裡的 children 數組就是當前 NioEventLoopGroup 的 NioEventLoop。所以 NioEventLoop 的創建的實在 NioEventLoopGroup 初始化的時候。下面看 NioEventLoop 的初始化:
// 逐個創建nioEventLoop實例
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 創建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在創建這些eventLoop過程中,只要有一個創建失敗,則關閉之前所有已經創建好的eventLoop
if (!success) {
// 閉之前所有已經創建好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 終止所有eventLoop上所執行的任務
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}先整體看這段 NioEventLoop 的創建代碼,可以看到整個過程中存在一個成功標誌,catch 每個 NioEventLoop 創建完成過程,如果發生異常則將所有已經創建的 NioEventLoop 關閉。重點的代碼也就在 NioEventLoop 的創建了。所以我們繼續跟:children[i] = newChild(executor, args);往下走,直接找到 io.netty.channel.nio.NioEventLoopGroup#newChild ,因為當前是 NioEventLoopGroup 的創建,所以知道找到子類的 newChild 實現。
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}又將之前合併的 args 參數強轉回來,繼續跟進 NioEventLoop 構造:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 創建一個selector的二元組
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}這裡我們先整體看下,將之前的默認參數初始化到 NioEventLoop 屬性中。其中有兩處:openSelector() 和 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler)。這裡先看父類構造:
往下跟,直接就是 SingleThreadEventLoop -> SingleThreadEventExecutor 的初始化,這些也可以在 NioEventLoop 的繼承體系可以看到:
// io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 創建一個收尾隊列
tailTasks = newTaskQueue(maxPendingTasks);
}
// io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 這是當前NioEventLoop所包含的executor
this.executor = ThreadExecutorMap.apply(executor, this);
// 創建一個任務隊列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}這裡首先創建的是 SingleThreadEventExecutor ,這裡重點需要關注的代碼是:
this.executor = ThreadExecutorMap.apply(executor, this);這裡this 是 NioEventLoop ,所以this.executor就是前面說的 NioEventLoop 裡的 Executor,這裡我們先稱為 子 Executor(子:對應的就是 NioEventLoop ,前面說的 總:對應的是 NioEventLoopGroup )。
而這裡 子 Executor 的初始化是由一個 executor 參數的,這個就是前面 NioEventLoopGroup 構造方法一直帶入的 總 Executor。那我們繼續往下跟,看看這個子 Executor 是如何完成的初始化的。
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 這裡創建的executor是子executor
return new Executor() {
// 這個execute()是子executor的execute()
@Override
public void execute(final Runnable command) {
// 這裡調用了NioEventLoopGroup所包含的executor的execute()
// 即調用了「總的executor」的execute()
executor.execute(apply(command, eventExecutor));
}
};
}這段代碼細看就會明白,這裡創建的 子 Executor的創建也就是一個線程的創建,但是重點卻在這個線程 Executor 的 execute()方法實現,只做了一件事情:就是調用 傳入的 總 Executor 的 execute()方法。所以這裡 子 Executor 做的事情就是調用 總 Executor 的 execute()。不要覺得這裡繞,因為這還只是初始化,後面這裡執行會更繞。[手動捂臉哭]
其實這裡的 apply(command, eventExecutor),這裡再執行 總 Executor 的 execute() 時還是會記錄當前正在執行的線程,並且再執行完成時將當前記錄值刪除。
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}這裡再 NioEventLoop 的屬性 Executor 創建完成時,又去創建了一個普通任務隊列taskQueue = newTaskQueue(this.maxPendingTasks);並且還創建了一個收尾任務隊列tailTasks = newTaskQueue(maxPendingTasks);。這幾個隊列後面會說到。這裡繼續跟 NioEventLoop 主流程初始化。
到這我們再回去看看 openSelector(),這裡我們要先知道 SelectorTuple :
private static final class SelectorTuple {
final Selector unwrappedSelector; // NIO原生selector
final Selector selector; // 優化過的selector
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}SelectorTuple 只是一個包含兩個 Selector 的內部類,用於封裝優化前後的 Selector。而 openSelector() 方法就是為了返回 Selector 並且根據配置判斷是否需要優化當前 Selector 。下面看具體代碼:
而具體的優化過程有興趣的可以自己去看看,這裡只要知道,若是禁用了優化則 SelectorTuple 的優化後的 Selector 和為優化的 Selector 均為 Nio 原生的 Selector。
而這io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)後面還有在 NioEventLoop 數組創建完成後,還有選擇器創建和關閉監聽器綁定等,感興趣可以自己看看,這裡不再介紹。
到這一個 NioEventLoop 的創建過程的代碼也全部看完了。我想如果只看這個肯定還是有點懵,源碼這個東西需要自己跟進去去看,debug 一點點的跟,跟著運行的代碼去想為何這麼實現,不過這裡我也畫個圖,讓大家更直觀的了解到 NioEventLoopGroup 的創建流程以及主要操作。
我想大家結合這個圖,再結合上面的分析過程,最好可以自己找到源碼,跟一遍,應該可以理解 NioEvnetLoopGroup 的創建。
ServerBootstrap與 ServerBootstrap 屬性配置分析繼承體系:
入口代碼:
//2.創建服務端啟動引導/輔助類:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.給引導類配置兩大線程組,確定了線程模型
b.group(bossGroup, workerGroup)
// (非必備)列印日誌
.handler(new LoggingHandler(LogLevel.INFO))
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//5.可以自定義客戶端消息的業務處理邏輯
p.addLast(new HelloServerHandler());
}
});
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new SomeSocketClientHandler());
}
});ServerBootstrap與 Bootstrap 都是啟動配置類,唯一不同的是,ServerBootstrap是服務端的啟動配置類,Bootstrap 則是客戶端的啟動配置類,主要用於綁定我們創建的 EventLoopGroup,指定 Channel 的類型以及綁定 Channel 處理器等操作,主要做的都是給 ServerBootstrap與 Bootstrap 的屬性賦值操作,所以稱其為配置類。可以進入 group() 方法裡看一眼:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}其他的方法也是一樣,感興趣可以自己進去看看。這裡只是初始化,都是為了後面的操作做準備。
服務端 bind 方法 ServerBootstrap.bind() 源碼解析這裡我們從這裡進入:
b.bind(port).sync();直接從 bind()方法跟進去:
// io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// 繼續跟進
public ChannelFuture bind(SocketAddress localAddress) {
// 驗證group與channelFactory是否為null
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// 這裡是一處重點邏輯
return doBind(localAddress);
}這裡顯示校驗了 Bootstrap 的 group 與 channelFactory 是否綁定成功。然後繼續跟進 doBind() 方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 創建、初始化channel,並將其註冊到selector,返回一個異步結果
final ChannelFuture regFuture = initAndRegister();
// 從異步結果中獲取channel
final Channel channel = regFuture.channel();
// 若異步操作執行過程中出現了異常,則直接返回異步對象(直接結束)
if (regFuture.cause() != null) {
return regFuture;
}
// 處理異步操作完成的情況(可能是正常結束,或發生異常,或任務取消,這些情況都屬於有結果的情況)
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 綁定指定的埠
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else { // 處理異步操作尚未有結果的情況
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 為異步操作添加監聽
regFuture.addListener(new ChannelFutureListener() {
// 若異步操作具有了結果(即完成),則觸發該方法的執行
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) { // 異步操作執行過程中出現了問題
promise.setFailure(cause);
} else { // 異步操作正常結果
promise.registered();
// 綁定指定的埠
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}首先再這裡,我們先把這個方法整體的邏輯搞清楚,然後再再去研究他的每一步具體的操作,畫個圖,先理解這個方法做了什麼:
可以在圖中結合代碼,找到整個 dobind() 的大局處理思路,然後呢,到這裡我們還有很多重點細節需要繼續跟進,也就是圖中標記的 Tag 1、Tag 2。為了方便後面跟進去代碼之後方便回來,這裡以此標記,然後下面在具體分析 Tag 標記的源碼:
補充 Tag 0 :
ChannelPromise 與 ChannelFuture 了解。
Tag 1 :
異步創建、初始化channel,並將其註冊到selector
final ChannelFuture regFuture = initAndRegister();
Tag 2 :
綁定指定的埠號:
doBind0(regFuture, channel, localAddress, promise);
補充 Tag 0:ChannelPromise 與 ChannelFutureChannelPromise 是一個特殊的 ChannelFuture,是一個可修改的 ChannelFuture。內部提供了修改當前 Future 狀態的方法。在 ChannelFuture 的基礎上實現了設置最終狀態的修改方法。
而 ChannelFuture 只可以查詢當前異步操作的結果,不可以修改當前異步結果的 Future 。這裡需要知道的就是 ChannelPromise 可以修改當前異步結果的狀態,並且在修改狀態是會觸發監聽器。在 doBind 方法中主要用於在處理異步執行一直未結束的的操作,將異步結果存在異常的時,將異常賦值給 ChannelPromise 並返回。
Tag 1 :initAndRegister() 初始化並註冊 Channel先找到代碼:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 創建channel
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 將channel註冊到selector
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}嗯?!代碼意一看,咋就這麼點,也就做了三件事,可是這三件事做的每一個都不是一句代碼的可以完成的。這裡我們一個一個分析,除了這三件事情,其他的也就是異常後的處理邏輯,所以主流程就是下面的三句代碼,也為了跟進繼續打上標記吧:
Tag 1.1 創建channelchannel = channelFactory.newChannel();
Tag 1.2 初始化channelinit(channel);
Tag 1.3 將channel註冊到selectorChannelFuture regFuture = config().group().register(channel);
針對這三處,還是要一處一處分析。
Tag 1.1 channelFactory.newChannel() 創建 Channel找到對應的代碼:io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
try {
// 調用無參構造器創建channel
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}這裡為什麼直接找到 ReflectiveChannelFactory ,需要提一下,在分析 ServerBootstrap與 Bootstrap 啟動配置類的時候,設置 channel 的方法,跟進去可以找到針對屬性 channelFactory 的賦值代碼:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}可以看到這裡 new 的就是 ReflectiveChannelFactory 工廠類,然後再看 ReflectiveChannelFactory 的構造:
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
// 將NioServerSocketChannel的無參構造器初始化到constructor
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}看到的是 ReflectiveChannelFactory 在創建時初始化了 constructor 屬性,將傳入的 channel 類 clazz 中獲取構造賦值給了 ReflectiveChannelFactory 反射工廠的 constructor 屬性。
而我們再 Server 端傳入的 channel 類為NioServerSocketChannel.class ,所以上面看的 constructor.newInstance(); 對應的也就是 NioServerSocketChannel 的無參構造。這樣我們就繼續跟進 NioServerSocketChannel :
// NIO中的provider,其用於創建selector與channel。並且是單例的
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
// DEFAULT_SELECTOR_PROVIDER 靜態變量
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}繼續跟進 newSocket() :
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 創建NIO原生的channel => ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}就是返回了一個 Java NIO 原生的 Channel,最後將 NIO 原生的Channel 包裝成 NioServerSocketChannel,繼續跟進 this(newSocket(DEFAULT_SELECTOR_PROVIDER)) 找到有參構造具體代碼:
public NioServerSocketChannel(ServerSocketChannel channel) {
// 參數1:父channel
// 參數2:NIO原生channel
// 參數3:指定當前channel所關注的事件為 接受連接
super(null, channel, SelectionKey.OP_ACCEPT);
// 用於對channel進行配置的屬性集合
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}這裡主要做了兩件事情,1. 調用父類構造,2. 對 channel 進行配置屬性集合。
這裡先說下 new NioServerSocketChannelConfig(),這部操作就是給當前 Channel 的 config 進行賦值,用來保存當前 Channel 的屬性配置的集合。好了,這個說了我們繼續跟主線:super(null, channel, SelectionKey.OP_ACCEPT)
// io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
// 這裡的this.ch為NIO原生channel
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// NIO,非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}直接找到 AbstractNioChannel 父類構造,這也第一步也是調用父類構造 super(parent); 先記著,先看除了調用父類構造還做了什麼事情:
將前面創建的原生 Channel 複製給屬性保存 this.ch = ch;當前 channel 的關注事件屬性賦值 this.readInterestOp = readInterestOp; // SelectionKey.OP_ACCEPT 接受事件將 NIO 原生 Channel 設置為非阻塞 ch.configureBlocking(false);在 AbstractNioChannel 構造中就做了這麼四件事情,主要需要說的還是其調用父類構造又做了什麼事情,找到代碼:
// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 為channel生成id,由五部分構成
id = newId();
// 生成一個底層操作對象unsafe
unsafe = newUnsafe();
// 創建與這個channel相綁定的channelPipeline
pipeline = newChannelPipeline();
}在 AbstractChannel 構造中主要做了三件事:
為當前 Channel 生成 id newId(),感興趣可以跟進去看看。生成一個底層操作對象 unsafe,用於 I/O 線程調用傳輸時使用,用戶代碼無法調用。newUnsafe()創建與這個channel相綁定的channelPipeline,這也是一個重點操作,不過在這裡先不展開細說,後面會單獨細跟 channelPipeline 的代碼。所以到此 **Tag 1 : initAndRegister() ** 中的 **Tag 1.1 newChannel() ** 創建 Channel 才算跟完。針對 Tag 1.1 newChannel() 我們也畫圖簡圖整理下思路:
根據圖,在結合上面代碼的分析,最好自己再可以跟一遍代碼,我想這一塊的理解還是沒什麼問題的。到這也只是創建了 Channel。Tag 1.1 的 Channel 創建結束,接著跟進 Tag 1.2 init(channel).
Tag 1.2 init(channel) 初始化 Channel這裡我們是從 ServerBootstrap 中的doBind 進入的,所以這裡直接找到 io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {
// 獲取serverBootstrap中的options屬性
final Map<ChannelOption<?>, Object> options = options0();
// 將options屬性設置到channel
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 獲取serverBootstrap中的attrs屬性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
// 遍歷attrs屬性
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
// 將當前遍歷的attr初始化到channel
channel.attr(key).set(e.getValue());
}
}
// 獲取channel的pipeline
ChannelPipeline p = channel.pipeline();
// 將serverBootstrap中所有以child開頭的屬性寫入到局部變量,
// 然後將它們初始化到childChannel中
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 將ServerBootstrapAcceptor處理器添加到pipeline
// ServerBootstrapAcceptor處理器用於接收ServerBootstrap中的屬性值,
// 我們通常稱其為連接處理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}這裡做的事情還是很多的,基本操作我在上面注釋上也標註出來,還有一些需要繼續跟下去的主要操作,還是先標記 Tag 然後繼續跟下去。這裡說一下這裡的 options 與 attrs 屬性的賦值,其實就是講我們 ServerBootstrap 與 Bootstrap 在調用 doBind() 之前通過 option() 與 attr() 設置的參數值,其中 options 屬性設置到了 Channel 的 config 屬性中,attrs 是直接被設置在了 Channel 上的。
在設置完 options 屬性與 attrs 屬性時,接著獲取了當前 channel 的 pipeline,接下來還是獲取我們在 doBind() 之前設置的屬性值,以 child 開頭的方法 childOption() 與 childAttr() 設置的屬性值。
這裡使用局部變量記錄了所有 Child 相關的值 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs 主要用於初始化 childChannel 的屬性,new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)) 主要是創建 連接處理器。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 將ServerBootstrapAcceptor處理器添加到pipeline
// ServerBootstrapAcceptor處理器用於接收ServerBootstrap中的屬性值,
// 我們通常稱其為連接處理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});首先這裡想做的事情是:將當前 channel 的 pipeline 中綁定一個初始化處理器 ChannelInitializer ,因為是抽象類,所以需要匿名實現 initChannel方法。而這些主要的操作是處理 childGroup 裡面的 channel 的初始化操作。這裡我只想主要講一下這個連接處理器 ServerBootstrapAcceptor 主要做了什麼,其他的具體會在後面的 handler 和 pipeline 的時候細說。
**補充:**這裡因為 ServerBootstrap 服務端是對用的有兩個 EventLoopGroup,在服務端,parentGroup 是用於接收客戶端的連接,在 parentGroup 接收到連接之後是將只是將當前轉給了 childGroup去處理後續操作,而 childGroup 是用來專門處理連接後的操作的,不關心 channel 的連接任務。這個其實就是 Netty-Server 的 Reactor 線程池模型的處理邏輯。
這裡主要往下說一下這個連接處理器:ServerBootstrapAcceptor 。
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}ServerBootstrapAcceptor 構造只是將 ServerBootstrap 中配置的 Child 屬性設置保存下來。而這裡一直說這是連接處理器,是因為當客戶端連接發送到服務端時,這個處理器會接收客戶端的連接並處理。主要是處理方法是 channelRead 中的實現:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg為客戶端發送來的數據,其為NioSocketChannel,即子channel,childChannel
final Channel child = (Channel) msg;
// 將來自於ServerBootstrap的child開頭屬性初始化到childChannel中(childHandler、childOptions、childAttrs)
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 將childChannel註冊到selector 需要注意的是,這裡的selector與父channel所註冊的selector不是同一個
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}這裡主要就做了兩件事情:
將成功從 client 連接過來的 channel 註冊到 selector 上。這裡一直說子channel,就是因為這裡註冊的是兩個 EventLoopGroup,在 Server 端的處理上 netty 線程模型採用「服務端監聽線程」和「IO線程」分離的方式。所以這裡 channelRead 方法就是在 client 端請求連接到 server 端時,用於將當前連接的 IO 線程綁定到 childChannel 同時註冊到 ChildGroup 中的 Selector 中。線程,模型可以參考下面的圖:
好了,到這裡 **Tag 1.2 initChannel ** 代碼也分析完了,有些關於 pipeline 、handler、selector 的部分沒有細說因為後面會單獨說,在這裡沒有直接展開。
這裡也畫個圖:到時候將這些圖在整合到一起,現在是的分析過程就像是化整為零,最後在整合到一起化零為整。
這裡除了 init(channel) 方法之外,還主要說了下 ServerBootstrapAcceptor 連接處理器。其實主要是 netty-server 的線程模型與代碼的結合理解。
Tag 1.3 config().group().register(channel) 將channel註冊到selectorchannle 註冊到 Selector 的代碼分析:
//
config().group().register(channel);config().group() :這裡就是 Bootstrap 的 EventLoopGroup,而這裡是 Server 端的 ServerBootstrap 所以這個其實就是 parentGroup。那這裡我們需要找到 register 的方法實現:
這裡因為 group 是 NioEventLoopGroup,根據 NioEventLoopGroup 的繼承體系就可以直接找到 實現 io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)。因為只有 MultithreadEventLoopGroup 在其繼承體系中。所以找到代碼我們繼續:
public ChannelFuture register(Channel channel) {
// next() 從eventLoop數組中選擇一個eventLoop
return next().register(channel);
}這裡需要了解下 next() 方法,因為我們現在是 eventLoopGroup next() 就是從當前 group 中獲取一個 EventLoop,然後這裡在繼續跟進需要找 EventLoop 繼承體系中實現 register 方法的類:SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
// 創建一個 ChannelPromise 然後註冊
return register(new DefaultChannelPromise(channel, this));
}
// ----> 這裡繼續調用 unsafe 的 register
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}這裡調用的 unsafe 的 register 方法,在初始化 eventLoop 的時候說過這個 unsafe 的初始化。是我們直接跟進:
io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 對異常情況的處理
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// channel與eventLoop的綁定就發生在這裡,
// 需要注意,這裡的eventLoop還沒有綁定線程,因為這個線程還沒有創建
AbstractChannel.this.eventLoop = eventLoop;
// 判斷當前線程與eventLoop所綁定線程是否是同一個線程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 執行當前線程所綁定的eventLoop的execute(), 這個execute()會將參數任務寫入到任務隊列,並創建啟動新的線程
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}AbstractUnsafe#register:
將 eventLoop 保存到 channel 的 eventLoop 的屬性中(channel 與 eventLoop 的綁定),注意:這裡的 eventLoop 裡面還沒有綁定 thread。判斷 EventLoop 的 thread 是否是當前線程:eventLoop.inEventLoop()。這裡斷點看一下,初始化的時候這裡 eventLoop 中的 thread = null。所以這裡返回 false。執行當前線程綁定的 eventLoop 的 excute() 方法。執行傳入的 runnable ,主要是做的是將參數任務寫入到任務隊列,並創建啟動新的線程runnable 中的 run 方法實現:register0(promise);這裡標記三個 Tag
Tag 1.3.1 register0(promise) 也就是上面的第 5 步。
Tag 1.3.2 eventLoop.excute() 執行分析 也就是上面的第 4 步。
**Tag 1.3.3 eventLoop.excute() 的 run 方法執行分析 ** 也就是 Tag 1.3.2 最後執行起來的 run 方法
Tag 1.3.1 register0(promise)直接跟進 register0(promise);
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); // 綁定
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}這裡其他操作感興趣的自己可以進去看看,這邊我們主要看 register 流程,直接找 doRegister(); 的綁定代碼:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 在這裡進行了註冊,將NIO原生channel註冊到了NIO原生selector
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}這裡就是 channel 註冊 Selector 的代碼:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
javaChannel() :這裡獲取原生的 Nio Channel,跟進去可以找到這裡返回的是 AbstractNioChannel#ch 的 channel。在前面 NioEventGroup 初始化的時候說過這個 NIO Channel 的初始化的過程。然後調用 NIO Channel 的 Regsiter 方法Regsiter 方法中首先傳入的是 unwrappedSelector 前面初始化的 selector 數組。第二個參數 0 ,就是當前監聽的的事件, 0 表示不關注任何事件。為什麼這裡子 Channel 註冊的是不關注任何事件?在前面看到的 Channel 註冊一個指定的關注事件:SelectionKey.OP_ACCEPT 連接事件,那個 channel 是 Netty 封裝的 channel,哪裡監聽了連接事件之後,只要關注客戶端的連接,當 netty 封裝的 channel 獲取到連接就緒的 channel 的時候就可以拿到當前 channel 需要註冊事件了,然後這個時候就可以指定 原生 NIO channel 的需要關注的事件。所以這裡默認不關注任何事件就是為後續修改其需要關注指定類型的就緒事件。到這裡 register0 的方法說完。前面還有 EventLoop 的線程 thread 的事情沒有說明白,也就是 eventLoop 的 excute() 方法執行過程做了什麼,返回去找到代碼:io.netty.channel.AbstractChannel.AbstractUnsafe#register
Tag 1.3.2 eventLoop.excute() 執行分析前面還有 EventLoop 的線程 thread 的事情沒有說明白,也就是 eventLoop 的 excute() 方法執行過程做了什麼,返回去找到代碼:io.netty.channel.AbstractChannel.AbstractUnsafe#register
// 剛剛往裡面跟的是 register0 現在再說一下 execute
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});往下跟,找到 io.netty.util.concurrent.SingleThreadEventExecutor#execute, eventLoop 的父類:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 判斷當前線程與eventLoop所綁定線程是否是同一個
boolean inEventLoop = inEventLoop();
// 將任務添加到任務隊列
addTask(task);
if (!inEventLoop) {
// 創建並啟動一個線程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}首先這個參數:**Runnable task ** 其實就是剛剛我們跟過的方法 register0.
首先判斷當前 inEventLoop(); 當前線程是否是 EventLoop 中的 thtrad。這裡還是 false。將任務添加到任務隊列。跟下去可以找到 taskQueue.offer(task) ,這裡的 taskQueue 任務隊列就在跟創建 eventLoop 時 newChild 中初始化創建的.inEventLoop = false。首先做的是:startThread.繼續跟進:startThread()
private void startThread() {
// 若當前eventLoop所綁定線程尚未啟動
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
// 創建並啟動一個線程
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}首先判斷當前 eventLoop 所綁定線程尚未啟動,然後使用 CAS 修改當前線程的啟動狀態 ,修改成功則執行 doStartThread()創建並啟動一個線程,繼續跟:
private void doStartThread() {
assert thread == null;
// 調用NioEventLoop所包含的executor的execute()
// 這個execute()會創建並啟動一個線程
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 執行了一個不會停止的for,用於完成任務隊列中的任務
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 省略.
}
});
}這裡調用了 NioEventLoop 所包含的 executor 的 execute() 方法,也就是創建線程的邏輯,後面的具體執行邏輯,下一步部分具體看。傳入了一個 Runnable。主要是執行了一個 SingleThreadEventExecutor.this.run(); 線程,用於完成任務隊列的任務。後面說。這裡主要說一下這個 executor.execute()執行的過程。
這裡跟進下面代碼可以找到之前 子 Executor 的初始化創建的匿名內部類:io.netty.util.internal.ThreadExecutorMap#apply
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
// 這裡調用了NioEventLoopGroup所包含的executor的execute()
executor.execute(apply(command, eventExecutor));
}
};
}所以 execute 方法執行的是這裡的 execute方法:executor.execute(apply(command, eventExecutor));
而在 ThreadExecutorMap 這裡的 executor 之前在 NioEventLoopGroup 初始化的時候說了,這個 executor 是 NioEventLoopGroup 初始化過成功構造方法創建的 總 executor。然後 apply 方法又將傳入的 runnable 包裝成了一個新的 Runnable 。
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
// 做了線程隔離
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}只是在執行 command 之前做了異步線程隔離的操作。所以到這裡就是 總 executor 執行了傳入了新包裝的 runnable。然後我們繼續跟進這裡的 executor.execute(apply(command, eventExecutor)); execute 方法。這裡需要找到實現方法在這裡:io.netty.util.concurrent.ThreadPerTaskExecutor#execute
public void execute(Runnable command) {
// 創建並啟動一個線程
threadFactory.newThread(command).start();
}threadFactory 也是之前 NioEventLoopGroup 初始化的線程工廠。這裡主要用到這個 總 executor 裡面的線程工廠來創建線程來著。而這裡的 command 就是 apply() 返回的 runnable,也就是包裝後的 doStartThread 中的匿名內部類 runnable。所以這裡的線程 newThread(command).start() 的 start 就執行了commnnd 的 run 方法。最後就執行到 doStartThread 的裡面的 run 方法。
所以到這裡 EventLoop 中的 thread 的創建並且啟動就都這裡處理完成了。
這裡也畫個圖說下這個調用過程:
到這裡我們的 eventLoop.excute() 中的創建線程並啟動的流程看完了,那下面我們要單獨說一下這個線程啟動之後執行的 run 方法做了什麼。主要就是詳細說下 threadFactory.newThread(command).start(); 這個線程啟動執行執行的 run 代碼的解析。
Tag 1.3.3 eventLoop.excute() 的 run 方法執行分析入口還是從 eventLoop.excute() 中進去,也就是 eventLoop 的 run 方法執行分析,找到裡面的匿名內部類的 runnable 的實現:
// 對應的代碼位置 io.netty.channel.AbstractChannel.AbstractUnsafe#register
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});跟進 execute() 找到 startThread(); 在直接跟進 doStartThread()找到下面代碼:
private void doStartThread() {
assert thread == null;
// 調用NioEventLoop所包含的executor的execute() 這個execute()會創建並啟動一個線程
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 執行了一個不會停止的for,用於完成任務隊列中的任務
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 省略。。。這段代碼上面都跟過,這裡就跳過直接找我們要看的代碼:SingleThreadEventExecutor.this.run(); 這裡執行了一個無限循環的代碼,用來一直完成任務隊列中的任務:這找到實現代碼 eventLoop 中的 run() :
protected void run() {
for (;;) {
try {
try {
// 選擇就緒的channel
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // NioEventLoop不支持
continue;
case SelectStrategy.BUSY_WAIT: // NioEventLoop不支持
case SelectStrategy.SELECT: // SELECT = -1 能走到這裡,說明當前任務隊列中沒有任務
// 進行阻塞式選擇
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
// 該變量用於設置「處理就緒channel的IO所使用的時間」與「處理任務隊列中任務使用時間」的比例 該值為整型,不大於100
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
// 記錄處理就緒channel的IO開始執行的時間點
final long ioStartTime = System.nanoTime();
try {
// 處理就緒channel的IO
processSelectedKeys();
} finally {
// 計算出處理就緒channel的IO所使用的時長
final long ioTime = System.nanoTime() - ioStartTime;
// 執行任務隊列中的任務
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
// 省略。。。
}
}首先我們整體上看一下這個方法。
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())Tag 1.3.3.1:selectStrategy.calculateStrategy
Tag 1.3.3.2 :switch - case
Tag 1.3.3.3 :processSelectedKeys()
Tag 1.3.3.4 :runAllTasks()
Tag 1.3.3.1:selectStrategy.calculateStrategy首先找到代碼:
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
// hasTasks tailTasks 收尾任務隊列
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
// super.hasTasks() taskQueue 普通任務隊列
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}首先看到 hasTask : 返回當前任務隊列和收尾隊列是否有任務。selectNowSupplier : 匿名內部類io.netty.util.IntSupplier
繼續跟進:calculateStrategy:io.netty.channel.DefaultSelectStrategy#calculateStrategy 初始化的默認選擇器
// SelectStrategy.SELECT = -1
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}這裡就是如果存在任務則走選擇器 selectSupplier.get() 否則直接返回 -1:SELECT 。
繼續跟 get 的任務選擇邏輯:selectSupplier : NioEventLoop 中的內部類 IntSupplier
public int get() throws Exception {
return selectNow();
}
// io.netty.channel.nio.NioEventLoop#selectNow
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}selector.selectNow() : 方法為 NIO 的非阻塞選擇,返回就緒的 channel 的數量,可以為 0。
補充:Selector 的阻塞選擇和非阻塞選擇的區別就是,非阻塞選則在當前 select 方法執行時判斷循環判斷所有的 channel 是否就緒並返回所有的就緒數量,而阻塞式選擇則是阻塞指定時間直至阻塞時間內獲取到就緒 channel 或者阻塞時間超時時立刻返回。
wakenUp.get() :返回當前線程是否被阻塞,沒有被阻塞時返回 true,當前線程被阻塞返回 false。
selector.wakeup() :當前線程如果被阻塞,則立刻返回 selector 結果,即喚醒當前線程。
這是 selectNow() 方法執行的結果,是一個必然大於等於 0 的結果。
所以返回 calculateStrategy 方法:如果任務隊列存在任務,則通過 Selector 執行非阻塞選擇返回就緒的 channel 數量,如果不存在任務,則直接返回 -1。
Tag 1.3.3.2 :switch - case現在在返回去看 switch - case 的代碼:
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // -2 NioEventLoop不支持
continue;
case SelectStrategy.BUSY_WAIT: // -3 NioEventLoop不支持
case SelectStrategy.SELECT: // -1 能走到這裡,說明當前任務隊列中沒有任務
// 進行阻塞式選擇
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}因為再 selectStrategy.calculateStrategy 方法中,不可能返回 -2 和 -3。所以 case 的結果只可能走到 SelectStrategy.SELECT 或者直接 default。而只有當所有任務隊列中都沒有任務的時候才會返回 -1。也就意味著當任務隊列中沒有任務時也會景行一次阻塞式選擇,通過 wakenUp.getAndSet(false) 方法將當前線程設置為阻塞狀態。然後就阻塞式 select。
這裡我們具體去看看這 select 阻塞選擇的邏輯:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// 計數器:用於記錄空輪詢導致CPU佔用率飆升,select()提前結束的次數(其值大於1時)
int selectCnt = 0;
// 獲取當前時間,也就是for循環第一次開始執行的時間點
long currentTimeNanos = System.nanoTime();
// delayNanos() 表示定時任務隊列中第一個定時任務還有多久就到開始執行的時間了
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 處理小於0.5毫秒的任務
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { // 該條件為true,表示具有立即需要執行的定時任務
if (selectCnt == 0) { // 只有第一次for循環才會執行下面的「非阻塞選擇」
selector.selectNow();
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 若有就緒的channel了,則直接結束
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
// 若當前線程被中斷
if (Thread.interrupted()) {
selectCnt = 1;
break;
}
// 獲取當前時間
long time = System.nanoTime();
// 下面的式子等價於: time - currentTimeNanos >= timeoutMillis
// 若下面的條件成立,則說明select()是在指定的阻塞時間過期後才跳出的,即正常結束的
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
selector = selectRebuildSelector(selectCnt); // 重構selector
selectCnt = 1;
break;
}
// 本輪for循環結束時間點,同時也是下一輪for循環的開始時間點
currentTimeNanos = time;
}
} catch (CancelledKeyException e) {
}
}這個方法直接畫圖解釋把:
而在 switch-case 唯一的代碼邏輯也就是在任務隊列中沒有任務時執行的阻塞 select,而在其他的任何情況下或者阻塞選擇存在就緒 channel 或者任務隊列新增任務之後都會跳出 switch - case,執行後續邏輯。
Tag 1.3.3.3 :processSelectedKeys()首先我要說的時這個 processSelectedKeys 方法時處理就緒的 channel 的 IO,而代碼邏輯走到這裡其實並不一定就有已經就緒的 channel,因為看了上面的邏輯會發現代碼任務處理為先,而存在任務就會走到這裡邏輯,雖然在走到這裡之前也執行了 select 的 channel 但是也都是去查看一遍是否存在就緒 channel,所以這裡看下面的邏輯需要先有這個理解,最後我們再具體看 processSelectedKeys 代碼:
final int ioRatio = this.ioRatio; // 默認值 50
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// 記錄處理就緒channel的IO開始執行的時間點
final long ioStartTime = System.nanoTime();
try {
// 處理就緒channel的IO
processSelectedKeys();
} finally {
// 計算出處理就緒channel的IO所使用的時長
final long ioTime = System.nanoTime() - ioStartTime;
// 執行任務隊列中的任務
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}這段代碼的整體邏輯再我們看完 runAllTasks 之後再分析,這裡存在一個 io 處理與 task 處理的時間分配邏輯。後面再看,這裡繼續跟進 processSelectedKeys
private void processSelectedKeys() {
// 判斷channel的selectedKeys是否是優化過的
if (selectedKeys != null) {
processSelectedKeysOptimized(); // 優化處理方式
} else {
processSelectedKeysPlain(selector.selectedKeys()); // 普通處理方式
}
}到這裡,代碼有限判斷了當前是否啟用的 selectedKey 的優化,再 NioEventLoopGroup 的時候說過,優化就是將selectedKeys 的 set 集合轉換成了數組,而再這裡也可以得到驗證,selectedKeys直接產看這個屬性就可以看到,這裡不進去看了,感興趣進去看看。然後針對優化和非優化的處理唯一的區別就是處理的 selectedKeys對象是數組還是集合。這裡直接分析 processSelectedKeysOptimized 方法,processSelectedKeysPlain 方法可以自己看,一樣的處理。
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
// 從數組中取出一個元素
final SelectionKey k = selectedKeys.keys[i];
// 移除已經取出的 SelectionKey,使 GC 可以處理到已經關閉的 channel
selectedKeys.keys[i] = null;
// 獲取selectionKey的附件,該附件中可以存放任意數據,不過這裡存放的是NIO原生channel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a); // 處理就緒事件
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task); // 這裡是測試代碼。跟進去可以看到實現方法是測試類
}
// 省略.這裡就是直接輪詢 selectedKeys 的集合,每取出一個 selectKey 都會在原數組中移除當前元素,就是為了當 channel 關閉後, GC 可以釋放當前 channel 佔用的內存。
然後獲取 selectKey 中保存的 Nio 原生的 channel,處理就緒後邏輯:processSelectedKey 繼續跟進:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 處理selectionKey失效的情況
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// 判斷當前 channnel 就緒的事件類型
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 獲取當前selectionKey的interestOps
int ops = k.interestOps();
// 先將SelectionKey.OP_CONNECT按位取或,再與ops進行按位與
ops &= ~SelectionKey.OP_CONNECT;
// 將修改過的ops再寫入到selectionsKey中
k.interestOps(ops);
// 連接server
unsafe.finishConnect();
}
// 處理寫就緒的情況
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// 強制刷新(將user buffer中的數據寫入到網關緩存)
ch.unsafe().forceFlush();
}
// readyOps為0表示當前沒有任何channel就緒
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 將網卡緩存中的數據寫入到user buffer
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}這段邏輯就是處理就緒 channel 的 IO 事件的邏輯。
判斷當前 SelectionKey 是否有效。失效結束處理並關閉資源。
判斷當前 channel 的關注事件,針對處理:獲取 SelectionKey 的 readyOps。這裡的判斷邏輯都是使用高效的位運算。readyOps 為當前 SelectionKey 的就緒的事件類型。
(readyOps & SelectionKey.OP_CONNECT) != 0 :連接就緒事件
這個事件在 server 端不會關注,只有 client 用來連接 server 時才會關注連接就緒事件。
連接就緒後,獲取當前 SelectionKey 的 interestOps 值,將當前 interestOps 值修改後,調用底層 unsafe 連接server
(readyOps & SelectionKey.OP_WRITE) != 0 :寫就緒事件
當前 channel 關注的是寫就緒事件,此時寫操作已經就緒,所以直接調用unsafe將數據寫入的網卡緩存。
(readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 :當前channel 關注的是讀就緒事件,或者前面因為有新增任務而觸發的就緒 channel 處理邏輯,只有因為任務觸發的情況下 readyOps 才可能會是 0 ,readyOps = 0 意味著沒有就緒 channel。
直接調用 unsafe 繼續讀操作,將網卡緩存的數據讀取到用戶空間。如果是 readyOps = 0 的情況相當於網卡緩存並沒有就緒數據,則時進行的讀操作不會讀取到數據。
這就是完整的 IO 處理邏輯,主要根據當前 channel 關注的事件進行相應的 unsafe 操作。
Tag 1.3.3.4 :runAllTasks()下面我們在看下 runAllTask 方法。
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
protected boolean runAllTasks(long timeoutNanos) {
// 從定時任務隊列中取出所有當前馬上就要到期的定時任務放入到任務隊列
fetchFromScheduledTaskQueue();
// 從任務隊列中取出一個任務
Runnable task = pollTask();
// 若該任務為空,則說明任務隊列中已經沒有任務了,此時就可以執行收尾任務了
if (task == null) {
// 執行收尾隊列中的收尾任務
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
// 計數器
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 執行任務
safeExecute(task);
runTasks ++;
// 每64個任務查看一次超時
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
// 從任務隊列中再取出一個任務
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
} // end-for
// 處理收尾隊列中的任務
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}畫圖理解一下這個方法:
這裡面有幾個方法大家感興趣可以進去看看:這部分邏輯不複雜,大家可以自己研究下。
fetchFromScheduledTaskQueue()
io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue
從定時任務隊列中取出所有當前馬上就要到期的定時任務放入到任務隊列
pollTask()
io.netty.util.concurrent.SingleThreadEventExecutor#pollTask
從任務隊列中取出一個任務
afterRunningAllTasks()
io.netty.util.concurrent.SingleThreadEventExecutor#afterRunningAllTasks
執行收尾任務隊列中的所有收尾任務
safeExecute(task)
io.netty.util.concurrent.AbstractEventExecutor#safeExecute
執行任務
到此我們的 channel 的 initAndRegister 介紹完成,並且介紹了 channel 就緒後的執行方法 eventLoop 的 execute 調用的 run 方法的邏輯。其實 run 方法不是說註冊初始化的時候就調用的,而是通過任務或者就緒 channel 觸發的,只是註冊時候說到這個代碼就直接跟完這個邏輯,讓大家也好理解一點。
Tag 2 doBind0() 綁定埠號先看一下面需要跟蹤的代碼在哪裡,上面饒了一個圈,現在回來先看看回到哪裡,不然都不知道自己是誰,自己在哪。
找到 doBind0方法:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) { // 只有當channel初始化註冊成功後,才會進行綁定
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}如果初始化失敗,則直接返回失敗,我們這裡跟正確邏輯,直接跟 channel.bind方法:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}這裡不多少直接進去,繼續探索:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}一樣。繼續探索:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
。。。。
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}這裡涉及到 pipeline 的邏輯後面細說,這裡我們直接去看看一下 bind 的代碼:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}這裡找到 bind 實現:io.netty.channel.DefaultChannelPipeline.HeadContext#bind
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}繼續跟進:unsafe.bind : io.netty.channel.AbstractChannel.AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 省略。。。
// 獲取當前channel是否被激活。注意,現在還沒有被激活,所以其值為false
boolean wasActive = isActive();
try {
// 綁定
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive(); // 觸發重寫的channelActivate方法的執行
}
});
}
safeSetSuccess(promise);
}最終找到 AbstractUnsafe 中找到最終調用的 doBind 方法,在調用前又獲取了當前 channel 是否被激活,若已經激活則觸發 pipeline 的 fireChannelActive 方法執行。這個都在 pipeline 再具體細說。當 channel 沒有被激活時才去調用 NIO 原生的channel 進行綁定。代碼:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}javaChannel() 即獲取 NIO 原生 channel 的方法,再獲取到 NIO 原生 channel 之後調用 bind 方法完成綁定。
綁定 bind 方法我們就先初步了解最終怎麼完成綁定的,要清楚到最後完成 bind 的依然是 NIO 的 channel。關於 pipeline 也是 netty 的一塊重點,後面我們再細說。
這邊完整的 server 啟動初始化到啟動的代碼我們跟完了,現在可以類比這再把 client 的代碼跟一遍,不過現在再看 client 的代碼,會有很多不同的理解出來也應該會有很多原來如此的理解吧。
Client 端啟動分析上面我們跟完了 Server 端的代碼初始化到啟動分析,下面我們在跟下 Client 端的代碼,這兩部分有重複代碼有相同的邏輯也有不同的地方。從啟動類就可以看到他們是有區別的,但是上面理解了在看下面這個會容易很多。第一步找到代碼:
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new SomeSocketClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
future.channel().closeFuture().sync();
} finally {
if(eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
}
創建 NioEventLoopGroup,這個因為和 Server 端是一樣的,所以這裡不在分析。初始化 Bootstrap 啟動配置類,配置啟動參數,這個在上面 Server 端的 Bind 方法分析的時候,也有看到都是在哪裡使用這個配置的屬性的。這裡也不細說。客戶端啟動,也就是:bootstrap.connect,這裡就是與 Server 端的主要區別。在 Server 端是啟動一個服務端服務,使用的是 bind 綁定當前機器的埠,對外暴露服務,而在 Client 端就是主動去連接 Server 端,與伺服器建立連接。所以這裡是 connect。這裡我們主要跟進去看看這個方法Bootstrap.connect() 分析:先找到 Bootstrap 的代碼,跟進去:
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
// 驗證bootstrap的group、channelFactory與handler是否為空
validate();
// 解析並連接地址
return doResolveAndConnect(remoteAddress, config.localAddress());
}InetSocketAddress.createUnresolved 將輸入的連接地址和埠號保存創建創建 InetSocketAddress 對象返回。
validate() 方法主要校驗 bootstrap 的必須配置是否為空:group、channelFactory與handler是否為空。
然後調用 doResolveAndConnect 方法建立連接
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 創建、初始化並註冊channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 若channel註冊完畢
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 解析並連接server端地址
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}這段代碼我想你看到之後應該很眼熟,這裡如果拋開 doResolveAndConnect0 方法不看,其他的邏輯可以說是與 Server 端的 bind 方法跟進去看到的是一樣的。首先通過異步的方式初始化並註冊 channel,然後獲取異步結果,判斷是否異常,處理異常情況。沒有異常,判斷當前異步方法是否結束,如果結束根據結束的狀態處理結束的邏輯,因為結束可以是正常也可以是異常結束。如果是異步結果一直沒有結果,那就建立監聽,監聽異步結果返回時,觸發最終邏輯。
這裡我們也會將當前方法跟進去說一遍,但是只會細說與 Server 端不同的地方。
按照 Server 端的介紹模式,這裡分成三段詳細介紹
Tag 1:initAndRegister();
Tag 2:doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
Tag 1:initAndRegister() 分析final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 創建channel
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 將channel註冊到selector
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}這裡調用的 io.netty.bootstrap.AbstractBootstrap#initAndRegister 的方法與 Server 端調用的完全一樣。我們h還是把當前方法分成三部分然後一個一個細說:
Tag 1.1:channelFactory.newChannel();
Tag 1.2:init(channel);
Tag 1.3:config().group().register(channel);
Tag 1.1 newChannel() 分析首先在這裡不一樣的是 Bootstrap 的啟動配置類傳入的 channel 是 .channel(NioSocketChannel.class)。而在 Server 端傳入的 NioServerSoucketChannel 。
所以在 Client 端 channelFactory 調用的則是 NioSocketChannel 的無參構造來初始化創建 channel。所以這裡直接找到 NioSocketChannel 的構造方法:
public NioSocketChannel() {
// DEFAULT_SELECTOR_PROVIDER:全局唯一的provider,通過它可以創建出selector與channel
this(DEFAULT_SELECTOR_PROVIDER);
}
// 繼續跟進 this
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}到這裡調用了 newSocket(provider) 方法。主要是創建了原生的 SocketChannel ,provider.openSocketChannel() 。初始化 Nio 原生的 channel 之後,就是創建 Netty 包裝的 channel 的過程了。
繼續往後跟:
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}這裡的 new NioSocketChannelConfig(this, socket.socket()) 與 Server 端一樣,此處就不再跟。我們直接跟 super(parent, socket) :點擊去找到 io.netty.channel.nio.AbstractNioByteChannel#AbstractNioByteChannel:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 第三個參數: 指定關注事件為讀事件 OP_READ
super(parent, ch, SelectionKey.OP_READ);
}這裡就出現了與 Server 端不一樣的地方了。不知道大家還記得在 Server 端創建 channel 的時候,指定的關注事件是什麼,直接貼上代碼:
當時跟 Server 端註冊 channel 時關注就的是 OP_ACCEPT,而在 Client 端創建的 channel 變成了關注的事件為 OP_READ 事件,因為當 Client 連接 Server 完成時,會由 Server 端通知 Client 端連接成功,所以此時 Client 直接註冊 OP_READ 使事件來監聽來自 Server 的返回。
然後下面的 super 的代碼與 server 端的是同一個父類 io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel 。所以後面的代碼是與 Server 端一樣。如果忘了可以往上翻翻,找到 Server 端的 newChannel 方法就可以看到。
Tag 1.2 init(channel) 分析上面跟完了 client 端的 channel 創建,接著就是看 channel 的初始化。同樣是先找到代碼:
因為這是 Client,啟動類是 Bootstrap。所以找到對應方法:這也是與 Server 端的區別。
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// 將bootstrap中創建的ChannelInitializer處理器添加到pipeline
p.addLast(config.handler());
// 將bootstrap中的options初始化到channel
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 將bootstrap中的attrs初始化到channel
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}這裡可以明顯看到與 Server 端的 init 方法相比,做的事情要少很多。首先將 Bootstrap 配置傳入的 handler 添加到 channel 的 pipeline 中 p.addLast(config.handler());。然後將 bootstrap 中配置的 attr 和 options 的屬性值初始化到 channel 中。
這裡稍微比較下 Client 端和 Server 端的 init 方法區別:
可以看到當時再 Server 端我們不僅處理了 ParentGroup 的屬性初始化,還初始化了以 Child 開頭的 ChildGroup 的屬性初始化,而再 Server 端是獲取 pipeline 將傳入的 childHandler 再次註冊成一個新的 hanndler 然後添加到當前的 pipeline 中。再 Clinet 端則是直接將配置類傳入的 handler 添加到 pipeline。這就是兩邊主要的區別,也就是因為 Server 傳入的是兩個 NioEventLoopGrou 才有的處理邏輯上的區別。也就是Server 端使用的是 Reactor 線程池模型,而Client 使用的 Reactor 模型。
Netty-Client 端的 Reactor 模型:
所以因為選擇模型的區別,再處理邏輯上也有區別。
Tag 1.3 register(channel) 分析這部分就完全與 Server 端一致,所以這裡也不再囉嗦。不明白的可以直接翻到前面就可以看到。不在贅述。
Tag 2:doResolveAndConnect0 分析上面 Client 端的 channel 初始化與註冊也看了一遍。下面我們繼續看 doResolveAndConnect0 方法:io.netty.bootstrap.Bootstrap#doResolveAndConnect0
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
// 創建一個地址解析器,其中包含一個地址格式匹配器
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
// 若解析器不支持該地址 或 該地址已經解析過了,則直接對該地址進行連接,
// 返回可修改的promise,即成功了就成功,失敗了則promise中有失敗信息
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
doConnect(remoteAddress, localAddress, promise);
return promise;
}
// 以異步方式解析server地址
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) { // 處理解析完成的情況(成功或異常)
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) { // 若異步解析中出現了問題,則直接關閉channel
channel.close();
promise.setFailure(resolveFailureCause);
} else { // 處理異步解析成功的情況
// resolveFuture.getNow() 從異步對象中獲取解析結果,即解析過的地址
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}doResolveAndConnect0 方法我們直接畫個圖看下邏輯流程,具體的需要我們細看的時 doConnect 的連接方法。
下面我們繼續跟 doConnect 方法:
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
// 為promise添加一個異常監聽器。連接過程發生異常,則關閉channel
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}這裡獲取 channel 中綁定的 EventLoop 調用 execute 方法,上面關於 channel 的註冊綁定流程大家還有印象沒?這裡調用的代碼就是下面這段:SingleThreadEventExecutor#execute
做的事情就是添加任務,因為當前的線程就是 eventLoop ,所以 !inEventLoop == false。在這裡的邏輯只是添加任務。上面在介紹的 channel 綁定註冊時候走的邏輯還有啟動線程,在這裡連接的時候相當於線程已經啟動,這裡只是添加任務,最後任務會在前面介紹的 run 方法裡面執行(Tag 1.3.3 eventLoop.excute() 的 run 方法執行分析)。
這裡就直接看這個匿名內部內的方法,繼續跟 connect 方法:io.netty.channel.AbstractChannel#connect(java.net.SocketAddress, java.net.SocketAddress, io.netty.channel.ChannelPromise)
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}通過 pipeline 進行連接:
public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}pipeline 獲取為節點進行調用連接:
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// 查找要處理該請求的處理器節點
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
// 獲取處理器節點的executor
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}這裡首先 findContextOutbound 找到處理器節點,後面說。然後獲取處理器的 EventExecutor。執行 invokeConnect。這裡主要看 invokeConnect 連接處理:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) { // 判斷該處理器節點中對應的處理器是否已經添加
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}直接繼續跟connect 方法,這裡找到的是匿名內部內 HeadContext:io.netty.channel.DefaultChannelPipeline.HeadContext#connect
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
// 連接
unsafe.connect(remoteAddress, localAddress, promise);
}獲取到底層 unsafe 對象進行連接:io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) { // 連接
// 省略+.這裡進行連接,找到 doConnect 連接方法:io.netty.channel.socket.nio.NioSocketChannel#doConnect
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress); // 將localAddress綁定到channel
}
boolean success = false;
try {
// 連接server地址,若本次連接成功,則成功;若不成功,則當前channel的連接就緒
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
// 指定其關注的事件為 連接就緒
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}doBind0(localAddress) 將Client 端指定的埠號綁定到 channel,localAddress 為配置類設置的 Client 埠號。
然後進行連接,這裡首先在執行時直接進行連接,如果第一次連接成功則直接返回成功,如果失敗,註冊 Selector 事件 OP_CONNECT ,即將當前 channel 修改為連接就緒,後續執行到 run 方法時就會再次執行連接,直到連接成功,結束當前連接就緒。
到這就是整個 Client 的啟動。整體看下來可以類比 Server 端,大體流程還是差不多的。學就完了。
Pipeline前面說了那麼學完,大家對於 Netty 執行流程,Server 端和 Client 端的啟動,大家都有了很深的認識,前面也留了很大一塊關於 Netty 服務啟動之後的處理過程,這部分也就是前面沒有說的 Pipeline 的部分知識點。下面我們就重點說說 Pipeline 。
Pipeline 的創建先找到代碼,Pipeline 的創建其實在前面也有看到,這個入口就在Server 端和 Client 端啟動的時創建 Channel 的時候。找到代碼:io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 為channel生成id,由五部分構成
id = newId();
// 生成一個底層操作對象unsafe
unsafe = newUnsafe();
// 創建與這個channel相綁定的channelPipeline
pipeline = newChannelPipeline();
}newChannelPipeline() 這個就是 Pipeline 的創建入口,跟進去:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}可以看到這裡是直接創建了 DefaultChannelPipeline。直接找到構造方法:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 創建尾節點
tail = new TailContext(this);
// 創建頭節點
head = new HeadContext(this);
// 將頭尾節點連接
head.next = tail;
tail.prev = head;
}
創建一個記錄成功的 succeededFuture 。創建一個記錄異常的 voidPromise,在 VoidChannelPromise 方法中創建了一個異常監聽器,觸發重寫的 fireExceptionCaught(cause) 的方法執行。這裡我們去看看兩個 TailContext 和 HeadContext。兩個差不多,先看 new TailContext(this);.這裡 TailContext 是一個內部類:
實現了 ChannelInboundHandler 處理器,是一個 InboundHandler。關於 InboundHandler 和 OutboundHandler 處理器下面單獨說,這裡不展開。只要知道 InboundHandler 的方法都是處理回調的方法。
這裡還是看 TailContext 的構造方法。第一步調用了父類構造,然後修改節點的處理器狀態,先進去看看修改節點處理器狀態的方法:
final boolean setAddComplete() {
for (;;) {
int oldState = handlerState; // 獲取處理器狀態
if (oldState == REMOVE_COMPLETE) { // 處理器狀態為移除狀態
return false;
}
// 通過CAS方式將處理器狀態修改為 添加完畢
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return true;
}
}
}就是通過 CAS 的方式將當前節點的處理器狀態修改為添加完畢。然後我們再回去跟父類構造:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
// 每個處理器節點都會綁定一個executor
this.executor = executor;
// 執行標記
this.executionMask = mask(handlerClass);
ordered = executor == null || executor instanceof OrderedEventExecutor;
}這裡先是綁定 pipeline 和 executor,不過這裡的 executor 傳入是 null,每個處理器節點都會綁定一個 Ececutor ,如果當前處理器的 executor 為空則直接使用 channel 的 Executor 來執行當前處理器節點裡的處理器方法。
這裡我們跟進去看下 mask(handlerClass) 方法:
static int mask(Class<? extends ChannelHandler> clazz) {
// 從緩存中嘗試著獲取標記
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
// 創建一個標記
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}可以看到這個方法是一個靜態的,首先從緩存中獲取標記數據,獲取不到為當前處理器類創建緩存標記:mask0(clazz):
這是充分使用了二進位的開關的性質,這裡方法的作用就是將所有的 InboundHandler 處理器和 OutboundHandler 處理器中定義的方法進行標記,如果其中的方法被實現了,並且方法中沒有 @Skip 註解,則當前方法對應的二進位位的值是 1,噹噹前標記位等於 1 時,則標記當前方法時需要執行的。
其實這裡在 TailContext 和 HeadContext 中所有的標記位都是 1,因為 TailContext 和 HeadContext 分別都實現了 InboundHandler 和 OutboundHandler 接口中的接口。這裡說這個主要因為這裡在我們自定義的處理器時就會使用到。扎到一個自定義的處理器:
我們沒有直接實現自 InboundHandler,而是直接繼承了 ChannelInboundHandlerAdapter ,大家可以進去看看,在 ChannelInboundHandlerAdapter 中每一個實現方法上都有一個 @Skip 註解,而且它默認實現了所有的 InboundHandler 接口的方法,就是為了我們在定義自定義處理器減少一些默認實現的處理,而且為了性能在初始化時將所有的方法打上標記,保證只執行我們自己實現的方法,這就是這個標記的用處。這裡 mask 處理的都是 InboundHandler 和 OutboundHandler 處理器中的接口方法。
好了到這裡 TailContext 節點創建完成,我們接著看 HeadContext 節點:
// 頭節點既是inbound處理器,也是outbound處理器
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
// 。。。。HeadContext 也是內部類,這裡與 TailContext 不同的是,HeadContext 同時實現了 InboundHandler 和 OutboundHandler。並且創建了一個用於底層 IO 處理的 unsafe 對象。到這裡 Pipeline 的初始化創建看完了,可以看到 Pipeline 在 Channel 的創建的時初始化創建的。
下面說一下一個特殊的處理器,前面也是一直看到:ChannelInitializer處理器。
ChannelInitializer 處理器節點先看下類圖:
ChannelInitializer 繼承於 ChannelInboundHandler 接口,是一個抽象類,定義了一個抽象方法:
protected abstract void initChannel(C ch) throws Exception;以 Server 端為例,我們在使用 ChannelInitializer 時都需要實現 initChannel 方法:
在 ServerBootstrap 中 io.netty.bootstrap.ServerBootstrap#init 方法中可以找到代碼:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});負責在 accept 新連接的 Channel 的 pipeline 被添加了一個 ChannelInitializer,由於此時這個 Channel 還沒有被註冊到EventLoop,於是在 addLast 方法的調用鏈中,會給 pipeline 添加一個 PendingHandlerAddedTask ,其目的是在 Channel被註冊到 EventLoop 的時候,觸發一個回調事件然後在 AbstractBootstrap.initAndRegister() 方法中,這個Channel會被註冊到 ParentEventLoopGoup,接著會被註冊到 ParentEventLoopGoup 中的某一個具體的 EventLoop 然後在AbstractChannel.register0() 方法中,之前註冊的 PendingHandlerAddedTask 會被調用,經過一系列調用之後,最終 ChannelInitializer.handleAdded() 方法會被觸發。所以我們進去看 ChannelInitializer.handleAdded() 方法:
// 當前處理器所封裝的處理器節點被添加到pipeline後就會觸發該方法的執行
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) { // 若channel已經完成了註冊
if (initChannel(ctx)) {
// 將當前處理器節點從initMap集合中刪除
removeState(ctx);
}
}
}可以看到 ChannelInitializer 的方法觸發時必須在 Channel 註冊完成之後,然後開始執行 initChannel 方法,在初始化操作完成之後又將當前處理器節點從 initMap 集合中移除。現在先看看 initChannel 方法:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
// 將當前處理器節點添加到initMap集合中
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 調用重寫的initChannel()
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
// 立即將該處理器節點從pipeline上刪除
ChannelPipeline pipeline = ctx.pipeline();
// 查找在pipeline中是否存在當前處理器
if (pipeline.context(this) != null) {
// 將當前處理器節點從pipeline中刪除
pipeline.remove(this);
}
}
return true;
}
return false;
}在 initChannel 方法中,先將當前處理器節點添加到 initMap 中,然後調用抽象方法 initChannel ,由此調用到抽象類的實現方法,也就是在前面 Server 端代碼中我們初始化 childHandler 時添加的實現方法,不過 ChannelInitializer 在 Netty 自己的代碼中也有多處使用,上面說的 Server 端啟動初始化的時候在 init 方法中也就有使用。不過我們自己定義的可以回顧一下:
在 Server 端 ServerBootstrap 中我們使用 ChannelInitializer 給 pipeline 中添加處理器節點。
再回到 initChannel 方法。繼續看再其執行完重新的 initChannel 方法之後必然執行 finally 的代碼,首先獲取當前Pipeline,pipeline.context(this) 從 Pieline 查找但其處理器節點是否存在。存在然後 pipeline.remove(this) 將其從當前 Pipeline 中移除。
感興趣可以看下,pipeline.context 方法和 pipeline.remove 方法,因為 Pipeline 中的處理器節點 時鍊表形式保存的,所以在這兩個方法方法的處理就是鍊表的查找和刪除。
看到這裡可以發現 ChannelInitializer 的主要目的是為程式設計師提供了一個簡單的工具,用於在某個 Channel 註冊到EventLoop 後,對這個 Channel 執行一些初始化操作。ChannelInitializer 雖然會在一開始會被註冊到 Channel 相關的pipeline 裡,但是在初始化完成之後,ChannelInitializer 會將自己從pipeline中移除,不會影響後續的操作。剛剛看到的 pipeline.remove 就是將當前處理器節點從 pipeline 移除的方法,而在執行完 initChannel 方法後,在 handlerAdded 方法中又將添加到 initMap 中的處理器節點也移除了。
Hanndler 添加到 Pipeline上面說了 ChannelInitializer 處理器節點,看到我們在重寫 initChannel 方法時調用的都有 pipeline 新增處理器方法,也就是 addLast 方法,這裡我們詳細看看 addLast 怎麼將處理器添加到 pipeline 的:
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
public final ChannelPipeline addLast(ChannelHandler... handlers) {
// 第一個參數為group,其值默認為null
return addLast(null, handlers);
}前面說過,每個 Pipeline 都有一個 EventLoop 綁定,這裡添加方法默認傳入一個 EventLoopGroup 參數,不過這裡傳了空;繼續往下跟:
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
// 遍歷所有handlers,逐個添加到pipeline
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);// 這裡第二個參數是處理器name
}
return this;
}遍歷傳入參數,將 Handler 循環添加到 Pipeline 中:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 檢測處理器是否被多次添加
checkMultiplicity(handler);
// 將處理器包裝為一個節點 filterName() 獲取到節點的名稱
newCtx = newContext(group, filterName(name, handler), handler);
// 將新的節點添加到pipeline
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 獲取新建節點綁定的eventLoop
EventExecutor executor = newCtx.executor();
// 若該eventLoop綁定的線程與當前線程不是同一個,則執行下面的代碼
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 若該eventLoop綁定的線程與當前線程是同一個線程,
// 則調用重寫的handlerAdded()方法
callHandlerAdded0(newCtx);
return this;
}
校驗當前處理器是否被多次添加,沒有被 @Sharable 註解標註的處理器只可以被添加一次。將處理器包裝一個新的節點 newContext(group, filterName(name, handler), handler)將新的節點添加到 Pipeline 中。addLast0(newCtx)判斷 channel 沒有註冊,處理異常情況。callHandlerCallbackLater(newCtx, true)獲取新節點的 EventLoop ,判斷是否是當前線程,如果不是當前線程執行 callHandlerAddedInEventLoop(newCtx, executor)新節點的 EventLoop 是當前線程執行 callHandlerAdded0(newCtx)這裡我們標記幾個需要往下細跟的代碼:
Tag Handler 1 :newContext
Tag Handler 2 :addLast0
Tag Handler 3 callHandlerCallbackLater/callHandlerAdded0
Tag Handler 1 :newContext這裡處理還是比較複雜,我們具體進去看:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
// childExecutor() 獲取與當前處理器節點相綁定的eventLoop
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}首先這裡我們傳入的 group 是 null ,前面一直沒有傳,這裡調用 childExecutor(group) 直接跟進去:
private EventExecutor childExecutor(EventExecutorGroup group) {
// 若group為null,則與該節點綁定的eventLoop為null
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}其實在我們沒有指定 Group 的時候,這裡代碼是被直接返回了出去,下面執行邏輯就沒有執行,而此時節點的 EventLoop 是在 EventExecutor executor = newCtx.executor(); 中綁定的:
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop(); // 獲取到channel所綁定的eventLoop
} else {
return executor;
}
}在新節點獲取當前節點綁定的 Executor 時如果未綁定,則直接獲取當前 Channle 的 eventLoop 。這裡再回去看 childExecutor 方法,如果要走到下面邏輯則需要到添加 Handler 時傳入一個 EventLoopGroup:
好了,這樣我們的 Group 就不為 null。繼續看 childExecutor 下面的邏輯:
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
// 輪詢獲取 eventLoop
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
獲取 channel 的 option 配置的 SINGLE_EVENTEXECUTOR_PER_GROUP 值。用來配置當前處理器節點是否綁定使用同一個 EventLoop。不為空並且是 False : 則表示一個group中的每一個處理器都會分配一個 eventLoop,調用 EventLoopGroup 的 next 方法,而 next 是輪詢的方式從 Group 中選取 EventLoop。沒有配置使用同一個 eventLoop 則先獲取緩存中保存的 EventLoopGroup 對應的 eventLoop,如果緩存中存在則直接返回,如果緩存中不存在則從 EventLoopGroup 獲取一個 eventLoop,保存到緩存中並返回。這就是創建處理器節點中綁定 EventLoop 的方法。所以到這可以看到處理器節點的 EventLoop 可以指定,不指定則直接使用當前 channel 的 EventLoop 。拿到 EventLoop 之後則直接創建處理器節點 new DefaultChannelHandlerContext
Tag Handler 2 :addLast0.這裡直接找到代碼跟進去:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}這就是一個鍊表操作,先獲取到當前 Pipeline 的 TailContext 節點,因為這裡是我們從 addLast 方法跟進來的,所以這裡是添加在尾節點前,也就是末尾添加。同樣類比其他的 Handler 添加方法 addBefore。這裡獲取到 Tail 節點,將鍊表收尾關聯到新的節點上,完成鍊表的新增即完成新節點添加。
Tag Handler 3 callHandlerCallbackLater/callHandlerAdded0直接跟 callHandlerCallbackLater方法:
private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
newCtx.setAddPending(); // 將當前節點狀態設置成處理中,等待 callHandlerAdded0 執行完成
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
}可以看到這個方法也是調用的 callHandlerAdded0 方法,只不過因為當前節點綁定的 EventLoop 不是當前執行線程,所以需要通過 EventLoop 創建一個新的任務,由任務來完成 callHandlerAdded0 方法的執行,而是當前線程則直接執行 callHandlerAdded0 方法。繼續看 callHandlerAdded0 方法:
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
remove0(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}這裡直接調用當前處理器的 callHandlerAdded 方法,如果異常則將資源移除。進到 callHandlerAdded 方法中:
final void callHandlerAdded() throws Exception {
if (setAddComplete()) { // CAS 方式將處理器狀態修改為 添加完畢 ADD_COMPLETE
handler().handlerAdded(this);
}
}先通過 CAS 修改處理器狀態為添加完成,狀態修改成功則調用處理器的添加方法完成處理器添加。
到這裡處理器的添加完成。添加處理器方法畫個圖再回顧一下:
說到這裡,pipeline 的初始化到 Pipeline 中添加 Handler 。那到這裡我們在整體的理解下 Pipeline:
現在在結合這個圖去理解一下 Pipeline 的創建和添加的過程應該心裏面會有點不一樣認知。當然說到這裡還是沒有說到 Pipeline 的執行順序,下面我們從 InboundHandler 處理器和 OutboundHandler 處理器來說 Pipeline 的執行過程。
Pipeline 中消息的傳遞與處理要說 Pipeline 的中的消息處理,首先還是先說下我們的 Handler,因為 Pipeline 中的消息都是由每一個處理器來完成處理的。
ChannelHandler 是 handler 的頂級接口,我們所有的處理器都實現自該接口,當前接口只定義了兩個方法:hanndlerAdded 和 handlerRemoved,用於當前處理器被從 Pipeline 中添加或移除時調用的方法,還有一個過時的處理異常的方法:exceptionCaught。
再說 ChannelHandler 兩個子類接口:ChannelInboundHandler 和 ChannelOutboundHandler 這兩個接口將處理器方法定義成了兩類,ChannelInboundHandler 定義了所有的用於處理回調的方法,都是需要出動觸發的方法。ChannelOutboundHandler 定義的所有的用於主動調用的處理器方法,需要主動調用。
而 ChannelHandlerAdapter 則是處理器適配器頂級抽象類,用於定義處理器適配器的規範。
ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 處理器適配器中它幫我們默認實現了所有的 handler 的方法,並且每個方法上面都標記了 @Skip 註解,在前面也看到,被標記 @Skip 註解的 Handler 方法會被標記,不會被 Handler 執行,這讓我們再使用適配器之定義處理器時只要重寫我們關心方法即可,重寫的方法不會標記 @Skip 。
還有一個重要的類需要說下:ChannelHandlerContext,上下文對象,每個 Handler 每個方法都需要傳遞上下文對象,再 Pipeline 中處理器的調用就是通過 ChannelHandlerContext 上下文對象完成執行鏈的調用,也可以用來保存上下文數據。
下面就說重點,Pipeline 中 handler 的執行過程。先定義一個處理器:
只實現 channelRead 方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("InboundHandler1 進入 " + msg);
ctx.fireChannelRead(msg);
System.out.println("InboundHandler1 退出 " + msg);
}只實現 channelRead 方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("InboundHandler2 進入 " + msg);
ctx.fireChannelRead(msg);
System.out.println("InboundHandler2 退出 " + msg);
}只實現 channelRead 方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("InboundHandler3 進入 " + msg);
ctx.fireChannelRead(msg);
System.out.println("InboundHandler3 退出 " + msg);
}只實現 write 方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("OutboundHandler1 進入 " + msg);
ctx.write(msg, promise);
System.out.println("OutboundHandler1 退出 " + msg);
}只實現 write 方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("OutboundHandler2 進入 " + msg);
ctx.write(msg, promise);
System.out.println("OutboundHandler2 退出 " + msg);
}只實現 write 方法
@Override
InboundHandler 執行流程
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("OutboundHandler3 進入 " + msg);
ctx.write(msg, promise);
System.out.println("OutboundHandler3 退出 " + msg);
}添加三個 InboundHandler處理器,添加順序為:
InboundHandler1 -> InboundHandler2 -> InboundHandler3
添加完成後 Pipeline 如圖:
此時請求過,Handler 在 Pipeline 的執行流程:
當 client 請求到 Server 並寫入數據時,觸發 Server 端 Head 節點的 channelRead 方法,此時調用鏈開始執行,Head 節點執行 channelRead 方法調用 fireChannelRead 觸發 next 節點的 channelRead 方法執行,不過這裡會先判斷 next 節點的方法標記也就是 mask 是否標記當前 Handler 是否需要執行 channelRead 方法,如果重寫則調用,否則繼續找 next 的 next 節點。舉個慄子:如果 handler2 沒有重寫 channelRead 方法,則調用會變成這樣:
Head -> handler1 -> handler3 -> Tail。
所以 Pipeline 的處理器執行是由 Head 節點開始,由 Head 觸發 Inbound 方法,完成調用鏈執行。
這裡我們可以找到執行調用的代碼,先看 Head 節點的 channelRead 方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg); // 準備調用其下一個節點的channelRead()
}ctx.fireChannelRead(msg) 進去看看如果調用下一個節點:io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}這裡通過 findContextInbound 方法獲取下一個節點,傳入的參數 MASK_CHANNEL_READ 用來判斷當前處理器的 channelRead 方法是否被標記需要執行。進入 findContextInbound 方法:
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}獲取當前節點的 Next 節點,判斷 Next 節點的 channelRead 方法是否需要執行,不需要則繼續獲取下一個,知道獲取到 Tail 節點,因為 Tail 實現了所有的 InboundHandler 處理器方法。
然後執行 invokeChannelRead 方法,觸發 Next 節點的 channelRead 的執行:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}直接找到 invokeChannelRead 方法:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}invokeHandler() 方法判斷當前處理器狀態是否完成,然後調用下一個節點的 channelRead 方法。
Inbound 執行過程的代碼以 channelRead 為例就說到這,然後現在看著代碼再結合上面的圖大家應該很清楚整個執行的流程。
OutboundHandler 執行流程OutboundHandler 與 InboundHandler 不一樣,因為 OutboundHandler 的方法是需要我們自己調用,而不像 InboundHandler 處理器的方法是觸發調用。所以在這種情況下,如果我們只添加 OutboundHandler 處理器的話,當 Client 發起請求也不會觸發 OutboundHandler 的方法執行。因此我們加一個 InboundHandler 方法,用來觸發 OutboundHandler 的執行。
定義 WriteInboundHandler :重寫 channelRead 方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("WriteInboundHandler 進入 " + msg);
ctx.channel().write("我是 Server。");
System.out.println("InboundHWriteInboundHandler andler3 退出 " + msg);
}這裡在 WriteInboundHandler 中我們獲取到 Channel,然後通過 Channel 將消息寫回 Clinet 端。
Hanndler 添加順序:WriteInboundHandler -> OutboundHandler1 -> OutboundHandler2 -> OutboundHandler3
添加完成後 Pipeline 如圖:
而此時 client 請求到 Server 並寫入數據時,一樣是先觸發 Head 節點的 channelRead 方法。執行流程如下:
Outbound 的處理流程與 Inbound 的流程是相反的,在 Inbound 中處理是獲取 next 節點執行,而在 Outbound 節點中獲取的確是 prev 節點的重寫方法執行。所以這裡的執行順序會變成跟添加順序相反的順序執行。
這裡首先請求進入 Server 觸發 Head 節點的 channelRead 方法執行,然後再獲取下一個重寫了 channelRead 方法的 Inbound 處理器,找到 WirteHandler 處理器,在 WirteHandler 處理器中掉用 Channel 的 writeAndFlush 方法,找到Tail 節點實現方法,調用了 Write 方法,然後獲取 Prev 節點中重寫了 write 方法的處理器,找到 Outbound3,然後依次調用 Outbound2 的 write 方法和 Outbound1 的 write 方法,執行完成依次返回。
這裡我們可以找到執行調用的代碼:
先看 Inbound 處理器執行調用的觸發過程,從 ctx.channel().write 的方法進入,找到 io.netty.channel.DefaultChannelPipeline#write(java.lang.Object):
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}可以看到在 Channel 中的 Write 方法直接找到 Tail 調用其重寫的 Write 方法:
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}往下跟 write 方法:
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
// 查找下一個節點(prev)
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
task.cancel();
}
}
}可以看到這裡先通過 findContextOutbound 方法獲取下一個節點:
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}在這裡我們可以看到,Outbound 的獲取順序是獲取的 prev ,並且判斷該 handler 中當前方法是否標記需要執行。
獲取到下一個節點之後在 wirte 方法中通過 next.invokeWrite 方法完成執行器鏈調用。當然如果我們一個 OutboundHandler 都沒有定義的話,findContextOutbound 方法最終會獲取到 Head 節點,然後執行 Head 節點 write 方法,因為我們在前面看到 HeadContext 節點同時實現了 Inboundhandler 和 OutboundHandler。這裡我們去看一下 Head 節點:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}最終在 HeadContext 中完成底層 unsafe 的 write 操作。
最後至此整個 Netty 的源碼應該了解的都差不多了,大家沒事就可以拿出來讀一下鞏固一遍,腦海裡再過一遍,這整個體系就在腦海中建立起來了,基本上穩了。