Striped64源码解析

Striped64源码解析

Striped64类介绍

Striped64是Java8中新增用来支持累加器的并发组件。它可以在并发环境下做一个安全的计数器。它的设计思路,借鉴了JAVA7中的ConcurrentHashmap的分段思想,在竞争的时候,尽量分散竞争。从实现上来看,Striped64维护了一个base和一个Cell数组。在竞争不激烈的时候,累加器的值就一直在base上累加。竞争激烈的时候,就会通过Cell数组来分散技术。Striped64根据线程计算他们的哈希值,将各个线程分散到不同的Cell中累加。最后的总数,只要结合base以及散落在Cell数组中的计数内容。

Striped64采用了更为轻量级的CAS来协调并发,效率更佳。

类图

Striped64类图

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;

/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;

/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;

NCPU:当前运行主机的CPU核数

cells:初始化值为null,没有竞争的情况下,不会使用到这个数组。有了竞争之后,第一次初始化的长度为2,后续每次扩容,都是原来长度的两倍,直到数组长度大于等于当前服务器CPU的数量为止。每个线程会通过线程对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value做累加,这样相当于将线程绑定到了cells中的某个Cell对象上。

base:作用类似于AtomicLong中的value,没有竞争的情况下,就对这个值进行增或减。

cellsBusy:它有两个值0或1,它的作用是当要修改cells数组时加锁,防止多线程同时修改cells数组(也称cells表),0为无锁,1为加锁,加锁的状况有三种:

  • cells数组初始化的时候;
  • cells数组扩容的时候;
  • 如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;

UNSAFE:JDK提供的CAS算法工具类。

BASE:base字段在当前类中的内存偏移量

CELLSBUSY:cellsBusy字段在当前类中的内存偏移量

PROBE:标记位。标记当前线程是否已经初始化

主要方法

Striped64()

1
2
3
4
5
/**
* Package-private default constructor
*/
Striped64() {
}

默认的构造函数。什么都不做。

casBase(long,long)

1
2
3
4
5
6
/**
* CASes the base field.
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

如果当前值等于cmp,就把base的值更新为val,cas更新base字段的值。

casCellsBusy()

1
2
3
4
5
6
/**
* CASes the cellsBusy field from 0 to 1 to acquire lock.
*/
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

将标记位casCellsBusy标记为1.采用cas,如果当前值是0,则标记成功。否则返回false。

getProbe()

1
2
3
4
5
6
7
/**
* Returns the probe value for the current thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
*/
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

返回当前线程的probe值。

longAccumulate(long,LongBinaryOperator,boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/**
* Handles cases of updates involving initialization, resizing,
* creating new Cells, and/or contention. See above for
* explanation. This method suffers the usual non-modularity
* problems of optimistic retry code, relying on rechecked sets of
* reads.
*
* @param x the value
* @param fn the update function, or null for add (this convention
* avoids the need for an extra field or function in LongAdder).
* @param wasUncontended false if CAS failed before call
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
// 初始化变量threadLocalRandomProbe的值
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// cas冲突标记,true表示相同的cell间有另外的线程在做累加
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// 如果当前cells数组已经初始化了,并且长度不为0
if ((as = cells) != null && (n = as.length) > 0) {
// 通过计算得到隔间cell的值为null,说明没有其他线程对当前cell做操作,
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一个cell
Cell r = new Cell(x); // Optimistically create
// 当前cell未上锁并且上锁成功
if (cellsBusy == 0 && casCellsBusy()) {
// 创建成功标记位
boolean created = false;
try { // Recheck under lock

Cell[] rs; int m, j;
// 二次检查当前隔间cell不为null
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 设置当前隔间cell值为r
rs[j] = r;
// 创建成功标记位设置为true
created = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 新增cell成功,跳出循环
if (created)
break;
// 新增cell失败,说明其他线程已经新增成功,继续循环
continue; // Slot is now non-empty
}
}
// cellsBusy=1,有线程正在更改cells数组,设置为false
collide = false;
}
// 如果cas当前cas失败,将wasUncontended设置为true,然后在87行重新计算probe值,
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// cas将值设置到当前cell的value上,成功则直接跳出
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果当前cells数组已经无法再扩容,已经大于等于最大的CPU数量;或者已经扩容,将collide设置为false,重新计算probe值
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 如果发生了冲突collide=false,则设置其为true;会在最后重新计算hash值后,进入下一次for循环
else if (!collide)
//设置冲突标志,表示发生了冲突,需要再次生成hash,重试。 如果下次重试任然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支
collide = true;
// 扩容,新参与cell争用的线程两次均失败,且符合扩容条件
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// cells数组还未被扩容
if (cells == as) { // Expand table unless stale
// 扩容为原大小的两倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 重新计算probe值
h = advanceProbe(h);
}
// 如果cell未初始化或者长度为0,先尝试获取cellsBusy锁。
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
// 初始化,初始容量为2
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
/// 初始化成功
if (init)
break;
}
// 以上操作均失败,则尝试用cas将值累加到base上
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

DoubleAccumulate(double,DoubleBinaryOperator, boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/**
* Same as longAccumulate, but injecting long/double conversions
* in too many places to sensibly merge with long version, given
* the low-overhead requirements of this class. So must instead be
* maintained by copy/paste/adapt.
*/
final void doubleAccumulate(double x, DoubleBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
// 初始化变量threadLocalRandomProbe的值
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// cas冲突标记,true表示相同的cell间有另外的线程在做累加
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// 如果当前cells数组已经初始化了,并且长度不为0
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一个cell,并将double值转为long存储
Cell r = new Cell(Double.doubleToRawLongBits(x));
// 当前cell未上锁,并上锁成功
if (cellsBusy == 0 && casCellsBusy()) {
// 创建成功标记位
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 二次检查当前隔间cell不为null
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) { // 当前隔间cell为r
rs[j] = r;
// 创建成功
created = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 新增成功,跳出循环
if (created)
break;
// 新增失败,说明已经有其他线程新增成功,继续循环
continue; // Slot is now non-empty
}
}
// cellsBusy=1,有线程正在更改cells数组,设置为false
collide = false;
}
// 如果当前cas失败,将wasUncontended设置为true,然后重新计算probe值,重新散列当前线程到其他cell减少冲突
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value,
((fn == null) ?
Double.doubleToRawLongBits
(Double.longBitsToDouble(v) + x) :
Double.doubleToRawLongBits
(fn.applyAsDouble
(Double.longBitsToDouble(v), x)))))
break;
// 如果当前cells数组已经无法再扩容,已经大于等于最大的CPU数量;或者已经扩容,将collide设置为false,重新计算probe值
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 如果发生了冲突collide=false,则设置其为true;会在最后重新计算hash值后,进入下一次for循环
else if (!collide)
// 设置冲突标志,表示发生了冲突,需要再次生成hash,重试。 如果下次重试仍然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支
collide = true;
// 扩容,新参与cell争用的线程两次均失败,且符合扩容条件
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// cells数组还未被扩容
if (cells == as) { // Expand table unless stale
// 扩容为原大小的两倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 重新计算probe值
h = advanceProbe(h);
}
// 如果cell未初始化或者长度为0,先尝试获取cellsBusy锁。
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
// 初始化,初始容量为2
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
cells = rs;
init = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 初始化成功
if (init)
break;
}
// 以上操作均失败,则尝试用cas将值累加到base上
else if (casBase(v = base,
((fn == null) ?
Double.doubleToRawLongBits
(Double.longBitsToDouble(v) + x) :
Double.doubleToRawLongBits
(fn.applyAsDouble
(Double.longBitsToDouble(v), x)))))
break; // Fall back on using base
}
}

doubleAccumulate基本上和longAccumulate差不多,它多了Double转长整型和长整型转Double的步骤。