前言
學習ConcurrentHashMap之前,我們還是先來複習一下CAS、volatile、樂觀鎖與悲觀鎖的知識吧(本文使用JDK1.8+)。
一、CAS算法介紹
CAS(Compare and swap)比較與交換, 是一種有名的無鎖算法,CAS的3個操作數內存值V舊的預期值A要修改的新值B若且唯若內存值V和預期值A相同時,就將內存值V修改為新值B,否則不做任何操作
當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值(A和內存值V相同時,將內存值V修改為B),而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試(否則什麼都不做)缺點:CAS操作是先取出內存值,然後才將內存值與期望值比較的,這樣可能會出現這樣一個問題,假如線程1獲取了內存值V,然後線程2也從內存中取出V,並且線程2進行了一些修改操作,將內存值修改為B,然後two又將內存值修改為V,當前線程的CAS操作無法分辨內存值V是否發生過變化,所以儘管CAS成功,但可能存在潛在的問題。解決使用AtomicStampedReference和AtomicMarkableReference實現標記的功能。小結:先比較是否相等,如果相等則替換(CAS算法)
CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。
CAS無鎖算法的C實現
int compare_and_swap (int* reg, int oldval, int newval) {ATOMIC(); int old_reg_val = *reg; if (old_reg_val == oldval) *reg = newval; END_ATOMIC(); return old_reg_val;}//CAS比較與交換的偽代碼可以表示為do{ 備份舊數據; 基於舊數據構造新數據; }while(!CAS( 內存地址,備份的舊數據,新數據 )) //假設有這樣一種場景,一個CPU去更新一個值,但如果想改得值不再是原來的值,操作就失敗,因為很明顯,有其它操作先改變了這個值。 //就是指當兩者進行比較時,如果相等,則證明共享數據沒有被修改,替換成新值,然後繼續往下運行;如果不相等,說明共享數據已經被修改,放棄已經所做的操作,然後重新執行剛才的操作。容易看出 CAS 操作是基於共享數據不會被修改的假設,採用了類似於資料庫的 commit-retry 的模式。當同步衝突出現的機會很少時,這種假設能帶來較大的性能提升。
二、volatile介紹
volatile變量是一個更輕量級的同步機制,因為在使用這些變量時不會發生上下文切換和線程調度等操作,但是volatile變量也存在一些局限,比如不能用於構建原子的複合操作,因此當一個變量依賴舊值時就不能使用volatile變量,volatile內部已經做了synchronized。
volatile只能保證變量對各個線程的可見性,但不能保證原子性
保證該變量對所有線程的可見性在多線程的環境下:當這個變量修改時,所有的線程都會知道該變量被修改了,也就是所謂的「可見性」不保證原子性修改變量(賦值)實質上是在JVM中分了好幾步,而在這幾步內(從裝載變量到修改),它是不安全的。三、樂觀鎖與悲觀鎖
獨佔鎖是一種悲觀鎖,synchronized就是一種獨佔鎖,它假設最壞的情況,並且只有在確保其它線程不會造成幹擾的情況下執行,會導致其它所有需要鎖的線程掛起,等待持有鎖的線程釋放鎖。而另一個更加有效的鎖就是樂觀鎖。所謂樂觀鎖就是,每次不加鎖而是假設沒有衝突而去完成某項操作,如果因為衝突失敗就重試,直到成功為止。
四、Java中的原子操作( atomic operations)
原子操作指的是在一步之內就完成而且不能被中斷,原子操作在多線程環境中是線程安全的,無需考慮同步的問題。
學習ConcurrentHashMap之前,先了解JDK1.7與JDK1.8的區別,本文使用JDK1.8+
五、JDK1.7和JDK1.8區別
1、JDK1.7
JDK1.7中ConcurrentHashMap是通過「鎖分段」來實現線程安全的。啥?什麼是「鎖分段」?其實就是ConcurrentHashMap將哈希表分成許多片段(segments),每個片段(table)都類似於HashMap,它有一個HashEntry數組,數組的每項又是HashEntry組成的鍊表。每個片段都是Segment類型的。
核心存儲結構是segment,它是繼承ReentrantLock的源碼如下:
static class Segment<K,V> extends ReentrantLock implements Serializable {private static final long serialVersionUID = 2249069246763182397L; final float loadFactor; Segment(float lf) { this.loadFactor = lf; } }
所以Segment本質上是一個可重入的互斥鎖,這樣每個片段都有了一個鎖,這就是「鎖分段」。
2、JDK1.8
在JDK1.8中,ConcurrentHashMap沒有用「鎖分段」來實現線程安全,而是使用CAS算法和synchronized來確保線程安全,但是底層segment並沒有被刪除的。
六、ConcurrentHashMap的重要內部類及構造方法
1、重要的內部類
//基本的內部類static class Node<K,V> implements Map.Entry<K,V> {......}//紅黑樹節點,供TreeBins使用static final class TreeNode<K,V> extends Node<K,V> {......}//紅黑樹結構,該類並不包裝key-value鍵值對,而是TreeNode的列表和它們的根節點,這個類含有讀寫鎖static final class TreeBin<K,V> extends Node<K,V> {......}//是包含一個nextTable指針,和find方法 static final class ForwardingNode<K,V> extends Node<K,V> {......}
2、構造方法
//第一個構造器//使用默認的初始表大小(16)創建一個新的容量//構造一個具有默認初始容量(16)、默認負載因子(0.75)和默認並發級別(16)的空ConcurrentHashMappublic ConcurrentHashMap() {}//第二個構造器//該構造函數是在構造一個具有指定容量、默認負載因子(0.75)和默認並發級別(16)的空ConcurrentHashMappublic ConcurrentHashMap(int initialCapacity) {this(initialCapacity, LOAD_FACTOR, 1);}//第三個構造器//構造一個與指定 Map 具有相同映射的 ConcurrentHashMap,其初始容量不小於 16 (具體依賴於指定Map的大小),負載因子是 0.75,並發級別是 16, 是 Java Collection Framework 規範推薦提供的public ConcurrentHashMap(Map<? extends K, ? extends V> m) {。。。。。。}//第四個構造器//該構造函數是在構造一個具有指定容量、指定負載因子和默認並發級別(16)的空ConcurrentHashMappublic ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1);//第五個構造器//該構造函數是在構造一個具有指定容量、指定負載因子和指定段數目/並發級別(若不是2的冪次方,則會調整為2的冪次方)的空ConcurrentHashMappublic ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {。。。。。。}
七、ConcurrentHashMap核心方法
1、put方法
其實只要讀懂之前HashMap的方法就輕鬆的,基本類似的,只不過ConcurrentHashMap多加幾個方法而已。
public V put(K key, V value) {return putVal(key, value, false);}//實現方法final V putVal(K key, V value, boolean onlyIfAbsent) { //鍵值對都不允許為空,如果為空就NullPointerException() if (key == null || value == null) throw new NullPointerException(); //計算哈希值 //spread=(h ^ (h >>> 16)) & HASH_BITS;保證了hash >= 0 int hash = spread(key.hashCode()); int binCount = 0; //死循環,只有插入成功才能跳出循環 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //如果table沒有初始化,初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); //根據哈希值計算在table中的位置 //先1,Node<K,V>(hash, key, value, null) //後2,casTabAt else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果這個位置沒有值,直接將鍵值對放進去,不需要加鎖 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) //退出,如果直接執行addCount(1L, binCount); //說明節點不是被替換的,是被插入的,則將map的元素數量加1 break; // no lock when adding to empty bin } //如果要插入的位置,MOVED為-1,則頭節點為forwordingNode節點,表明該位置正在進行擴容 //如果tab[i]不為空並且hash值為MOVED, //說明該鍊表正在進行transfer操作,返回擴容完成後的table else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //這一步,說明要插入的位置有值,需要加鎖 synchronized (f) { //確定f是tab中的頭節點 if (tabAt(tab, i) == f) { //如果頭結點的哈希值大於等於0,說明要插入的節點在鍊表中 if (fh >= 0) { binCount = 1; //遍歷鍊表中的所有節點 for (Node<K,V> e = f;; ++binCount) { K ek; //如果某一節點的key哈希值和key與參數相等,替換節點的value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //遍歷到了最後一個節點,還沒找到key對應的節點,根據參數新建節點,插入鍊表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null);break;}}} //如果首節點為TreeBin類型,說明為紅黑樹結構,執行putTreeVal操作 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //判斷鍊表是否需要轉變為樹 if (binCount != 0) { //判斷節點數量是否>=8,如果大於就需要把鍊表轉化成紅黑樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) //若key存在則返回舊的value值 return oldVal; // 若key不存在,則說明構造了一個新節點,跳出循環,調用addCount break;}}} //能執行到這一步,說明節點不是被替換的,是被插入的 //所以將map的元素數量增加1,有可能觸發transfer操作(擴容) addCount(1L, binCount); return null;}//addCount方法,table結構使用了synchronized,有興趣可以進入源碼查看,這裡就不一一羅列了private final void addCount(long x, int check) {。。。。。。}
ConcurrentHashMap的put方法實現步驟;
1、首先,判斷key和value是否為null,其中一個為null,則拋出NullPointerException()。
註:ConcurrentHashMap的key和Value都不能為null
2、計算哈希值:spread(key.hashCode());
3、根據哈希值計算放在table中的位置
4、通過哈希值執行插入或替換操作
如果這個位置沒有值,直接將鍵值對放進去,不需要加鎖如果要插入的位置是一個forwordingNode節點,表示正在擴容,那麼當前線程幫助擴容3.3 加鎖。以下操作都需要加鎖如果要插入的節點在鍊表中,遍歷鍊表中的所有節點,如果某一節點的key哈希值和key與參數相等,替換節點的value,記錄被替換的值;如果遍歷到了最後一個節點,還沒找到key對應的節點,根據參數新建節點,插入鍊表尾部如果要插入的節點在樹中,則按照樹的方式插入或替換節點。如果是替換操作,記錄被替換的值5、判斷如果節點數量是大於8,就將鍊表轉化成紅黑樹(binCount >= TREEIFY_THRESHOLD)
6、如果操作3中執行的是替換操作,返回被替換的value,然後程序結束
7、如果能執行到這一步,說明節點不是被替換的,是被插入的,所以要將map的元素數量加1
為什麼ConcurrentHashMap效率高於HashTable?
從上面源碼已經了解了ConcurrentHashMap,它通過在鍊表上加鎖來實現同步的。則看出ConcurrentHashMap其實就多增加了鎖的個數,效率效率就提高;而HashTable是通過在每個方法上加Synchronized來實現同步的,這樣使得效率略低於ConcurrentHashMap的原因。
八、為什麼在高並發的情況下高效?
ConcurrentHashMap的原始碼會涉及到散列算法和鍊表數據結構,所以,讀者需要對散列算法和基於鍊表的數據結構有所了解,特別是對HashMap的進一步了解和回顧。
ConcurrentHashMap中,無論是讀操作還是寫操作都能保證很高的性能:在進行讀操作時(幾乎)不需要加鎖,而在寫操作時通過鎖分段技術只對所操作的段加鎖而不影響客戶端對其它段的訪問。特別地,在理想狀態下,ConcurrentHashMap 可以支持 16 個線程執行並發寫操作(如果並發級別設為16),及任意數量線程的讀操作。
ConcurrentHashMap本質上是一個Segment數組,而一個Segment實例又包含若干個桶,每個桶中都包含一條由若干個 HashEntry 對象連結起來的鍊表
ConcurrentHashMap的高效並發機制是通過以下三方面來保證的
通過鎖分段技術保證並發環境下的寫操作通過 HashEntry的不變性、Volatile變量的內存可見性和加鎖重讀機制保證高效、安全的讀操作;通過不加鎖和加鎖兩種方案控制跨段操作的的安全性。本篇文章如有錯的地方,歡迎在評論指正
另,如果覺得這本篇文章寫得不錯,有點東西的話,各位人才記得來個三連【點讚+關注+分享】這就是我創作最大的動力。
下一遍可能會講解高並發,敬請期待哦。。。