由於在Java中創建一個實例的消耗不小,很多框架為了提高性能都使用對象池,Netty也不例外。
本文主要分析Netty對象池Recycler的實現原理。
源碼分析基於Netty 4.1.52
Recycler的內部類Stack負責管理緩存對象。
Stack關鍵欄位
// Stack所屬主線程,注意這裡使用了WeakReference
WeakReference<Thread> threadRef;
// 主線程回收的對象
DefaultHandle<?>[] elements;
// elements最大長度
int maxCapacity;
// elements索引
int size;
// 非主線程回收的對象
volatile WeakOrderQueue head;
Recycler將一個Stack劃分給某個主線程,主線程直接從Stack#elements中存取對象,而非主線程回收對象則存入WeakOrderQueue中。
DefaultHandle,對象的包裝類,在Recycler中緩存的對象都會包裝成DefaultHandle類。
head欄位指向的WeakOrderQueue,用於存放其他線程的對象
WeakOrderQueue主要屬性
// Head#link指向Link鍊表首對象
Head head;
// 指向Link鍊表尾對象
Link tail;
// 指向WeakOrderQueue鍊表下一對象
WeakOrderQueue next;
// 所屬線程
WeakReference<Thread> owner;
Link中也有一個DefaultHandle<?>[] elements欄位,負責存儲數據。
注意,Link繼承了AtomicInteger,AtomicInteger的值存儲elements的最新索引。
WeakOrderQueue也是屬於某個線程,並且WeakOrderQueue繼承了WeakReference<Thread>,當所屬線程消亡時,對應WeakOrderQueue也可以被垃圾回收。
注意:每個WeakOrderQueue都只屬於一個Stack,並且只屬於一個非主線程。
DefaultHandle#recycle -> Stack#push
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
// #1
pushNow(item);
} else {
// #2
pushLater(item, currentThread);
}
}
#1 當前線程是主線程,直接將對象加入到Stack#elements中。
#2 當前線程非主線程,需要將對象放到對應的WeakOrderQueue中
private void pushLater(DefaultHandle<?> item, Thread thread) {
...
// #1
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
// #2
if (delayedRecycled.size() >= maxDelayedQueues) {
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// #3
if ((queue = newWeakOrderQueue(thread)) == null) {
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// #4
return;
}
// #5
queue.add(item);
}
#1 DELAYED_RECYCLED是一個FastThreadLocal,可以理解為Netty中的ThreadLocal優化類。它為每個線程維護了一個Map,存儲每個Stack和對應WeakOrderQueue。
所有這裡獲取的delayedRecycled變量是僅用於當前線程的。
而delayedRecycled.get獲取的WeakOrderQueue,是以Thread + Stack作為維度區分的,只能是一個線程操作。
#2 當前WeakOrderQueue數量超出限制,添加WeakOrderQueue.DUMMY作為標記
#3 構造一個WeakOrderQueue,加入到Stack#head指向的WeakOrderQueue鍊表中,並放入DELAYED_RECYCLED。這時是需要一下同步操作的。
#4 遇到WeakOrderQueue.DUMMY標記對象,直接拋棄對象
#5 將緩存對象添加到WeakOrderQueue中。
WeakOrderQueue#add
void add(DefaultHandle<?> handle) {
handle.lastRecycledId = id;
// #1
if (handleRecycleCount < interval) {
handleRecycleCount++;
return;
}
handleRecycleCount = 0;
Link tail = this.tail;
int writeIndex;
// #2
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
Link link = head.newLink();
if (link == null) {
return;
}
this.tail = tail = tail.next = link;
writeIndex = tail.get();
}
// #3
tail.elements[writeIndex] = handle;
handle.stack = null;
// #4
tail.lazySet(writeIndex + 1);
}
#1 控制回收頻率,避免WeakOrderQueue增長過快。
每8個對象都會拋棄7個,回收一個
#2 當前Link#elements已全部使用,創建一個新的Link
#3 存入緩存對象
#4 延遲設置Link#elements的最新索引(Link繼承了AtomicInteger),這樣在該stack主線程通過該索引獲取elements緩存對象時,保證elements中元素已經可見。
Recycler#threadLocal中存放了每個線程對應的Stack。
Recycler#get中首先獲取屬於當前線程的Stack,再從該Stack中獲取對象,也就是,每個線程只能從自己的Stack中獲取對象。
Recycler#get -> Stack#pop
DefaultHandle<T> pop() {
int size = this.size;
if (size == 0) {
// #1
if (!scavenge()) {
return null;
}
size = this.size;
if (size <= 0) {
return null;
}
}
// #2
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
this.size = size;
...
return ret;
}
#1 elements沒有可用對象時,將WeakOrderQueue中的對象遷移到elements
#2 從elements中取出一個緩存對象
scavenge -> scavengeSome -> WeakOrderQueue#transfer
boolean transfer(Stack<?> dst) {
Link head = this.head.link;
if (head == null) {
return false;
}
// #1
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
head = head.next;
this.head.relink(head);
}
// #2
final int srcStart = head.readIndex;
int srcEnd = head.get();
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
}
// #3
final int dstSize = dst.size;
final int expectedCapacity = dstSize + srcSize;
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
final DefaultHandle[] srcElems = head.elements;
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
// #4
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle<?> element = srcElems[i];
...
srcElems[i] = null;
// #5
if (dst.dropHandle(element)) {
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
}
// #6
if (srcEnd == LINK_CAPACITY && head.next != null) {
this.head.relink(head.next);
}
head.readIndex = srcEnd;
// #7
if (dst.size == newDstSize) {
return false;
}
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
就是把WeakOrderQueue中的對象遷移到Stack中。
#1 head.readIndex 標誌現在已遷移對象下標
head.readIndex == LINK_CAPACITY,表示當前Link已全部移動,查找下一個Link
#2 計算待遷移對象數量
注意,Link繼承了AtomicInteger
#3 計算Stack#elements數組長度,不夠則擴容
#4 遍歷待遷移的對象
#5 控制回收頻率
#6 當前Link對象已全部移動,修改WeakOrderQueue#head的link屬性,指向下一Link,這樣前面的Link就可以被垃圾回收了。
#7 dst.size == newDstSize 表示並沒有對象移動,返回false
否則更新dst.size
其實對象池的實現難點在於線程安全。
Recycler中將主線程和非主線程對象回收劃分到不同的存儲空間中(stack#elements和WeakOrderQueue.Link#elements),並且對於WeakOrderQueue.Link#elements,存取操作劃分到兩端進行(非主線程從尾端存入,主線程從首部開始讀取),
從而減少同步操作,並保證線程安全。