ConcurrentHashMap源码解析

ConcurrentHashMap源码解析

简介

ConcurrentHashMap是Java SDK提供的线程安全的HashMap类。在多线程并发的场景下,用它做添加、删除是不会出现线程安全问题的。

它实现了ConcurrentMap和Serializable接口。

实现逻辑

ConcurrentHashMap底层的实现是Node + 散列表+红黑树。

主要属性

  • DEFAULT_CAPACITY:ConcurrentHashMap的默认大小是16
  • LOAD_FACTOR:负载因子。ConcurrentHashMap在超过16 * 0.75之后,就需要扩容了。
  • TREEIFY_THRESHOLD:发生哈希冲突的链表长度如果大于等于8,就会转变为红黑树
  • UNTREEIFY_THRESHOLD:发生哈希冲突的红黑树,在小于等于6个元素之后,就会回退成链表
  • MIN_TREEIFY_CAPACITY:当ConcurrentHashMap的大小大于等于64的时候,才允许将冲突的链表转为红黑树
  • Node<K,V>[] table 保存哈希桶的数组,大小是2的倍数
  • CounterCell[] counterCells:保存各个桶大小

主要函数

构造函数

无参构造函数

创建一个空map,默认大小为16

1
2
3
public ConcurrentHashMap() {

}

有参构造函数(指定初始化大小)

初始化时候指定Map大小,然后会初始化为比指定大小最近的一个2的幂次数值。比如,指定大小为12的话,就初始化为16,为17的话,就初始化为32.

1
2
3
4
5
6
7
8
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

有参构造函数(指定map)

1
2
3
4
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

有参构造函数(指定初始化大小和负载因子)

调用的是下面的一个构造函数

1
2
3
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

有参构造函数(指定初始化大小、负载因子以及并发度)

需要注意的是,map的桶个数至少是要和并发度相等的。即并发度为N,那么桶的数目就必须大于等于N

1
2
3
4
5
6
7
8
9
10
11
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

size()

size方法实现很简单,就是调用sumCount方法,然后判断下是否溢出。

1
2
3
4
5
6
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

sumCount()

先看下sumCount的实现

1
2
3
4
5
6
7
8
9
10
11
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

sumCount的逻辑也比较简单,就是使用了baseCount这个变量和CounterCell数组。

baseCount和CounterCell的具体使用方式,我们在put方法里面在详细说明吧。

先简单看下baseCount变量

baseCount

baseCount变量是主要用在没有竞争场景下计数使用的。通过CAS修改。

CounterCell数组

在并发的场景下,如果CAS更新baseCount失败了,那么失败的线程就会创建CounterCell对象,用来保存部分总数。

isEmpty()

判断下sumCount的返回是否为0即可。

1
2
3
public boolean isEmpty() {
return sumCount() <= 0L; // ignore transient negative values
}

get(Object)

先看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

get方法的流程如下:

  1. 首先通过spread方法,计算出key的hash值
  2. 如果Node数组不为空,并且有值,判断头结点的key是不是和给定的Key相等
  3. 如果头结点的key和给定的key相等,直接返回头结点的值
  4. 否则,判断当前节点是树节点还是链表节点
  5. 树节点通过find方法寻找
  6. 链表节点就遍历寻找

spread方法

1
2
3
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

此方法将key的哈希值的低16位和高16位做异或运算

containsKey(Object)

基于上文get方法的实现,判断get出来的值是否为null

1
2
3
public boolean containsKey(Object key) {
return get(key) != null;
}

put(Object, Object)

调用了内部方法putVal

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}

putVal方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value都不能为空,否则空指针异常
if (key == null || value == null) throw new NullPointerException();
// 计算出key的哈希值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 如果是第一次插入数据的话,初始化table
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果当前table位置上没有元素,说明是第一个,CAS新建一个Node
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
// CAS 新建Node成功,返回
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 如果当前桶的hash值是-1,说明当前map正在扩容,进入协助扩容
tab = helpTransfer(tab, f);
else {
// 否则插入链表或者是树中
V oldVal = null;
synchronized (f) {
// 锁住头结点
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 如果Key相等,更新value
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 新节点插入链表
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 插入红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果当前链表的长度超过了阈值,转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 当前map的大小加一,并判断是否需要扩容
addCount(1L, binCount);
return null;
}

总的来说,put方法的步骤是:

  1. 判断table是否为空,是的话,就先初始化table
  2. 如果对应位置的桶是空的,就新建一个Node节点CAS插入
  3. 如果要插入的桶的Hash值为Moved,也就是-1,说明其他线程正在扩容,进入协助扩容方法
  4. 插入链表或者是树
  5. 如果完成之后,如果链表个数超过阈值,转换为树
  6. map的大小加1

initTable方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

initTable在执行的时候,发现有其他线程也在做初始化动作的话,它会调用Thread.yield方法,释放cpu,让其他线程继续做初始化动作。

helpTransfer方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//返回一个 16 位长度的扩容校验标识
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//sizeCtl 如果处于扩容状态的话
//前 16 位是数据校验标识,后 16 位是当前正在扩容的线程总数
//这里判断校验标识是否相等,如果校验符不等或者扩容操作已经完成了,直接退出循环,不用协助它们扩容了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//否则调用 transfer 帮助它们进行扩容
//sc + 1 标识增加了一个线程进行扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

addCount方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果as不为null(存在并发),或者CAS更新baseCount失败
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 通过as来计数
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果 as 是空(还不是并发)或者 (ss 中随机取余一个数组位置为空 或者 ss 这个位置的变量失败)
// 说明通过as计数失败,调用fullAddCount
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 统计当前的总大小
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 检查扩容条件:
// 1. 是否达到阀值: s >= sizeCtl
// 2. 是否可以扩容: tab != null && tab 当前的长度小于 1 << 30
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据当前桶的数量生成一个标志位
int rs = resizeStamp(n);
// 如果正在扩容
if (sc < 0) {
// 检查当前扩容的进展:
// 1. 如果 sc 的低 16 位不等于标识位( sizeCtl 变化了,说明容器状态已经变化),退出
// 2. 如果 sc == 标识位 + 1 (通过下面代码可知,刚开始扩容时, sc = rs + 2,如果 sc = rs + 1,说明已经没有线程在扩容),退出
// 3. 如果 sc == 标识符 + 65535,参与扩容的线程已经达到最大数量,当前线程不再参与,退出
// 4. 如果 nextTable == null 说明扩容结束(nextTable 在扩容中起中转作用,所有的元素会被限移到 nextTable 中,最后让 tab = nextTable,nextTable == null 来完成扩容),退出
// 5. transferIndex <= 0 说明没有桶还需要迁移了(transferIndex 用于标识当前迁移到哪个桶了,小于等于 0 说明已经迁移到最后一个桶或者已经迁移完成,迁移的顺序是从最后一个桶开始),退出。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果迁移还是进行,当前线程尝试参与扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 如果当前不在扩容中,则发起一个新的扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

参考资料

Java容器系列-ConcurrentHashMap源码分析

为并发而生的 ConcurrentHashMap(Java 8)

0%