我是這樣給阿里面試官吹 ConcurrentHashMap的

2021-02-14 菜鳥要飛

公眾號關注 「菜鳥要飛」

ConcurrentHashMap主要分為JDK<=7跟JDK>=8的兩個版本,ConcurrentHashMap的空間利用率更低一般只有10%~20%,接下來分別介紹。

JDK7

先宏觀說下JDK7中的大致組成,ConcurrentHashMap由Segment數組結構和HashEntry數組組成。Segment是一種可重入鎖,是一種數組和鍊表的結構,一個Segment中包含一個HashEntry數組,每個HashEntry又是一個鍊表結構。正是通過Segment分段鎖,ConcurrentHashMap實現了高效率的並發。缺點是並發程度是有segment數組來決定的,並發度一旦初始化無法擴容。先繪製個ConcurrentHashMap的形象直觀圖。要想理解currentHashMap,可以簡單的理解為將數據「分表分庫」。ConcurrentHashMap是由 Segment 數組 結構和HashEntry 數組 結構組成。

❝Segment 是一種可重入鎖ReentrantLock的子類 ,在 ConcurrentHashMap 裡扮演鎖的角色,HashEntry則用於存儲鍵值對數據。ConcurrentHashMap 裡包含一個 Segment 數組來實現鎖分離,Segment的結構和 HashMap 類似,一個 Segment裡包含一個 HashEntry 數組,每個 HashEntry 是一個鍊表結構的元素, 每個 Segment守護者一個 HashEntry 數組裡的元素,當對 HashEntry數組的數據進行修改時,必須首先獲得它對應的 Segment 鎖。❞
static final class Segment<K,V> extends ReentrantLock implements Serializable {
     transient volatile HashEntry<K,V>[] table; //包含一個HashMap 可以理解為
}

可以理解為我們的每個segment都是實現了Lock功能的HashMap。如果我們同時有多個segment形成了segment數組那我們就可以實現並發咯。

我們看下currentHashMap的構造函數,先總結幾點。每一個segment裡面包含的table(HashEntry數組)初始化大小也一定是2的次冪initialCapacity:初始容量大小 ,默認16。loadFactor: 擴容因子,默認0.75,當一個Segment存儲的元素數量大於initialCapacity* loadFactor時,該Segment會進行一次擴容。concurrencyLevel:並發度,默認16。並發度可以理解為程序運行時能夠「同時更新」ConccurentHashMap且不產生鎖競爭的最大線程數,實際上就是ConcurrentHashMap中的分段鎖個數,即Segment[]的數組長度。如果並發度設置的過小,會帶來嚴重的鎖競爭問題;如果並發度設置的過大,原本位於同一個Segment內的訪問會擴散到不同的Segment中,CPU cache命中率會下降,從而引起程序性能下降。

構造函數詳解:

   //initialCapacity 是我們保存所以KV數據的初始值
   //loadFactor這個就是HashMap的負載因子
   // 我們segment數組的初始化大小
      @SuppressWarnings("unchecked")
       public ConcurrentHashMap(int initialCapacity,
                                float loadFactor, int concurrencyLevel) {
           if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
               throw new IllegalArgumentException();
           if (concurrencyLevel > MAX_SEGMENTS) // 最大允許segment的個數,不能超過 1< 24
               concurrencyLevel = MAX_SEGMENTS;
           int sshift = 0; // 類似擾動函數
           int ssize = 1; 
           while (ssize < concurrencyLevel) {
               ++sshift;
               ssize <<= 1; // 確保segment一定是2次冪
           }
           this.segmentShift = 32 - sshift;  
           //有點類似與擾動函數,跟下面的參數配合使用實現 當前元素落到那個segment上面。
           this.segmentMask = ssize - 1; // 為了 取模 專用
           if (initialCapacity > MAXIMUM_CAPACITY) //不能大於 1< 30
               initialCapacity = MAXIMUM_CAPACITY;
   
           int c = initialCapacity / ssize; //總的數組大小 被 segment 分散後 需要多少個table
           if (c * ssize < initialCapacity)
               ++c; //確保向上取值
           int cap = MIN_SEGMENT_TABLE_CAPACITY; 
           // 每個table初始化大小為2
           while (cap < c) // 單獨的一個segment[i] 對應的table 容量大小。
               cap <<= 1;
           // 將table的容量初始化為2的次冪
           Segment<K,V> s0 =
               new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
               // 負載因子,閾值,每個segment的初始化大小。跟hashmap 初始值類似。
               // 並且segment的初始化是懶加載模式,剛開始只有一個s0,其餘的在需要的時候才會增加。
           Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
           UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
           this.segments = ss;
       }

hash 不管是我們的get操作還是put操作要需要通過hash來對數據進行定位。
   //  整體思想就是通過多次不同方式的位運算來努力將數據均勻的分不到目標table中,都是些擾動函數
   private int hash(Object k) {
       int h = hashSeed;
       if ((0 != h) && (k instanceof String)) {
           return sun.misc.Hashing.stringHash32((String) k);
       }
       h ^= k.hashCode();
       // single-word Wang/Jenkins hash.
       h += (h <<  15) ^ 0xffffcd7d;
       h ^= (h >>> 10);
       h += (h <<   3);
       h ^= (h >>>  6);
       h += (h <<   2) + (h << 14);
       return h ^ (h >>> 16);
   }

get 相對來說比較簡單,無非就是通過hash找到對應的segment,繼續通過hash找到對應的table,然後就是遍歷這個鍊表看是否可以找到,並且要注意 get的時候是沒有加鎖的。
   public V get(Object key) {
       Segment<K,V> s;
       HashEntry<K,V>[] tab;
       int h = hash(key); // JDK7中標準的hash值獲取算法
       long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // hash值如何映射到對應的segment上
       if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
           //  無非就是獲得hash值對應的segment 是否存在,
           for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                    (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                e != null; e = e.next) {
               // 看下這個hash值對應的是segment(HashEntry)中的具體位置。然後遍歷查詢該鍊表
               K k;
               if ((k = e.key)  key || (e.hash  h && key.equals(k)))
                   return e.value;
           }
       }
       return null;
   }

put 相同的思路,先找到hash值對應的segment位置,然後看該segment位置是否初始化了(因為segment是懶加載模式)。選擇性初始化,最終執行put操作。
   @SuppressWarnings("unchecked")
   public V put(K key, V value) {
       Segment<K,V> s;
       if (value  null)
           throw new NullPointerException();
       int hash = hash(key);// 還是獲得最終hash值
       int j = (hash >>> segmentShift) & segmentMask; // hash值位操作對應的segment數組位置
       if ((s = (Segment<K,V>)UNSAFE.getObject          
            (segments, (j << SSHIFT) + SBASE))  null)
           s = ensureSegment(j); 
       // 初始化時候因為只有第一個segment,如果落在了其餘的segment中 則需要現初始化。
       return s.put(key, hash, value, false);
       // 直接在數據中執行put操作。
   }

其中put操作基本思路跟HashMap幾乎一樣,只是在開始跟結束進行了加鎖的操作tryLock and unlock,然後JDK7中都是先擴容再添加數據的,並且獲得不到鎖也會進行自旋的tryLock或者lock阻塞排隊進行等待(同時獲得鎖前提前new出新數據)。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,需要先獲取該 segment 的獨佔鎖,獲取失敗嘗試獲取自旋鎖
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // segment 內部的數組
        HashEntry<K,V>[] tab = table;
        // 利用 hash 值,求應該放置的數組下標
        int index = (tab.length - 1) & hash;
        // first 是數組該位置處的鍊表的表頭
        HashEntry<K,V> first = entryAt(tab, index);
 
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key)  key ||
                    (e.hash  hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續順著鍊表走
                e = e.next;
            }
            else {
                // node 是不是 null,這個要看獲取鎖的過程。沒獲得鎖的線程幫我們創建好了節點,直接頭插法
                // 如果不為 null,那就直接將它設置為鍊表表頭;如果是 null,初始化並設置為鍊表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
 
                int c = count + 1;
                // 如果超過了該 segment 的閾值,這個 segment 需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴容
                else
                    // 沒有達到閾值,將 node 放到數組 tab 的 index 位置,
                    // 將新的結點設置成原鍊表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}

如果加鎖失敗了調用scanAndLockForPut,完成查找或新建節點的工作。當獲取到鎖後直接將該節點加入鍊表即可,「提升」了put操作的性能,這裡涉及到自旋。大致過程:

❝在我獲取不到鎖的時候我進行tryLock,準備好new的數據,同時還有一定的次數限制,還要考慮別的已經獲得線程的節點修改該頭節點。❞
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
 
    // 循環獲取鎖
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e  null) {
                if (node  null) // speculatively create node
              // 進到這裡說明數組該位置的鍊表是空的,沒有任何元素
             // 當然,進到這裡的另一個原因是 tryLock() 失敗,所以該槽存在並發,不一定是該位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 順著鍊表往下走
                e = e.next;
        }
    // 重試次數如果超過 MAX_SCAN_RETRIES(單核 1 次多核 64 次),那麼不搶了,進入到阻塞隊列等待鎖
    //    lock() 是阻塞方法,直到獲取鎖後返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1)  0 &&
                 // 進入這裡,說明有新的元素進到了鍊表,並且成為了新的表頭
                 // 這邊的策略是,重新執行 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

Size

這個size方法比較有趣,他是先無鎖的統計下所有的數據量看下前後兩次是否數據一樣,如果一樣則返回數據,如果不一樣則要把全部的segment進行加鎖,統計,解鎖。並且size方法只是返回一個統計性的數字,因此size謹慎使用哦。

public int size() {
       // Try a few times to get accurate count. On failure due to
       // continuous async changes in table, resort to locking.
       final Segment<K,V>[] segments = this.segments;
       int size;
       boolean overflow; // true if size overflows 32 bits
       long sum;         // sum of modCounts
       long last = 0L;   // previous sum
       int retries = -1; // first iteration isn't retry
       try {
           for (;;) {
               if (retries++  RETRIES_BEFORE_LOCK) {  //  超過2次則全部加鎖
                   for (int j = 0; j < segments.length; ++j)
                       ensureSegment(j).lock(); // 直接對全部segment加鎖消耗性太大
               }
               sum = 0L;
               size = 0;
               overflow = false;
               for (int j = 0; j < segments.length; ++j) {
                   Segment<K,V> seg = segmentAt(segments, j);
                   if (seg != null) {
                       sum += seg.modCount; // 統計的是modCount,涉及到增刪該都會加1
                       int c = seg.count;
                       if (c < 0 || (size += c) < 0)
                           overflow = true;
                   }
               }
               if (sum  last) // 每一個前後的修改次數一樣 則認為一樣,但凡有一個不一樣則直接break。
                   break;
               last = sum;
           }
       } finally {
           if (retries > RETRIES_BEFORE_LOCK) {
               for (int j = 0; j < segments.length; ++j)
                   segmentAt(segments, j).unlock();
           }
       }
       return overflow ? Integer.MAX_VALUE : size;
   }

rehashsegment 數組初始化後就不可變了,也就是說「並發性不可變」,不過segment裡的table可以擴容為2倍,該方法沒有考慮並發,因為執行該方法之前已經獲取了鎖。其中JDK7中的rehash思路跟JDK8 中擴容後處理鍊表的思路一樣,個人不過感覺沒有8寫的精髓好看。
// 方法參數上的 node 是這次擴容後,需要添加到新的數組中的數據。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 創建新數組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 為 31,對應二進位 『000...00011111』
    int sizeMask = newCapacity - 1;
    // 遍歷原數組,將原數組位置 i 處的鍊表拆分到 新數組位置 i 和 i+oldCap 兩個位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是鍊表的第一個元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計算應該放置在新數組中的位置,
            // 假設原數組長度為 16,e 在 oldTable[3] 處,那麼 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask; // 新位置
            if (next  null)   // 該位置處只有一個元素
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是鍊表表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當前鍊表的頭結點 e 的新位置
                int lastIdx = idx;
                // for 循環找到一個 lastRun 結點,這個結點之後的所有元素是將要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其之後的所有結點組成的這個鍊表放到 lastIdx 這個位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是處理 lastRun 之前的結點,
                //這些結點可能分配在另一個鍊表中,也可能分配到上面的那個鍊表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來的 node 放到新數組中剛剛的 兩個鍊表之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

CAS操作 在JDK7裡在ConcurrentHashMap中通過原子操作sun.misc.Unsafe查找元素、替換元素和設置元素。通過這樣的硬體級別獲得數據可以保證及時是多線程我也每次獲得的數據是最新的。這些原子操作起著非常關鍵的作用,你可以在所有ConcurrentHashMap的基本功能中看到,隨機距離如下:
     final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab  null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }
   static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

常見問題ConcurrentHashMap實現原理是怎麼樣的或者ConcurrentHashMap如何在保證高並發下線程安全的同時實現了性能提升?❝

ConcurrentHashMap允許多個修改操作並發進行,其關鍵在於使用了鎖分離技術。它使用了多個鎖來控制對hash表的不同部分進行的修改。內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的HashTable,只要多個修改操作發生在不同的段上,它們就可以並發進行。

❞❝

用於存儲鍵值對數據的HashEntry,在設計上它的成員變量value跟next都是volatile類型的,這樣就保證別的線程對value值的修改,get方法可以馬上看到。

ConcurrentHashMap的弱一致性體現在迭代器,clear和get方法,原因在於沒有加鎖。

比如迭代器在遍歷數據的時候是一個Segment一個Segment去遍歷的,如果在遍歷完一個Segment時正好有一個線程在剛遍歷完的Segment上插入數據,就會體現出不一致性。clear也是一樣。get方法和containsKey方法都是遍歷對應索引位上所有節點,都是不加鎖來判斷的,如果是修改性質的因為可見性的存在可以直接獲得最新值,不過如果是新添加值則無法保持一致性。JDK8

JDK8相比與JDK7主要區別如下:

❝取消了segment數組,直接用table保存數據,鎖的粒度更小,減少並發衝突的概率。採用table數組元素作為鎖,從而實現了對每一行數據進行加鎖,進一步減少並發衝突的概率,並發控制使用Synchronized和CAS來操作。❞❝

private static final int MAXIMUM_CAPACITY = 1 << 30; // 數組的最大值 

private static final int DEFAULT_CAPACITY = 16; // 默認數組長度 

static final int TREEIFY_THRESHOLD = 8; // 鍊表轉紅黑樹的一個條件 

static final int UNTREEIFY_THRESHOLD = 6; // 紅黑樹轉鍊表的一個條件 

static final int MIN_TREEIFY_CAPACITY = 64; // 鍊表轉紅黑樹的另一個條件

static final int MOVED     = -1;  // 表示正在擴容轉移 

static final int TREEBIN   = -2; // 表示已經轉換成樹 

static final int RESERVED  = -3; // hash for transient reservations 

static final int HASH_BITS = 0x7fffffff; // 獲得hash值的輔助參數

transient volatile Node<K,V>[] table;// 默認沒初始化的數組,用來保存元素 

private transient volatile Node<K,V>[] nextTable; // 轉移的時候用的數組 

static final int NCPU = Runtime.getRuntime().availableProcessors();// 獲取可用的CPU個數 

private transient volatile Node<K,V>[] nextTable; // 連接表,用於哈希表擴容,擴容完成後會被重置為 null 

private transient volatile long baseCount;保存著整個哈希表中存儲的所有的結點的個數總和,有點類似於 HashMap 的 size 屬性。private transient volatile int sizeCtl; 

負數:表示進行初始化或者擴容,-1:表示正在初始化,-N:表示有 N-1 個線程正在進行擴容 正數:0 表示還沒有被初始化,> 0的數:初始化或者是下一次進行擴容的閾值,有點類似HashMap中的threshold,不過功能「更強大」

      static class Node<K,V> implements Map.Entry<K,V> {
              final int hash;    // key的hash值
              final K key;       // key
              volatile V val;    // value
              volatile Node<K,V> next; 
               //表示鍊表中的下一個節點
      }

TreeNode繼承於Node,用來存儲紅黑樹節點
      static final class TreeNode<K,V> extends Node<K,V> {
              TreeNode<K,V> parent;  
              // 紅黑樹的父親節點
              TreeNode<K,V> left;
              // 左節點
              TreeNode<K,V> right;
             // 右節點
              TreeNode<K,V> prev;    
             // 前節點
              boolean red;
             // 是否為紅點
      }

ForwardingNode 在 Node 的子類 ForwardingNode 的構造方法中,可以看到此變量的hash = 「-1」 ,類中還存儲nextTable的引用。該初始化方法只在 transfer方法被調用,如果一個類被設置成此種情況並且hash = -1 則說明該節點不需要resize了。
static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //注意這裡
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
 //
}

TreeBin TreeBin從字面含義中可以理解為存儲樹形結構的容器,而樹形結構就是指TreeNode,所以TreeBin就是封裝TreeNode的容器,它提供轉換黑紅樹的一些條件和鎖的控制.
static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock
}

構造函數

整體的構造情況基本跟HashMap類似,並且為了跟原來的JDK7中的兼容性還可以傳入並發度。不過JDK8中並發度已經有table的具體長度來控制了。

❝ConcurrentHashMap():創建一個帶有默認初始容量 (16)、加載因子 (0.75) 和 concurrencyLevel (16) 的新的空映射ConcurrentHashMap(int):創建一個帶有指定初始容量tableSizeFor、默認加載因子 (0.75) 和 concurrencyLevel (16) 的新的空映射ConcurrentHashMap(Map<? extends K, ? extends V> m):構造一個與給定映射具有相同映射關係的新映射ConcurrentHashMap(int initialCapacity, float loadFactor):創建一個帶有指定初始容量、加載因子和默認 concurrencyLevel (1) 的新的空映射ConcurrentHashMap(int, float, int):創建一個帶有指定初始容量、加載因子和並發級別的新的空映射。❞put

假設table已經初始化完成,put操作採用 CAS + synchronized 實現並發插入或更新操作,具體實現如下。

❝沒初始化就初始化,初始化後看下對應的桶是否為空,為空就原子性的嘗試插入。用syn來加鎖當前節點,然後操作幾乎跟就跟hashmap一樣了。❞

// Node 節點的 hash值在HashMap中存儲的就是hash值,在currenthashmap中可能有多種情況哦!
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key  null || value  null) throw new NullPointerException(); //邊界處理
    int hash = spread(key.hashCode());// 最終hash值計算
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { //循環表
        Node<K,V> f; int n, i, fh;
        if (tab  null || (n = tab.length)  0)
            tab = initTable(); // 初始化表 如果為空,懶漢式
        else if ((f = tabAt(tab, i = (n - 1) & hash))  null) {
        // 如果對應桶位置為空
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) 
                         // CAS 原子性的嘗試插入
                break;
        } 
        else if ((fh = f.hash)  MOVED) 
        // 如果當前節點正在擴容。還要幫著去擴容。
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) { //  桶存在數據 加鎖操作進行處理
                if (tabAt(tab, i)  f) {
                    if (fh >= 0) { // 如果存儲的是鍊表 存儲的是節點的hash值
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 遍歷鍊表去查找,如果找到key一樣則選擇性
                            if (e.hash  hash &&
                                ((ek = e.key)  key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next)  null) {// 找到尾部插入
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {// 如果桶節點類型為TreeBin
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) { 
                             // 嘗試紅黑樹插入,同時也要防止節點本來就有,選擇性覆蓋
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { // 如果鍊表數量
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i); //  鍊表轉紅黑樹哦!
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); // 統計大小 並且檢查是否要擴容。
    return null;
}

涉及到重要函數initTable、tabAt、casTabAt、helpTransfer、putTreeVal、treeifyBin、addCount函數。

initTable

「只允許一個線程」對表進行初始化,如果不巧有其他線程進來了,那麼會讓其他線程交出 CPU 等待下次系統調度Thread.yield。這樣,保證了表同時只會被一個線程初始化,對於table的大小,會根據sizeCtl的值進行設置,如果沒有設置szieCtl的值,那麼默認生成的table大小為16,否則,會根據sizeCtl的大小設置table大小。

// 容器初始化 操作
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table)  null || tab.length  0) {
        if ((sc = sizeCtl) < 0) // 如果正在初始化-1,-N 正在擴容。
            Thread.yield(); // 進行線程讓步等待
     // 讓掉當前線程 CPU 的時間片,使正在運行中的線程重新變成就緒狀態,並重新競爭 CPU 的調度權。
     // 它可能會獲取到,也有可能被其他線程獲取到。
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 
          //  比較sizeCtl的值與sc是否相等,相等則用 -1 替換,這表明我這個線程在進行初始化了!
            try {
                if ((tab = table)  null || tab.length  0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默認為16
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2); // sc = 0.75n
                }
            } finally {
                sizeCtl = sc; //設置sizeCtl 類似threshold
            }
            break;
        }
    }
    return tab;
}

unsafe

在ConcurrentHashMap中使用了unSafe方法,通過直接操作內存的方式來保證並發處理的安全性,使用的是硬體的安全機制。

 // 用來返回節點數組的指定位置的節點的原子操作
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// cas原子操作,在指定位置設定值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

// 原子操作,在指定位置設定值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

// 比較table數組下標為i的結點是否為c,若為c,則用v交換操作。否則,不進行交換操作。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

可以看到獲得table[i]數據是通過Unsafe對象通過反射獲取的,取數據直接table[index]不可以麼,為什麼要這麼複雜?在java內存模型中,我們已經知道每個線程都有一個工作內存,裡面存儲著table的「副本」,雖然table是volatile修飾的,但不能保證線程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接獲取指定內存的數據,「保證了每次拿到數據都是最新的」

helpTransfer
// 可能有多個線程在同時幫忙運行helpTransfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // table不是空  且 node節點是轉移類型,並且轉移類型的nextTable 不是空 說明還在擴容ing
        int rs = resizeStamp(tab.length); 
        // 根據 length 得到一個前16位的標識符,數組容量大小。
        // 確定新table指向沒有變,老table數據也沒變,並且此時 sizeCtl小於0 還在擴容ing
        while (nextTab  nextTable && table  tab && (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 || sc  rs + MAX_RESIZERS || transferIndex <= 0)
            // 1. sizeCtl 無符號右移16位獲得高16位如果不等 rs 標識符變了
            // 2. 如果擴容結束了 這裡可以看 trePresize 函數第一次擴容操作:
            // 默認第一個線程設置 sc = rs 左移 16 位 + 2,當第一個線程結束擴容了,
            // 就會將 sc 減一。這個時候,sc 就等於 rs + 1。
            // 3. 如果達到了最大幫助線程個數 65535個
            // 4. 如果轉移下標調整ing 擴容已經結束了
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
            // 如果以上都不是, 將 sizeCtl + 1,增加一個線程來擴容
                transfer(tab, nextTab); // 進行轉移
                break;// 結束循環
            }
        }
        return nextTab;
    }
    return table;
}

Integer.numberOfLeadingZeros(n)❝

該方法的作用是「返回無符號整型i的最高非零位前面的0的個數」,包括符號位在內;如果i為負數,這個方法將會返回0,符號位為1. 比如說,10的二進位表示為 0000 0000 0000 0000 0000 0000 0000 1010 java的整型長度為32位。那麼這個方法返回的就是28

❞resizeStamp 主要用來獲得標識符,可以簡單理解是對當前系統容量大小的一種監控。
static final int resizeStamp(int n) {
   return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); 
   //RESIZE_STAMP_BITS = 16
}

addCount

主要就2件事:一是更新 baseCount,二是判斷是否需要擴容。

private final void addCount(long x, int check) {
 CounterCell[] as; long b, s;
 // 首先如果沒有並發 此時countCells is null, 此時嘗試CAS設置數據值。
 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
     // 如果 counterCells不為空以為此時有並發的設置 或者 CAS設置 baseCount 失敗了
     CounterCell a; long v; int m;
     boolean uncontended = true;
     if (as  null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m])  null ||
         !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
         // 1. 如果沒出現並發 此時計數盒子為 null
         // 2. 隨機取出一個數組位置發現為空
         // 3. 出現並發後修改這個cellvalue 失敗了
         // 執行funAddCount
         fullAddCount(x, uncontended);// 死循環操作
         return;
     }
     if (check <= 1)
         return;
     s = sumCount(); // 吧counterCells數組中的每一個數據進行累加給baseCount。
 }
 // 如果需要擴容
 if (check >= 0) {
  Node<K,V>[] tab, nt; int n, sc;
  while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
   int rs = resizeStamp(n);// 獲得高位標識符
   if (sc < 0) { // 是否需要幫忙去擴容
    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 ||
     sc  rs + MAX_RESIZERS || (nt = nextTable)  null || transferIndex <= 0)
     break;
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
     transfer(tab, nt);
   } // 第一次擴容
   else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
    transfer(tab, null);
   s = sumCount();
  }
 }
}

baseCount添加ConcurrentHashMap提供了baseCount、counterCells 兩個輔助變量和一個 CounterCell輔助內部類。sumCount() 就是迭代 counterCells來統計 sum 的過程。put 操作時,肯定會影響 size(),在 put() 方法最後會調用 addCount()方法。整體的思維方法跟LongAdder類似,用的思維就是借鑑的ConcurrentHashMap。每一個Cell都用Contended修飾來避免偽共享。❝JDK1.7 和 JDK1.8 對 size 的計算是不一樣的。1.7 中是先不加鎖計算三次,如果三次結果不一樣在加鎖。JDK1.8 size 是通過對 baseCount 和 counterCell 進行 CAS 計算,最終通過 baseCount 和 遍歷 CounterCell 數組得出 size。JDK 8 推薦使用mappingCount 方法,因為這個方法的返回值是 long 類型,不會因為 size 方法是 int 類型限制最大值。❞關於擴容在addCount第一次擴容時候會有騷操作sc=rs << RESIZE_STAMP_SHIFT) + 2)其中rs = resizeStamp(n)。這裡需要核心說一點,

如果不是第一次擴容則直接將低16位的數字 +1 即可。

putTreeVal

這個操作幾乎跟HashMap的操作完全一樣,核心思想就是一定要決定向左還是向右然後最終嘗試放置新數據,然後balance。不同點就是有鎖的考慮。

treeifyBin

這裡的基本思路跟HashMap幾乎一樣,不同點就是先變成TreeNode,然後是「單向鍊表」串聯。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        //如果整個table的數量小於64,就擴容至原來的一倍,不轉紅黑樹了
        //因為這個閾值擴容可以減少hash衝突,不必要去轉紅黑樹
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) { //鎖定當前桶
                if (tabAt(tab, index)  b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        //遍歷這個鍊表然後將每個節點封裝成TreeNode,最終單鍊表串聯起來,
                        // 最終 調用setTabAt 放置紅黑樹
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl)  null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    //通過TreeBin對象對TreeNode轉換成紅黑樹
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

TreeBin

主要功能就是鍊表變化為紅黑樹,這個紅黑樹用TreeBin來包裝。並且要注意 轉成紅黑樹以後以前鍊表的結構信息還是有的,最終信息如下:

TreeBin.first = 鍊表中第一個節點。TreeBin.root = 紅黑樹中的root節點。
TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);   
            //創建空節點 hash = -2 
            this.first = b;
            TreeNode<K,V> r = null; // root 節點
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r  null) {
                    x.parent = null;
                    x.red = false;
                    r = x; // root 節點設置為x 
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                   // x代表的是轉換為樹之前的順序遍歷到鍊表的位置的節點,r代表的是根節點
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc  null &&
                                  (kc = comparableClassFor(k))  null) ||
                                 (dir = compareComparables(kc, k, pk))  0)
                            dir = tieBreakOrder(k, pk);    
                            // 當key不可以比較,或者相等的時候採取的一種排序措施
                            TreeNode<K,V> xp = p;
                        // 放一定是放在葉子節點上,如果還沒找到葉子節點則進行循環往下找。
                        // 找到了目前葉子節點才會進入 再放置數據
                        if ((p = (dir <= 0) ? p.left : p.right)  null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x); 
                     // 每次插入一個元素的時候都調用 balanceInsertion 來保持紅黑樹的平衡
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }

tryPresize

當數組長度小於64的時候,擴張數組長度一倍,調用此函數。擴容後容量大小的核對,可能涉及到初始化容器大小。並且擴容的時候又跟2的次冪聯繫上了!,其中初始化時候傳入map會調用putAll方法直接put一個map的話,在「putAll」方法中沒有調用initTable方法去初始化table,而是直接調用了tryPresize方法,所以這裡需要做一個是不是需要初始化table的判斷。

PS:默認第一個線程設置 sc = rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減一。這個時候,sc 就等於 rs + 1,這個時候說明擴容完畢了。

     /**
     * 擴容表為指可以容納指定個數的大小(總是2的N次方)
     * 假設原來的數組長度為16,則在調用tryPresize的時候,size參數的值為16<<1(32),此時sizeCtl的值為12
     * 計算出來c的值為64, 則要擴容到 sizeCtl ≥ c
     *  第一次擴容之後 數組長:32 sizeCtl:24
     *  第三次擴容之後 數組長:128 sizeCtl:96 退出
     */
    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1); // 合理範圍
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
                if (tab  null || (n = tab.length)  0) {
                // 初始化傳入map,今天putAll會直接調用這個。
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    
                //初始化tab的時候,把 sizeCtl 設為 -1
                    try {
                        if (table  tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2); // sc=sizeCtl = 0.75n
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
             // 初始化時候如果  數組容量<=sizeCtl 或 容量已經最大化了則退出
            else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                    break;//退出擴張
            }
            else if (tab  table) {
                int rs = resizeStamp(n);

                if (sc < 0) { // sc = siztCtl 如果正在擴容Table的話,則幫助擴容
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 ||
                        sc  rs + MAX_RESIZERS || (nt = nextTable)  null ||
                        transferIndex <= 0)
                        break; // 各種條件判斷是否需要加入擴容工作。
                     // 幫助轉移數據的線程數 + 1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                 // 沒有在初始化或擴容,則開始擴容
                 // 此處切記第一次擴容 直接 +2 
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                      (rs << RESIZE_STAMP_SHIFT) + 2)) {
                        transfer(tab, null);
                }
            }
        }
    }

transfer

這裡代碼量比較大主要分文三部分,並且感覺思路很精髓,尤其「是其他線程幫著去擴容的騷操作」

主要是 單個線程能處理的最少桶結點個數的計算和一些屬性的初始化操作。每個線程進來會先領取自己的任務區間[bound,i],然後開始 --i 來遍歷自己的任務區間,對每個桶進行處理。如果遇到桶的頭結點是空的,那麼使用 ForwardingNode標識舊table中該桶已經被處理完成了。如果遇到已經處理完成的桶,直接跳過進行下一個桶的處理。如果是正常的桶,對桶首節點加鎖,正常的遷移即可(跟HashMap第三部分一樣思路),遷移結束後依然會將原表的該位置標識位已經處理。

該函數中的finish= true 則說明整張表的遷移操作已經「全部」完成了,我們只需要重置 table的引用並將 nextTable 賦為空即可。否則,CAS 式的將 sizeCtl減一,表示當前線程已經完成了任務,退出擴容操作。如果退出成功,那麼需要進一步判斷當前線程是否就是最後一個在執行擴容的。

f ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
   return;

第一次擴容時在addCount中有寫到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示當前只有一個線程正在工作,「相對應的」,如果 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT,說明當前線程就是最後一個還在擴容的線程,那麼會將 finishing 標識為 true,並在下一次循環中退出擴容方法。

幾乎跟HashMap大致思路類似的遍歷鍊表/紅黑樹然後擴容操作。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    //MIN_TRANSFER_STRIDE 用來控制不要佔用太多CPU
        stride = MIN_TRANSFER_STRIDE; // subdivide range    //MIN_TRANSFER_STRIDE=16 每個CPU處理最小長度個數

    if (nextTab  null) { // 新表格為空則直接新建二倍,別的輔助線程來幫忙擴容則不會進入此if條件
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // transferIndex 指向最後一個桶,方便從後向前遍歷
    }
    int nextn = nextTab.length; // 新表長度
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 創建一個fwd節點,這個是用來控制並發的,當一個節點為空或已經被轉移之後,就設置為fwd節點
    boolean advance = true;    //是否繼續向前查找的標誌位
    boolean finishing = false; // to ensure sweep(清掃) before committing nextTab,在完成之前重新在掃描一遍數組,看看有沒完成的沒
     // 第一部分
    // i 指向當前桶, bound 指向當前線程需要處理的桶結點的區間下限【bound,i】 這樣來跟線程劃分任務。
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
       // 這個 while 循環的目的就是通過 --i 遍歷當前線程所分配到的桶結點
       // 一個桶一個桶的處理
        while (advance) {//  每一次成功處理操作都會將advance設置為true,然裡來處理區間的上一個數據
            int nextIndex, nextBound;
            if (--i >= bound || finishing) { //通過此處進行任務區間的遍歷
                advance = false;
            }
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;// 任務分配完了
                advance = false;
            }
            // 更新 transferIndex
           // 為當前線程分配任務,處理的桶結點區間為(nextBound,nextIndex)
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
               // nextIndex本來等於末尾數字,
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 當前線程所有任務完成 
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {  // 已經完成轉移 則直接賦值操作
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);    //設置sizeCtl為擴容後的0.75
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1 表示當前線程任務完成。
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { 
                // 判斷當前線程完成的線程是不是最後一個在擴容的,思路精髓
                        return;
                }
                finishing = advance = true;// 如果是則相應的設置參數
                i = n; 
            }
        }
        else if ((f = tabAt(tab, i))  null) // 數組中把null的元素設置為ForwardingNode節點(hash值為MOVED[-1])
            advance = casTabAt(tab, i, null, fwd); // 如果老節點數據是空的則直接進行CAS設置為fwd
        else if ((fh = f.hash)  MOVED) //已經是個fwd了,因為是多線程操作 可能別人已經給你弄好了,
            advance = true; // already processed
        else {
            synchronized (f) { //加鎖操作
                if (tabAt(tab, i)  f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) { //該節點的hash值大於等於0,說明是一個Node節點
                    // 關於鍊表的操作整體跟HashMap類似不過 感覺好像更擾一些。
                        int runBit = fh & n; // fh= f.hash first hash的意思,看第一個點 放老位置還是新位置
                        Node<K,V> lastRun = f;

                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;    //n的值為擴張前的數組的長度
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;//最後導致發生變化的節點
                            }
                        }
                        if (runBit  0) { //看最後一個變化點是新還是舊 舊
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun; //看最後一個變化點是新還是舊 舊
                            ln = null;
                        }
                        /*
                         * 構造兩個鍊表,順序大部分和原來是反的,不過順序也有差異
                         * 分別放到原來的位置和新增加的長度的相同位置(i/n+i)
                         */
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n)  0)
                                    /*
                                     * 假設runBit的值為0,
                                     * 則第一次進入這個設置的時候相當於把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同為0的節點)設置到舊的table的第一個hash計算後為0的節點下一個節點
                                     * 並且把自己返回,然後在下次進來的時候把它自己設置為後面節點的下一個節點
                                     */
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                    /*
                                     * 假設runBit的值不為0,
                                     * 則第一次進入這個設置的時候相當於把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同不為0的節點)設置到舊的table的第一個hash計算後不為0的節點下一個節點
                                     * 並且把自己返回,然後在下次進來的時候把它自己設置為後面節點的下一個節點
                                     */
                                hn = new Node<K,V>(ph, pk, pv, hn);    
                        }
                        setTabAt(nextTab, i, ln);    
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) { // 該節點hash值是個負數否則的話是一個樹節點
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null; // 舊 頭尾
                        TreeNode<K,V> hi = null, hiTail = null; //新頭圍
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n)  0) {
                                if ((p.prev = loTail)  null)
                                    lo = p;
                                else
                                    loTail.next = p; //舊頭尾設置
                                loTail = p;
                                ++lc;
                            }
                            else { // 新頭圍設置
                                if ((p.prev = hiTail)  null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                         //ln  如果老位置數字<=6 則要對老位置鍊表進行紅黑樹降級到鍊表,否則就看是否還需要對老位置數據進行新建紅黑樹
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd); //老表中i位置節點設置下
                        advance = true;
                    }
                }
            }
        }
    }
}

get

這個就很簡單了,獲得hash值,然後判斷存在與否,遍歷鍊表即可,注意get沒有任何鎖操作!

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 計算key的hash值
        int h = spread(key.hashCode()); 
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) { // 表不為空並且表的長度大於0並且key所在的桶不為空
            if ((eh = e.hash)  h) { // 表中的元素的hash值與key的hash值相等
                if ((ek = e.key)  key || (ek != null && key.equals(ek))) // 鍵相等
                    // 返回值
                    return e.val;
            }
            else if (eh < 0) // 是個TreeBin hash = -2 
                // 在紅黑樹中查找,因為紅黑樹中也保存這一個鍊表順序
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) { // 對於結點hash值大於0的情況鍊表
                if (e.hash  h &&
                    ((ek = e.key)  key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

clear

關於清空也相對簡單 ,無非就是遍歷桶數組,然後通過CAS來置空。

public void clear() {
    long delta = 0L;
    int i = 0;
    Node<K,V>[] tab = table;
    while (tab != null && i < tab.length) {
        int fh;
        Node<K,V> f = tabAt(tab, i);
        if (f  null)
            ++i; //這個桶是空的直接跳過
        else if ((fh = f.hash)  MOVED) { // 這個桶的數據還在擴容中,要去擴容同時等待。
            tab = helpTransfer(tab, f);
            i = 0; // restart
        }
        else {
            synchronized (f) { // 真正的刪除
                if (tabAt(tab, i)  f) {
                    Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);
                        //循環到鍊表/者紅黑樹的尾部
                        while (p != null) {
                            --delta; // 記錄刪除了多少個
                            p = p.next;
                        } 
                        //利用CAS無鎖置null  
                        setTabAt(tab, i++, null);
                    }
                }
            }
        }
        if (delta != 0L)
            addCount(delta, -1); //調整count
    }

end

ConcurrentHashMap是如果來做到「並發安全」,又是如何做到「高效」的並發的呢?

首先是讀操作,讀源碼發現get方法中根本沒有使用同步機制,也沒有使用unsafe方法,所以讀操作是支持並發操作的。

. 數據擴容函數是transfer,該方法的只有addCount,helpTransfer和tryPresize這三個方法來調用。addCount是在當對數組進行操作,使得數組中存儲的元素個數發生了變化的時候會調用的方法。helpTransfer是在當一個線程要對table中元素進行操作的時候,如果檢測到節點的·hash·= MOVED 的時候,就會調用helpTransfer方法,在helpTransfer中再調用transfer方法來幫助完成數組的擴容❝tryPresize是在treeIfybin和putAll方法中調用,treeIfybin主要是在put添加元素完之後,判斷該數組節點相關元素是不是已經超過8個的時候,如果超過則會調用這個方法來擴容數組或者把鍊表轉為樹。注意putAll在初始化傳入一個大map的時候會調用。·❞

總結擴容情況發生:

❝在往map中添加元素的時候,在某一個節點的數目已經超過了8個,同時數組的長度又小於64的時候,才會觸發數組的擴容。當數組中元素達到了sizeCtl的數量的時候,則會調用transfer方法來進行擴容❞

3. 擴容時候是否可以進行讀寫。

對於讀操作,因為是沒有加鎖的所以可以的. 對於寫操作,JDK8中已經將鎖的範圍細膩到table[i]l了,當在進行數組擴容的時候,如果當前節點還沒有被處理(也就是說還沒有設置為fwd節點),那就可以進行設置操作。如果該節點已經被處理了,則當前線程也會加入到擴容的操作中去。

❞多個線程又是如何同步處理的 在ConcurrentHashMap中,同步處理主要是通過Synchronized和unsafe的硬體級別原子性 這兩種方式來完成的。❝在取得sizeCtl跟某個位置的Node的時候,使用的都是unsafe的方法,來達到並發安全的目的當需要在某個位置設置節點的時候,則會通過Synchronized的同步機制來鎖定該位置的節點。在數組擴容的時候,則通過處理的步長和fwd節點來達到並發安全的目的,通過設置hash值為MOVED=-1。當把某個位置的節點複製到擴張後的table的時候,也通過Synchronized的同步機制來保證線程安全

說個題外話,鳥哥是個比較喜歡折騰的程式設計師,業餘喜歡開發自己網站、小程序、App等,這些東西統統離不開伺服器!最近就圍繞伺服器的主題創建了一個微信群,喜歡玩伺服器或者想自己開發一款產品的讀者可以進來,相互學習交流!群通知中給大家分享了一套搭建伺服器的視頻教程哦。非常適合新手學習!我也會時不時的帶大家擼點和伺服器相關的優惠券!不感興趣,不喜歡折騰的就沒必要湊著鬧了!

識別二維碼,添加微信後

發送【伺服器】即可獲取邀請連結

這是我部署的機器人,請勿調戲!

推薦閱讀

噓!剛剛發現了一個山寨版某庫....

可怕!公司部署了一個東西,悄悄盯著你···

發小被綠,我竭盡所學黑科技,動用雲控捉姦尋找證據….

終於把廢舊電腦變成了伺服器!差點被女票拿去換洗臉盆,真香!

相關焦點

  • 最新阿里面試回來總結分享
    阿里一面: ArrayList 和 linkedlist 區別。ArrayList 是否會越界。 二面: list set map 底層使用什麼實現的有哪些典型實現 hashmap 擴容是怎麼擴容的,為什麼是 2 的冪 concurrenthashmap 為什麼線程安全,採用了什麼措施應對高並發
  • 阿里java面試被pass後,奮戰1個月,最終拿下美團offer!
    前言一位小夥伴準備了許久的阿里Java面試,原以為能夠順利拿下offer,但在第三面還是被摁在地上反覆摩擦,喪氣一段時間後,小夥伴調整了心態重新嘗試了一下
  • 多線程高頻題:讀寫鎖,Volatile,ScheduledExecutorService 和 ConcurrentHashMap
    Reading Materials:(1) https://howtodoinjava.com/core-java/multi-threading/best-practices-for-using-concurrenthashmap/(2) Java 關鍵字 volatile 的理解與正確使用下面的視頻是 xcode 的課程的試聽視頻
  • 滴滴Android崗面經分享:面試真題+經驗總結
    前言大家期望已久的金九銀十面試季的九月已經到來,我特此前來分享一個我剛剛在滴滴的面試經歷。java的集合類的介紹我把能想到的全都說了,map和collection的區別,collection下的list和set的區別,list中的arraylist和linkedlist的介紹(如何實現)以及區別。set中的hashset和treeset。map這邊有hashmap和treemap,底層實現。
  • 一個妹子的後臺面試經驗總結(螞蟻金服+美團+攜程+滴滴+....)
    我問對於沒有實現經驗和實際項目經驗的,阿里會考慮嗎?面試官說對於應屆生,阿里還是最看重基礎。我:你們平時怎麼學習技術?面試官:從實際項目中學習。。。面完之後,面試官說我基礎可以,給我過,我能走多遠就不知道了,反正很謝謝這位面試官,給我的秋招增加了很多信心,畢竟是阿里的面試官說我基礎可以。
  • Java面試高頻考點:HashMap的底層原理
    作為一個Java開發工程師,在面試的過程中,最高頻被問到的一個問題就是:「請簡述一下HashMap的實現原理」,在日常開發中,大多數程式設計師只會使用,對於其實現細節,卻不了解,殊不知這是較基礎卻也最重要的知識點。這篇文章將向大家詳細解釋hashmap的底層到底做了哪些事情。
  • 去阿里面試失敗後,我總結了7條小經驗.
    如果你通過了阿里的第三面,面試官可能還需要糾結,這個時候面試官往往會告訴你一句話:來阿里之後你不缺挑戰,可能你更多需要關注第一年你能否活得下來。聽到這句話的時候,往往是個好消息,說明阿里會考慮你,只不過你進入了最後的二選一或者三選一環節。
  • 關於面試:如何看待王垠被阿里面試官拒絕,其後手撕面試官
    然而阿里卻因一名清華天才面試被拒的消息上了熱搜,相信不少喜歡網友也在網絡上看到了相關的議點。不了解其中緣由的小夥伴們一起來看看吧。 原本是王垠收到阿里的P9崗位面試邀請,但是在面試的過程當中被P10的面試官趙海平給刷了下來。隨後王垠便在網絡上發文吐槽自己的不公平面試遭遇,並且經過網絡的發酵這件事情就成為了當下的熱議點。
  • 清華天才王垠受邀面試阿里P9被斃後手撕P10面試官趙海平!
    由於本事件像電影的情節一環扣一環,文章比較長,直接一句話說下該事件始末:網紅王垠受邀面試阿里 P9 崗位,被 P10 的面趙海平面試,王垠被掛後網上發文吐槽自己遭遇了不公面試,事件發酵該後面試官 P10 被上級打了低績效!後續 P10 回應稱自己是被其他部門邀請去做面試官的並無超出職責範圍,疑自己被出賣了。好了 想繼續看細節的讀者繼續往下看。
  • 清華天才王垠受邀面試阿里,卻大談自己的博客,手撕P10面試官趙海平
    【清華天才王垠受邀面試阿里P9崗位,被斃後網上發文爆內情,手撕P10面試官趙海平】昨天看了這位清華天才王垠去阿里巴巴面試P9崗位新聞,我感覺人生觀又被刷新了!我不是做網際網路的,我可以算做人力資源的。
  • 面試必問的 ConcurrentHashMap
    大致過程:❝在我獲取不到鎖的時候我進行tryLock,準備好new的數據,同時還有一定的次數限制,還要考慮別的已經獲得線程的節點修改該頭節點。通過這樣的硬體級別獲得數據可以保證及時是多線程我也每次獲得的數據是最新的。
  • 阿里面試官:說一下JDK1.7 ConcurrentHashMap的實現原理
    對於getObjectVolatile而言,可以看到它在返回前加了read_barrier,這個讀屏障的作用就是強制去讀取主存中的數據而不是線程自己的本地工作內存,這樣就確保了讀取到的一定是最新的數據。
  • 五輪面試,阿里offer到手!
    反問面試官的問題:一面大概面了50多分鐘,從面試官口中得知他是一個老員工,比我大不了多少,總體上還是聊得蠻投機的。最後的三個問題是我問面試官的,在回答我是否還有機會下次面試的時候說:競爭很激烈,不過機會還是有的。
  • 高並發編程系列:ConcurrentHashMap的實現原理(JDK1.7和JDK1.8)
    HashMap、CurrentHashMap 的實現原理基本都是BAT面試必考內容,阿里P8架構師談:深入探討HashMap的底層結構、原理、擴容機制深入談過hashmap的實現原理以及在JDK 1.8的實現區別,今天主要談CurrentHashMap的實現原理,以及在JDK1.7和1.8的區別。
  • 985碩,秋招面試30家企業,怒斬阿里、字節、美團offer
    因為好多題是看題解做的,做完就忘了,我不是很開竅的那種,所以我能做的就是反覆刷,光劍指offer我就刷了三遍,勤能補拙,到後來面試的時候,算法題基本都是秒做。還記得快手二面一共三道代碼題,十分鐘不到就寫完了,面試官說,小夥子,算法題沒少刷啊,我倆相視一笑,我覺得代碼題真是特別重要的,基本上代碼題寫不出來,這場面試就掛了,代碼題寫好了,哪怕基礎弱點,也能進二面。
  • 面試官:你手寫過堵塞隊列嗎?
    :你好,你先做個自我介紹吧某人:面試官你好,我叫開局一張嘴面試全靠吹,某某年畢業,畢業自家裡蹲大學,做過某某項目。。。。。。某人:沒有手寫過。面試官:哦,那你說下堵塞隊列吧某人支支吾吾:這個有點忘了面試官:沒事,那我們下一個。此處省略一萬字。面試官扭了扭嚴重負荷的頸椎:先到這裡吧,你先回去等通知。某人:好的。不出意外,某人等了一個月,等的望眼欲穿,也沒等到那個期待的電話。
  • 阿里螞蟻金服Java程式設計師面試的11個問題,你會幾個呢?
    在分享螞蟻金服Java程式設計師面經前,不妨來看下Java程式設計師面試時要注意3大要點:0、重視基礎在面試之前,有必要將基礎的知識點重新過一遍,比如並發優缺點、內存可見性、鎖、同步、線程池框架等。有些人就栽在這些基礎的東西上面,可能就這樣失去一個月薪過完的機會。1、常考的Java知識點複習:對於Java常考的知識點,用博客進行總結,以後要跳槽的時候也可以用到。可謂是一次總結就能實現多次利用的效果。
  • 給大家分享一下阿里三面的面試真題
    雖然阿里對外招聘條件寫的比較寬鬆,實際上對年齡和學歷的要求還是挺嚴格的,除非你業餘做了很牛的事情,比如自己開發了個牛逼的軟體,對某些技術有深度研究什麼的…我工作快5年,最近很幸運的拿到了阿里offer,作為一個大專學歷的我已經很知足了。
  • 一道面試題,我把阿里面試官唬住了
    這個問題是安琪拉之前面試被問到的一個問題,正好順著上一篇文章介紹完線程調用時的用戶態和內核態的切換,後續把 Java 並發的都一起講了。面試官:聽前一個面試官說你 Java 並發這塊掌握的不錯,我們深入的交流一下;我:  看了看面試官頭部稀疏的結締組織,已然覺得這場面試不簡單,不過好在事前把安琪拉的博客看了個遍,有所準備,我回答說:咳咳,掌握的還算可以。
  • 面試系列——Java工作6年面試拼多多和阿里經歷附帶面試題
    面試官問我怎麼優化?面試官緊接著問這樣做的問題在哪裡?問題肯定非常明顯了,就是catch中失敗如何去做。當時我們只是記錄了相關接口調用日誌,會有1min的job會去輪詢這些狀態記錄,並告警通知的。我們也考慮過調研過一些分布式事務框架,目前也在調研中。