Recycler 是 Netty 中基於 ThreadLocal 的輕量化的對象池實現。既然是基於 ThreadLocal,那麼就可以將其理解為當前線程在通過對象池 Recycler 得到一個對象之後,在回收對象的時候,不需要將其銷毀,而是放回到該線程的對象池中即可,在該線程下一次用到該對象的時候,不需要重新申請空間創建,而是直接重新從對象池中獲取。
Recycler 對象池在 netty 中最重要的使用,就在於 netty 的池化 ByteBuf 的場景下。首先,何為池化?以 PooledDirectByteBuf 舉例,每一個 PooledDirectByteBuf 在應用線程中使用完畢之後,並不會被釋放,而是等待被重新利用,類比線程池每個線程在執行完畢之後不會被立即釋放,而是等待下一次執行的時候被重新利用。所謂的對象池也是如此,池化減少了 ByteBuf 創建和銷毀的開銷,也是 netty 高性能表現的基石之一。
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() { @Override protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) { return new PooledDirectByteBuf(handle, 0); }};static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf;}
PooledDirectByteBuf 在其類加載的過程中,初始化了一個靜態的 RECYCLER 成員,通過重寫其 newObject()方法達到使 Recycler 可以初始化一個 PooledDirectByteBuf。而在接下來的使用中,只需要通過靜態方法 newInstance()就可以從 RECYCLER 對象池的 get()方法獲取一個新的 PooledDirectByteBuf 對象返回,而重寫的方法 newObject()中的入參 Handler 則提供了 recycle()方法給出了對象重新放入池中回收的能力,這裡的具體實現在下文展開。因此,newInstance()方法和 recycle()方法就提供了對象池出池和入池的能力,也通過此,PooledDirectByteBuf 達到了池化的目標。
Recycler 的實現原理很抽象,可以先直接閱讀文末的例子再閱讀這部分內容。
Recycler 中,最核心的是兩個通過 ThreadLocal 作為本地線程私有的兩個成員,而其實現原理只需要圍繞這兩個成員分析,就可以對對象池的設計有直接的理解和認識。
•第一個成員是在 Recycler 被定義的 Stack 成員對象。
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); }};
顧名思義,這個 Stack 主體是一個堆棧,但是其還維護著一個鍊表,而鍊表中的每一個節點都是一個隊列。
private DefaultHandle<?>[] elements;private WeakOrderQueue cursor, prev;
上述的 elements 數組便是存放當前線程被回收的對象,噹噹前線程從該線程的 Recycler 對象池嘗試獲取新的對象的時候,首先就會從當前 Stack 的這個數組中嘗試獲取已經在先前被創建並且在當前線程被回收的對象,因為當對象池的對象在當前線程被調用 recycle()的時候,是會直接放到 elements 數組中等待下一次的利用。那麼問題來了,如果從該線程中被申請的這個對象是在另外一個線程中被調用 recycle()方法回收呢?那麼該對象就會處於鍊表中的隊列中,當堆棧數組中的對象不存在的時候,將會嘗試把鍊表隊列中的對象轉移到數組中供當前線程獲取。那麼其他線程是如何把被回收的對象放到這些鍊表中的隊列的呢?接下來就是另一個成員的使命了。
•第二個成員是在 Recycler 中也是通過 ThreadLocal 所實現的一個線程本地變量,DELAYED_RECYCLED ,是一個 Stack 和隊列的映射 Map。
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() { @Override protected Map<Stack<?>, WeakOrderQueue> initialValue() { return new WeakHashMap<Stack<?>, WeakOrderQueue>(); }};
第二個成員 DELAYED_RECYCLED 可以通過上文的 Stack 獲取一個隊列。
在前一個成員的解釋中提到,當別的線程調用另一個線程的對象池的 recycle()方法進行回收的時候,並不會直接落到持有對象池的線程的 Stack 數組當中,當然原因也很簡單,在並發情況下這樣的操作顯然是線程不安全的,而加鎖也會帶來性能的開銷。因此,netty 在 Recycler 對象池中通過更巧妙的方式解決這一問題。
在前面提到,除了數組,Stack 還持有了一系列隊列的組成的鍊表,這些鍊表中的每一個節點都是一個隊列,這些隊列又存放著別的線程所回收到當前線程對象池的對象。那麼,這些隊列就是各個線程針對持有對象池的專屬回收隊列,說起來很拗口,看下面的代碼。
private void pushLater(DefaultHandle<?> item, Thread thread) { // we don't want to have a ref to the queue as the value in our weak map // so we null it out; to ensure there are no races with restoring it later // we impose a memory ordering here (no-op on x86) Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(this); if (queue == null) { if (delayedRecycled.size() >= maxDelayedQueues) { // Add a dummy queue so we know we should drop the object delayedRecycled.put(this, WeakOrderQueue.DUMMY); return; } // Check if we already reached the maximum number of delayed queues and if we can allocate at all. if ((queue = WeakOrderQueue.allocate(this, thread)) == null) { // drop object return; } delayedRecycled.put(this, queue); } else if (queue == WeakOrderQueue.DUMMY) { // drop object return; } queue.add(item);}private WeakOrderQueue(Stack<?> stack, Thread thread) { head = tail = new Link(); owner = new WeakReference<Thread>(thread); synchronized (stack) { next = stack.head; stack.head = this; } // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the // Stack itself GCed. availableSharedCapacity = stack.availableSharedCapacity;}
pushLater()方法發生在當一個對象被回收的時候,噹噹前線程不是這個對象所申請的時候的線程時,將會通過該對象的 Stack 直接去通過 DELAYED_RECYCLED 映射到一條隊列上,如果沒有則創建並建立映射,再把該對象放入到該隊列中,以上操作結束後該次回收即宣告結束
private WeakOrderQueue(Stack<?> stack, Thread thread) { head = tail = new Link(); owner = new WeakReference<Thread>(thread); synchronized (stack) { next = stack.head; stack.head = this; } // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the // Stack itself GCed. availableSharedCapacity = stack.availableSharedCapacity;}
如果在操作中,隊列是被創建的,會把該隊列放置在 Stack 中的鍊表裡的頭結點,保證創建該對象的線程在數組空了之後能夠通過鍊表訪問到該隊列並將該隊列中的回收對象重新放到數組中等待被下次重新利用,隊列交給 A 線程的鍊表是唯一的阻塞操作。在這裡通過一次阻塞操作,避免後續都不存在資源的競爭問題。
A 線程申請,A 線程回收的場景。
•顯然,當對象的申請與回收是在一個線程中時,直接把對象放入到 A 線程的對象池中即可,不存在資源的競爭,簡單輕鬆。
A 線程申請,B 線程回收的場景。
•首先,當 A 線程通過其對象池申請了一個對象後,在 B 線程調用 recycle()方法回收該對象。顯然,該對象是應該回收到 A 線程私有的對象池當中的,不然,該對象池也失去了其意義。
•那麼 B 線程中,並不會直接將該對象放入到 A 線程的對象池中,如果這樣操作在多線程場景下存在資源的競爭,只有增加性能的開銷,才能保證並發情況下的線程安全,顯然不是 netty 想要看到的。
•那麼 B 線程會專門申請一個針對 A 線程回收的專屬隊列,在首次創建的時候會將該隊列放入到 A 線程對象池的鍊表首節點(這裡是唯一存在的資源競爭場景,需要加鎖),並將被回收的對象放入到該專屬隊列中,宣告回收結束。
•在 A 線程的對象池數組耗盡之後,將會嘗試把各個別的線程針對 A 線程的專屬隊列裡的對象重新放入到對象池數組中,以便下次繼續使用。