Netty源碼解析 -- 對象池Recycler實現原理

2021-02-14 binecy

由於在Java中創建一個實例的消耗不小,很多框架為了提高性能都使用對象池,Netty也不例外。
本文主要分析Netty對象池Recycler的實現原理。

源碼分析基於Netty 4.1.52

Recycler的內部類Stack負責管理緩存對象。

Stack關鍵欄位

// Stack所屬主線程,注意這裡使用了WeakReference
WeakReference<Thread> threadRef;    
// 主線程回收的對象
DefaultHandle<?>[] elements;
// elements最大長度
int maxCapacity;
// elements索引
int size;
// 非主線程回收的對象
volatile WeakOrderQueue head;   

Recycler將一個Stack劃分給某個主線程,主線程直接從Stack#elements中存取對象,而非主線程回收對象則存入WeakOrderQueue中。

DefaultHandle,對象的包裝類,在Recycler中緩存的對象都會包裝成DefaultHandle類。

head欄位指向的WeakOrderQueue,用於存放其他線程的對象

WeakOrderQueue主要屬性

// Head#link指向Link鍊表首對象
Head head;  
// 指向Link鍊表尾對象
Link tail;
// 指向WeakOrderQueue鍊表下一對象
WeakOrderQueue next;
// 所屬線程
WeakReference<Thread> owner;

Link中也有一個DefaultHandle<?>[] elements欄位,負責存儲數據。
注意,Link繼承了AtomicInteger,AtomicInteger的值存儲elements的最新索引。

WeakOrderQueue也是屬於某個線程,並且WeakOrderQueue繼承了WeakReference<Thread>,當所屬線程消亡時,對應WeakOrderQueue也可以被垃圾回收。
注意:每個WeakOrderQueue都只屬於一個Stack,並且只屬於一個非主線程。


thread2要存放對象到Stack1中,只能存放在WeakOrderQueue1
thread1要存放對象到Stack2中,只能存放在WeakOrderQueue3回收對象

DefaultHandle#recycle -> Stack#push

void push(DefaultHandle<?> item) {
    Thread currentThread = Thread.currentThread();
    if (threadRef.get() == currentThread) {
        // #1
        pushNow(item);
    } else {
        // #2
        pushLater(item, currentThread);
    }
}

#1 當前線程是主線程,直接將對象加入到Stack#elements中。
#2 當前線程非主線程,需要將對象放到對應的WeakOrderQueue中

private void pushLater(DefaultHandle<?> item, Thread thread) {
    ...
    // #1
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    WeakOrderQueue queue = delayedRecycled.get(this);
    if (queue == null) {
        // #2
        if (delayedRecycled.size() >= maxDelayedQueues) {
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        // #3
        if ((queue = newWeakOrderQueue(thread)) == null) {
            return;
        }
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        // #4
        return;
    }
    // #5
    queue.add(item);
}

#1 DELAYED_RECYCLED是一個FastThreadLocal,可以理解為Netty中的ThreadLocal優化類。它為每個線程維護了一個Map,存儲每個Stack和對應WeakOrderQueue。
所有這裡獲取的delayedRecycled變量是僅用於當前線程的。
而delayedRecycled.get獲取的WeakOrderQueue,是以Thread + Stack作為維度區分的,只能是一個線程操作。
#2 當前WeakOrderQueue數量超出限制,添加WeakOrderQueue.DUMMY作為標記
#3 構造一個WeakOrderQueue,加入到Stack#head指向的WeakOrderQueue鍊表中,並放入DELAYED_RECYCLED。這時是需要一下同步操作的。
#4 遇到WeakOrderQueue.DUMMY標記對象,直接拋棄對象
#5 將緩存對象添加到WeakOrderQueue中。

WeakOrderQueue#add

void add(DefaultHandle<?> handle) {
    handle.lastRecycledId = id;

    // #1
    if (handleRecycleCount < interval) {
        handleRecycleCount++;
        return;
    }
    handleRecycleCount = 0;


    Link tail = this.tail;
    int writeIndex;
    // #2
    if ((writeIndex = tail.get()) == LINK_CAPACITY) {
        Link link = head.newLink();
        if (link == null) {
            return;
        }
        this.tail = tail = tail.next = link;
        writeIndex = tail.get();
    }
    // #3
    tail.elements[writeIndex] = handle;
    handle.stack = null;
    // #4
    tail.lazySet(writeIndex + 1);
}

#1 控制回收頻率,避免WeakOrderQueue增長過快。
每8個對象都會拋棄7個,回收一個
#2 當前Link#elements已全部使用,創建一個新的Link
#3 存入緩存對象
#4 延遲設置Link#elements的最新索引(Link繼承了AtomicInteger),這樣在該stack主線程通過該索引獲取elements緩存對象時,保證elements中元素已經可見。

獲取對象

Recycler#threadLocal中存放了每個線程對應的Stack。
Recycler#get中首先獲取屬於當前線程的Stack,再從該Stack中獲取對象,也就是,每個線程只能從自己的Stack中獲取對象。
Recycler#get -> Stack#pop

DefaultHandle<T> pop() {
    int size = this.size;
    if (size == 0) {
        // #1
        if (!scavenge()) {
            return null;
        }
        size = this.size;
        if (size <= 0) {
            return null;
        }
    }
    // #2
    size --;
    DefaultHandle ret = elements[size];
    elements[size] = null;
    this.size = size;

    ...
    return ret;
}

#1 elements沒有可用對象時,將WeakOrderQueue中的對象遷移到elements
#2 從elements中取出一個緩存對象

scavenge -> scavengeSome -> WeakOrderQueue#transfer

boolean transfer(Stack<?> dst) {
    Link head = this.head.link;
    if (head == null) {
        return false;
    }
    // #1
    if (head.readIndex == LINK_CAPACITY) {
        if (head.next == null) {
            return false;
        }
        head = head.next;
        this.head.relink(head);
    }
    // #2
    final int srcStart = head.readIndex;
    int srcEnd = head.get();
    final int srcSize = srcEnd - srcStart;
    if (srcSize == 0) {
        return false;
    }
    // #3
    final int dstSize = dst.size;
    final int expectedCapacity = dstSize + srcSize;

    if (expectedCapacity > dst.elements.length) {
        final int actualCapacity = dst.increaseCapacity(expectedCapacity);
        srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
    }

    if (srcStart != srcEnd) {
        final DefaultHandle[] srcElems = head.elements;
        final DefaultHandle[] dstElems = dst.elements;
        int newDstSize = dstSize;
        // #4
        for (int i = srcStart; i < srcEnd; i++) {
            DefaultHandle<?> element = srcElems[i];
            ...
            srcElems[i] = null;
            // #5
            if (dst.dropHandle(element)) {
                continue;
            }
            element.stack = dst;
            dstElems[newDstSize ++] = element;
        }
        // #6
        if (srcEnd == LINK_CAPACITY && head.next != null) {
            this.head.relink(head.next);
        }

        head.readIndex = srcEnd;
        // #7
        if (dst.size == newDstSize) {
            return false;
        }
        dst.size = newDstSize;
        return true;
    } else {
        // The destination stack is full already.
        return false;
    }
}

就是把WeakOrderQueue中的對象遷移到Stack中。
#1 head.readIndex 標誌現在已遷移對象下標
head.readIndex == LINK_CAPACITY,表示當前Link已全部移動,查找下一個Link
#2 計算待遷移對象數量
注意,Link繼承了AtomicInteger
#3 計算Stack#elements數組長度,不夠則擴容
#4 遍歷待遷移的對象
#5 控制回收頻率
#6 當前Link對象已全部移動,修改WeakOrderQueue#head的link屬性,指向下一Link,這樣前面的Link就可以被垃圾回收了。
#7 dst.size == newDstSize 表示並沒有對象移動,返回false
否則更新dst.size

其實對象池的實現難點在於線程安全。
Recycler中將主線程和非主線程對象回收劃分到不同的存儲空間中(stack#elements和WeakOrderQueue.Link#elements),並且對於WeakOrderQueue.Link#elements,存取操作劃分到兩端進行(非主線程從尾端存入,主線程從首部開始讀取),
從而減少同步操作,並保證線程安全。

相關焦點

  • Netty 實現原理淺析
    本文將主要分析Netty實現方面的東西,由於精力有限,本人並沒有對其源碼做了極細緻的研 究。如果下面的內容有錯誤或不嚴謹的地方,也請指正和諒解。對於Netty使用者來說,Netty提供了幾個典型的example,並有詳盡的API doc和guide doc,本文的一些內容及圖示也來自於Netty的文檔,特此致謝。
  • Java8線程池ThreadPoolExecutor底層原理及其源碼解析
    小侃一下日常開發中, 或許不會直接new線程或線程池, 但這些線程相關的基礎或思想是非常重要的, 參考林迪效應;就算沒有直接用到, 可能間接也用到了類似的思想或原理, 例如tomcat, jetty, 資料庫連接池, MQ;本文不會對線程的基礎知識進行介紹
  • Netty 實戰:如何實現文件伺服器?
    :  import java.io.RandomAccessFile;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.stream.ChunkedFile;public class FileServerHandler
  • 通過源碼解析,深入Java 線程池原理
    從池化技術到底層實現,一篇文章帶你貫通線程池技術。1、池化技術簡介在系統開發過程中,我們經常會用到池化技術來減少系統消耗,提升系統性能。在編程領域,比較典型的池化技術有:線程池、連接池、內存池、對象池等。對象池通過復用對象來減少創建對象、垃圾回收的開銷;連接池(資料庫連接池、Redis連接池和HTTP連接池等)通過復用TCP連接來減少創建和釋放連接的時間。線程池通過復用線程提升性能。
  • RPC 框架,底層到底什麼原理?
    RPC框架的原理解析最近自己寫了一個簡單的RPC框架KRPC,本文原理分析結合中代碼,均為該框架源碼,RPC與RMI的區別看這篇文章《Java RMI 和 RPC 的區別》。關於netty網上學習的資料很多,這裡也只是宏觀的講解RPC原理,就不展開。
  • 華為應用市場總架構師超神之作:用19個案例透解Netty
    前言讀者評價:實際上,本書更像是以 netty 作為參考,闡述了構建一個高性能通信框架的主要考量(當然,也講了 netty 的一些坑);第一,是線程模型,兩方面:Netty 採用了傳統的 reactor 模式,利用 boss event
  • Netty責任鏈Pipeline詳解
    一起說說Netty責任鏈,責任鏈這塊是netty運行機制的核心,一起來完整的了解下netty的責任鏈。 實現責任鏈模式 處理器抽象類,具體的處理器實現類,保存處理器信息,處理執行。
  • 手撕React學算法:React.Children-遞歸和對象池
    React.Children裡面的幾個api都是數組處理相關的,這裡就分析一下map實現原理,看看它與Array.map有什麼不同。
  • 阿里P9都窺視已久的「Java並發實現原理:JDK源碼剖析」
    本書基於JDK 7和JDK 8,對整個Concurrent包進行全面的源碼剖析。JDK 8中大部分並發功能的實現和JDK 7一樣,但新增了一些額外特性。例如CompletableFuture、ConcurrentHashMap的新實現、StampedLock、LongAdder等。
  • Jetpack源碼解析--ViewModel基本使用及源碼解析
    1.背景Jetpack源碼解析系列文章:1. Android_Jetpack組件---Naviagtion源碼解析2. Jetpack源碼解析—Navigation為什麼切換Fragment會重繪?3. Jetpack源碼解析---用Lifecycles管理生命周期4.
  • 敖C肝了一個月的Netty知識點
    高能預警,本文是我一個月前就開始寫的,所以內容會非常長,當然也非常硬核,dubbo源碼系列結束之後我就想著寫一下netty系列的,但是netty的源碼概念又非常多,所以才寫到了現在。對應的實現再其父類之中:io.netty.util.concurrent.AbstractEventExecutorGroup#execute
  • Spring-Task源碼解析
    調度總結異步執行配置原理開頭scheduled-tasks其解析的源碼較長,在此不再貼出,解析之後形成的BeanDefinition結構如下圖:taskScheduler屬性即指向task:scheduler標籤,如果沒有配置,此屬性不存在。
  • 強大的反射功能詳解與應用源碼解析
    那如果我們要實現一個對象比較功能,比較兩個User對象的屬性有什麼不同。則可以通過下面的代碼實現。(以上均參考自《通用源碼閱讀指導書——MyBatis源碼詳解》)以上例子的來源於《通用源碼閱讀指導書——MyBatis源碼詳解》中的擴展閱讀部分提及的開源項目ObjectLogger( https://github.com/yeecode/ObjectLogger )。我們這裡只是ObjectLogger的一個非常簡化的實現。
  • netty writeAndFlush源碼分析
    writeAndFlush,顧名思義,就是寫入(發送緩衝區)並且刷新,熟悉netty編碼的同學對這個方法一定不會感到陌生, 這方法既能將數據寫到發送緩存中
  • 「源碼解析 」這一次徹底弄懂react-router路由原理
    筆者個人感覺學習react-router,有助於我們學習單頁面應用(spa)路由跳轉原理,讓我們理解從history.push,到組件頁面切換的全套流程,使我們在面試的時候不再為路由相關的問題發怵,廢話不說,讓我們開啟深入react-router源碼之旅吧。一 正確理解react-router1 理解單頁面應用什麼是單頁面應用?
  • Netty堆外內存洩漏排查,這一篇全講清楚了
    下可以用top命令查看),但Java堆內存佔用並不高(jmap命令查看),常見的使用堆外內存除了Netty,還有基於java.nio下相關接口申請堆外內存,JNI調用等,下面側重介紹Netty堆外內存洩漏問題排查堆外內存釋放底層實現1 java.nio堆外內存釋放Netty堆外內存是基於原生java.nio的DirectByteBuffer對象的基礎上實現的
  • Netty的EventLoop和線程模型
    基本的線程池化模式:從池的空閒線程列表中選擇一個 Thread,並被指派運行一個已提交的任務(Runnable 實現)任務完成時,將該 Thread 返回給該列表,使其被重用3.2 Netty#EventLoop 調度任務JDK 的ScheduledExecutorService 實現局限性作為線程池管理的部分功能,將有額外線程創建:若有大量任務被密集調度,這將成為瓶頸。
  • Netty 內存模型分析(一)ByteBuf總覽
    本文開始,將主要圍繞netty相關知識展開,力求從宏觀上把握整個內存結構。
  • 深度解析 new 原理及模擬實現
    如果構造函數沒有顯式返回一個對象,則使用步驟1創建的對象。模擬實現第一步new 是關鍵詞,不可以直接覆蓋。這裡使用 create 來模擬實現 new 的效果。new 返回一個新對象,通過 obj.不熟悉 apply / call 的點擊查看:【進階3-3期】深度解析 call 和 apply 原理、使用場景及實現不熟悉繼承的點擊查看:JavaScript常用八種繼承方案模擬實現第二步上面的代碼已經實現了 80%,現在繼續優化。
  • 社交媒體登錄Spring Social的源碼解析
    本文就通過對Spring Social源碼進行一下解析,從而在我們後續開發第三方媒體平臺的登錄認證功能時,能更加的清晰。一、Spring Social結構化角度解析源碼Spring Social是一個幫助我們連接社交媒體平臺,方便在我們自己的應用上開發第三方登錄認證等功能的Spring 類庫。其中比較核心的類和接口,如下圖所示,我們來一一解析。