彻底弄明白JDK中ConcurrentHashMap实现原理和底层数据结构!

1 ConcurrentHashMap简介

ConcurrentHashMap也是存储存储的是 key-vlaue 格式数据,是集合Map下的集合之一。

其实我们可以理解ConcurrentHashMapHashMap的升级版,熟悉HashMap都知道其性能已经很拉菲了,但HashMap因不是线程安全的带来了高并发问题,因此就有了ConcurrentHashMap。当然因为ConcurrentHashMap主要就是解决HashMap线程安全的问题,所以性能上肯定会稍逊于HashMap的。


2 ConcurrentHashMap数据结构分析

其实ConcurrentHashMap底层数据结构和HashMap是一样的。在JDK7中ConcurrnetHashMap 由很多个 Segment 组合,而每个 Segment 其实可以理解为就是一个 HashMap。要注意的是如果 Segment 设定了初始化值后就不可再改变了(默认是 16 个,也就是其默认支持16个线程)。

在JDK7中ConcurrentHashMapSegment 数组 + HashEntry 数组 + 链表 组成。

JDK7ConcurrentHashMap数据结构分析图:

在JDK8中ConcurrentHashMapNode 数组 + 链表 / 红黑树 组成。

JDK8ConcurrentHashMap数据结构分析简图:

其实ConcurrentHashMap在JDK中的分界点就是JDK8,所以这里我们主要分析JDK7和JDK8中ConcurrentHashMap


3 ConcurrentHashMap底层实现原理

3.1 JDK7中的ConcurrentHashMap

3.1.1 初始化分析

通过Segment继承了ReentrantLock实现线程安全


/**
 * Stripped-down version of helper class used in previous version,
 * declared for the sake of serialization compatibility
 */
static class Segment extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;
    final float loadFactor;
    Segment(float lf) { this.loadFactor = lf; }
}

无参构造函数和其三个默认常量参数

//三个默认常量参数

/**
* 默认初始化容量
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* 默认负载因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* 默认并发级别
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;


//无参构造函数,可以看出无参构造传入这3个默认值
/**
* Creates a new, empty map with a default initial capacity (16),
* load factor (0.75) and concurrencyLevel (16).
*/
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

有参构造实现逻辑分析

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {

if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 校验初始化容量值(即发级别大小),大于 1<<16,重置为 65536
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 2的多少次方
int sshift = 0;
int ssize = 1;
// 这个循环可以找到 concurrencyLevel 之上最近的 2的次方值
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 记录段偏移量
this.segmentShift = 32 - sshift;
// 记录段掩码
this.segmentMask = ssize - 1;
// 设置容量
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c = 容量 / ssize ,默认 16 / 16 = 1,这里是计算每个 Segment 中的类似于 HashMap 的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//Segment 中的类似于 HashMap 的容量至少是2或者2的倍数
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 创建 Segment 数组,设置 segments[0]
Segment s0 = new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

初始化流程总结:

1、校验参数和并发级别 concurrencyLevel 大小(默认 16)。

2、把并发数之上最近的 2 的幂次方值,作为初始化容量大小(默认是 16)。

3、记录 segmentShift 偏移量,该值为【容量 = 2 的N次方】中的 N 值(默认是 32 - sshift = 28)。

4、记录 segmentMask,默认是 ssize - 1 = 16 -1 = 15.

5、初始化 segments[0],默认大小为 2,负载因子 0.75,扩容阀值是 2*0.75=1.5,插入第二个值时才会进行扩容。

3.1.2 添加元素put()方法分析

/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* 

The value can be retrieved by calling the get method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with key, or * null if there was no mapping for key * @throws NullPointerException if the specified key or value is null */ public V put(K key, V value) { Segment s; if (value == null) throw new NullPointerException(); int hash = hash(key); // hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算 // 其实也就是把高4位与segmentMask(1111)做与运算 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment // 如果查找到的 Segment 为空,初始化 s = ensureSegment(j); return s.put(key, hash, value, false); } /** * Returns the segment for the given index, creating it and * recording in segment table (via CAS) if not already present. * * @param k the index * @return the segment */ @SuppressWarnings("unchecked") private Segment ensureSegment(int k) { final Segment[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment seg; // 判断 u 位置的 Segment 是否为null if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment proto = ss[0]; // use segment 0 as prototype // 获取0号 segment 里的 HashEntry 初始化长度 int cap = proto.table.length; // 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的 float lf = proto.loadFactor; // 计算扩容阀值 int threshold = (int)(cap * lf); // 创建一个 cap 容量的 HashEntry 数组 HashEntry[] tab = (HashEntry[])new HashEntry[cap]; if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck // 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作 Segment s = new Segment(lf, threshold, tab); // 自旋检查 u 位置的 Segment 是否为null while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { // 使用CAS 赋值,只会成功一次 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }

流程理解:

1、计算要 put() 的 key 的位置,获取指定位置的 Segment

2、如果指定位置的 Segment 为空,则初始化该 Segment。

Segment 初始化流程:

a、获取到的 Segment 是否为null,为 null 则初始化,使用 Segment[0] 的容量和负载因子创建一个 HashEntry 数组。

b、再次计算获取到的 Segment 是否为null,为null则用创建的 HashEntry 数组初始化它

3、Segment.put 插入 key,value 值。

看一下put()方法涨工获取锁的源码,这里处理线程安全的逻辑:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。
HashEntry node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry[] tab = table;
// 计算要put的数据位置
int index = (tab.length - 1) & hash;
// CAS 获取 index 坐标的值
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;) {
if (e != null) {
// 检查是否 key 已经存在,如果存在,则遍历链表寻找位置,找到后替换 value
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// first 有值没说明 index 位置已经有值了,有冲突,链表头插法。
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
// 容量大于扩容阀值,小于最大容量,进行扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// index 位置赋值 node,node 可能是一个元素,也可能是一个链表的表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

//获取锁
private HashEntry scanAndLockForPut(K key, int hash, V value) {
HashEntry first = entryForHash(this, hash);
HashEntry e = first;
HashEntry node = null;
int retries = -1; // negative while locating node
// 自旋获取锁
while (!tryLock()) {
HashEntry f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
// 自旋达到指定次数后,阻塞等到只到获取到锁
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

3.1.3 扩容rehash()方法分析

private void rehash(HashEntry node) {
HashEntry[] oldTable = table;
// 老容量
int oldCapacity = oldTable.length;
// 新容量,扩大两倍
int newCapacity = oldCapacity << 1;
// 新的扩容阀值
threshold = (int)(newCapacity * loadFactor);
// 创建新的数组
HashEntry[] newTable = (HashEntry[]) new HashEntry[newCapacity];
// 新的掩码,默认2扩容后是4,-1是3,二进制就是11。
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
// 遍历老数组
HashEntry e = oldTable[i];
if (e != null) {
HashEntry next = e.next;
// 计算新的位置,新的位置只可能是不便或者是老的位置+老的容量。
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
// 如果当前位置还不是链表,只是一个元素,直接赋值
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// 如果是链表了
HashEntry lastRun = e;
int lastIdx = idx;
// 新的位置只可能是不便或者是老的位置+老的容量。
// 遍历结束后,lastRun 后面的元素位置都是相同的
for (HashEntry last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// ,lastRun 后面的元素位置都是相同的,直接作为链表赋值到新位置。
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry p = e; p != lastRun; p = p.next) {
// 遍历剩余元素,头插法到指定 k 位置。
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry n = newTable[k];
newTable[k] = new HashEntry(h, p.key, v, n);
}
}
}
}
// 头插法插入新的节点
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

可以看出JDK7中ConcurrentHashMap 扩容是原来的两倍。

3.1.4 获取元素get()方法分析

public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 计算得到 key 的存放位置
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
// 如果是链表,遍历查找到相同 key 的 value。
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

get()方法就很简单了,也就是计算到key的位置,然后根据key拿到值。

3.2 JDK8中的ConcurrentHashMap

这里我们之分析和JDK7有不同变化的部分逻辑。

3.2.1 初始化initTable()方法分析

/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。
if ((sc = sizeCtl) < 0)
// 让出 CPU 使用权
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

根据源码可以看出,JDK8中 ConcurrentHashMap 是通过 自旋和 CAS 完成初始化的。

3.2.2 添加元素put()方法分析

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key 和 value 不能为空
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node[] tab = table;;) {
        // f = 目标位置元素
        Node f; int n, i, fh;// fh 后面存放目标位置的元素 hash 值
        if (tab == null || (n = tab.length) == 0)
            // 数组桶为空,初始化数组桶(自旋+CAS)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 桶内为空,CAS 放入,不加锁,成功了就直接 break 跳出
            if (casTabAt(tab, i, null,new Node(hash, key, value, null)))
                break;  // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 使用 synchronized 加锁加入节点
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 说明是链表
                    if (fh >= 0) {
                        binCount = 1;
                        // 循环加入新的或者覆盖节点
                        for (Node e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 红黑树
                        Node p;
                        binCount = 2;
                        if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

JDK8中ConcurrentHashMapput()方法流程:

1、根据 key 计算出 hashcode() ,然后判断是否需要进行初始化。

3、根据 key 定位到的 Node,如果为null则可添加元素,这里利用 CAS 尝试写入,如果失败则自旋保证成功。

4、如果当前Node对应的 hashcode == MOVED == -1,则需扩容。如果都不满足,则利用 synchronized 锁写入数据。

5、如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。

3.2.3 获取元素get()方法分析

get()方法就很简单了,也就是计算到key的位置,然后根据key拿到值。

public V get(Object key) {
Node[] tab; Node e, p; int n, eh; K ek;
// key 所在的 hash 位置
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果指定位置元素存在,头结点hash值相同
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// key hash 值相等,key值相同,直接返回元素 value
return e.val;
}
else if (eh < 0)
// 头结点hash值小于0,说明正在扩容或者是红黑树,find查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
// 是链表,遍历查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}


4 ConcurrentHashmap和HashMap区别

1、数据结构

HashMap的数据结构 数组 + 链表 + 红黑树ConcurrentHashMap的数据结构 分段数组 + 链表 + 红黑树。

2、线程安全

HashMap是线程不安全的,ConcurrentHashMap是线程安全的。

3、性能

ConcurrentHashMap主要就是解决HashMap线程安全的问题,所以性能上肯定会稍逊于HashMap的。

4、相同点

扩容、缩容功能两者是一样的。

展开阅读全文

页面更新:2024-04-22

标签:数据结构   结点   常量   数组   初始化   线程   底层   元素   原理   性能   位置   方法

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top