ConcurrentHashMap在Java1.7和1.8中的区别

ConcurrentHashMap在Java 7和Java 8中的区别

ConcurrentHashMap在Java中一直是一个多线程的大杀器。在多线程环境下,使用它就可以免去使用HashMap造成的线程不安全的问题。鉴于Java 8已经出来N久了,在各个方面和Java 7都有所不同。下面我们就从源码级别对ConcurrentHashMap进行分析,看看从Java 7 到Java 8,它到底有哪些变化?

Java 7中的ConcurrentHashMap

put方法实现

在Java 7 中,put一个值到ConcurrentHashMap主要有以下这些步骤:

  1. 首先计算出这个key对应的segment,(hash >>> segmentShift) & segmentMask;
  2. 通过key再次计算出,这个值在table数组中的位置
  3. 首先尝试获取锁,成功获取锁之后,转到4,否则,转到8
  4. 获取table数组的头结点,遍历;
  5. 如果当前key在链表中,则直接覆盖其值,不在的话,转为6
  6. 将节点插入头结点位置,作为第一个节点;
  7. 判断是否需要扩容,如有,扩容
  8. 遍历是否存在相同的key
  9. 在步骤8遍历时,自旋获取锁,如果超过最大限制64次,则阻塞。

put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//Segment继承ReentrantLock,尝试获取独占锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//定位键值对在HashEntry数组上的位置
int index = (tab.length - 1) & hash;
//获取这个位置的第一个键值对
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {//此处有链表结构,一直循环到e==null
K k;
//存在与待插入键值对相同的键,则替换value
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {//onlyIfAbsent默认为false
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//node不为null,设置node的next为first,node为当前链表的头节点
if (node != null)
node.setNext(first);
//node为null,创建头节点,指定next为first,node为当前链表的头节点
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//扩容条件 (1)entry数量大于阈值 (2) 当前数组tab长度小于最大容量。满足以上条件就扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//扩容
rehash(node);
else
//tab的index位置设置为node,
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

scanAndLockForPut

在不超过最大重试次数MAX_SCAN_RETRIES通过CAS尝试获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//first,e:键值对的hash值定位到数组tab的第一个键值对
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
//线程尝试通过CAS获取锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
//当e==null或key.equals(e.key)时retry=0,走出这个分支
if (e == null) {
if (node == null) // speculatively create node
//初始化键值对,next指向null
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//超过最大自旋次数,阻塞
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//头节点发生变化,重新遍历
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

size方法实现

在Java 7 中求ConcurrentHashMap的方式如下步骤:

  1. 先不加锁,计算两次ConcurrentHashMap的大小,如果两次结果是一样的,说明是正确的,返回
  2. 如果两次结果不一样,则锁住所有的segment,重新计算所有segment的count之和。

size方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
//达到RETRIES_BEFORE_LOCK
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
//遍历计算segment的modCount和count的和
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
//是否溢出int范围
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//last是上一次的sum值,相等跳出循环
if (sum == last)
break;
last = sum;
}
} finally {
//解锁
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

Java 8中的ConcurrentHashMap

数据结构

Java 8 中去除了Java 7 的segment分段锁,转而使用Node+synchronized+CAS来保证并发安全。

put方法实现

ConcurrentHashMap在Java 8 的put步骤如下:

  1. 根据key的hash值定位到table的首节点first;
  2. 如果为null,新增节点,通过cas的方式;
  3. 如果不为null,并且,first.hash == -1,说明其他线程正在扩容,参与一起扩容
  4. 如果不为null,并且first.hash != -1,Synchronized锁住first节点,判断是链表还是红黑树,遍历插入

size方法实现

由于没有segment的概念,所以只需要用一个 baseCount 变量来记录ConcurrentHashMap 当前 节点的个数

  1. 先尝试通过CAS 修改 baseCount
  2. 如果多线程竞争激烈,某些线程CAS失败,那就CAS尝试将 CELLSBUSY 置1,成功则可以把 baseCount变化的次数 暂存到一个数组 counterCells 里,后续数组 counterCells 的值会加到 baseCount 中。
  3. 如果 CELLSBUSY 置1失败又会反复进行CAS baseCount 和 CAS counterCells数组

N句话总结

  1. 去除 Segment + HashEntry + Unsafe 的实现,
    改为 Synchronized + CAS + Node + Unsafe 的实现
    其实 Node 和 HashEntry 的内容一样,但是HashEntry是一个内部类。
    用 Synchronized + CAS 代替 Segment ,这样锁的粒度更小了,并且不是每次都要加锁了,CAS尝试失败了再加锁。
  2. put()方法中 初始化数组大小时,1.8不用加锁,因为用了个 sizeCtl 变量,将这个变量置为-1,就表明table正在初始化。

参考资料

ConCurrentHashMap JDK1.7 和 JDK1.8 的区别

0%