AtomicIntegerArray源码解析

AtomicIntegerArray源码解析

顾名思义,AtomicIntegerArray就是JDK提供的提供原子操作能力的整型数组。之所以提供这个类是因为整型数组在多线程环境下是线程不安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class UseIntegerArrayUnsafe implements Runnable {
private static int[] value = new int[2];

@Override
public void run() {
for (int i = 0;i < 10000;i++) {
value[0]++;
value[1]++;
}
}

public static void main(String[] args) {
UseIntegerArrayUnsafe unsafe = new UseIntegerArrayUnsafe();

for (int i = 0;i < 10;i++) {
Thread thread = new Thread(unsafe);
thread.start();
}

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(value[0]);
System.out.println(value[1]);
}
}

上述类UseIntegerArrayUnsafe,实现了Runnable接口,在其实现的run方法中,对私有数组变量的两个值分别自增1.

而后在main方法中,起了10个线程,分别执行。

运行结果1

运行结果2

运行结果3

AtomicIntegerArray类介绍

JDK提供了AtomicIntegerArray类用于原子的更新int类型的数组。不同于其他的Atomic×××,AtomicInterArray并没有使用CAS+Volatile关键字来实现原子操作。

类图

AtomicIntegerArray类图

主要属性

1
2
3
4
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int shift;
private final int[] array;

AtomicIntegerArray主要有上面4个属性:

unsafe:Sun提供的CAS算法的相关操作,AtomicIntegerArray底层就是通过它来完成值的修改的;

base:数组在ArrayIntegerArray中的内存起始地址

shift:左移偏移量。用来计算数组中的元素在整个AtomicIntegerArray对象中的偏移量。数组中第i个元素的地址就是 i << shift + base.

array:数组对象,保存数值

主要方法

初始化

1
2
3
4
5
6
static {
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
shift = 31 - Integer.numberOfLeadingZeros(scale);
}

首先看下这个静态代码块,在这个代码块中,首先定义一个int变量scale,然后赋值为int[].class占用的字节数。

如果占用的字节数不是2的幂次,抛出异常。

最后再通过scale计算出来左移的偏移量。

例如,int一般占用4个字节,那么scale就等于4,而shift就等于31减去4变为二进制之后的前导0的个数,也就是2.

那么,数组第一个位置的元素就是base + 0 << 2, A[base],第二个就是base + 1 << 2.即A[base + 4],第三个就是A[base + 8]

checkedByteOffset(int)

1
2
3
4
5
6
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);

return byteOffset(i);
}

校验索引i是否越界,越界抛出异常,否则调用byteOffset方法计算出偏移量返回。

byteOffset(int)

1
2
3
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}

将索引i左移shift位,加上base返回目标位置的内存地址

AtomicIntegerArray(int)

1
2
3
4
5
6
7
8
9
/**
* Creates a new AtomicIntegerArray of the given length, with all
* elements initially zero.
*
* @param length the length of the array
*/
public AtomicIntegerArray(int length) {
array = new int[length];
}

创建一个指定数组长度的AtomicIntegerArray对象

AtomicIntegerArray(int[])

1
2
3
4
5
6
7
8
9
10
11
/**
* Creates a new AtomicIntegerArray with the same length as, and
* all elements copied from, the given array.
*
* @param array the array to copy elements from
* @throws NullPointerException if array is null
*/
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}

根据指定的数组,创建一个AtomicIntegerArray对象

length()

1
2
3
4
5
6
7
8
/**
* Returns the length of the array.
*
* @return the length of the array
*/
public final int length() {
return array.length;
}

返回数组的长度

get(int)

1
2
3
4
5
6
7
8
9
/**
* Gets the current value at position {@code i}.
*
* @param i the index
* @return the current value
*/
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}

获取数组指定位置的值。调用checkedByteOffset检查越界情况,并在方法内部调用byteOffset计算出位置i的内存偏移量。传给getRaw方法,获取值

getRaw(long offset)

1
2
3
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}

直接调用unsafe的方法,获取对应内存偏移量上的值

set(int , int)

1
2
3
4
5
6
7
8
9
/**
* Sets the element at position {@code i} to the given value.
*
* @param i the index
* @param newValue the new value
*/
public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}

调用unsafe的方法直接将位置i上的值设置为给定的新值

lazySet(int, int)

1
2
3
4
5
6
7
8
9
10
/**
* Eventually sets the element at position {@code i} to the given value.
*
* @param i the index
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(int i, int newValue) {
unsafe.putOrderedInt(array, checkedByteOffset(i), newValue);
}

调用unsafe的方法直接将位置i上的值设置为给定的新值。但是这个方法有别于上面的set方法,它在一段时间内是有可能让其他线程查到原来的旧值,而不是立马查到新值。因此,它可以算是一个“最终一致性”的方法

getAndSet(int, int)

1
2
3
4
5
6
7
8
9
10
11
/**
* Atomically sets the element at position {@code i} to the given
* value and returns the old value.
*
* @param i the index
* @param newValue the new value
* @return the previous value
*/
public final int getAndSet(int i, int newValue) {
return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}

返回位置i上的旧值,并将新值赋值给位置i

compareAndSet(int, int, int)

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Atomically sets the element at position {@code i} to the given
* updated value if the current value {@code ==} the expected value.
*
* @param i the index
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

如果位置i上的旧值等于期望值expect,那么就将位置i上的值赋值为update,并返回true,否则返回false

compareAndSetRaw(long, int, int)

1
2
3
private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
}

compareAndSet方法实际调用的方法,直接调用unsafe设置对应偏移位置上的值

weakCompareAndSet(int, int, int)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Atomically sets the element at position {@code i} to the given
* updated value if the current value {@code ==} the expected value.
*
* <p><a href="package-summary.html#weakCompareAndSet">May fail
* spuriously and does not provide ordering guarantees</a>, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param i the index
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
*/
public final boolean weakCompareAndSet(int i, int expect, int update) {
return compareAndSet(i, expect, update);
}

这个方法的本意是,调用weakCompareAndSet方法时不能保证指令重排的发生,因此,这个方法有时候会毫无理由地失败。

但是从实现上看,和compareAndSet没什么区别

getAndIncrement(int)

1
2
3
4
5
6
7
8
9
/**
* Atomically increments by one the element at index {@code i}.
*
* @param i the index
* @return the previous value
*/
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}

获取位置i上的值返回,并对该值自增.

getAndDecrement(int)

1
2
3
4
5
6
7
8
9
/**
* Atomically decrements by one the element at index {@code i}.
*
* @param i the index
* @return the previous value
*/
public final int getAndDecrement(int i) {
return getAndAdd(i, -1);
}

获取位置i上的值返回,并对该值自减.

getAndAdd(int, int)

1
2
3
4
5
6
7
8
9
10
/**
* Atomically adds the given value to the element at index {@code i}.
*
* @param i the index
* @param delta the value to add
* @return the previous value
*/
public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

返回位置i上对应的值,并加上delta

incrementAndGet(int)

1
2
3
4
5
6
7
8
9
/**
* Atomically increments by one the element at index {@code i}.
*
* @param i the index
* @return the updated value
*/
public final int incrementAndGet(int i) {
return getAndAdd(i, 1) + 1;
}

对位置i上的值增1,并返回

decrementAndGet(int)

1
2
3
4
5
6
7
8
9
/**
* Atomically decrements by one the element at index {@code i}.
*
* @param i the index
* @return the updated value
*/
public final int decrementAndGet(int i) {
return getAndAdd(i, -1) - 1;
}

对位置i上的值减1,并返回

addAndGet(int, int)

1
2
3
4
5
6
7
8
9
10
/**
* Atomically adds the given value to the element at index {@code i}.
*
* @param i the index
* @param delta the value to add
* @return the updated value
*/
public final int addAndGet(int i, int delta) {
return getAndAdd(i, delta) + delta;
}

对位置i上的值加上delta,并返回最新的值

getAndUpdate(int, IntUnaryOperator)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Atomically updates the element at index {@code i} with the results
* of applying the given function, returning the previous value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param i the index
* @param updateFunction a side-effect-free function
* @return the previous value
* @since 1.8
*/
public final int getAndUpdate(int i, IntUnaryOperator updateFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = updateFunction.applyAsInt(prev);
} while (!compareAndSetRaw(offset, prev, next));
return prev;
}

返回位置i上的值,并对位置i上的值应用单元函数updateFunction,更新位置i上的值。这里要求函数updateFunction是幂等的,即,多次执行结果是一致的,因为CAS操作可能失败。

updateAndGet(int, IntUnaryOperator)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Atomically updates the element at index {@code i} with the results
* of applying the given function, returning the updated value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param i the index
* @param updateFunction a side-effect-free function
* @return the updated value
* @since 1.8
*/
public final int updateAndGet(int i, IntUnaryOperator updateFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = updateFunction.applyAsInt(prev);
} while (!compareAndSetRaw(offset, prev, next));
return next;
}

对位置i上的值应用单元函数updateFunction,并将应用后的值更新到位置i。要求单元函数是幂等的,因为CAS操作可能失败

getAndAccumulate(int, int, IntBinaryOperator)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Atomically updates the element at index {@code i} with the
* results of applying the given function to the current and
* given values, returning the previous value. The function should
* be side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function is
* applied with the current value at index {@code i} as its first
* argument, and the given update as the second argument.
*
* @param i the index
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the previous value
* @since 1.8
*/
public final int getAndAccumulate(int i, int x,
IntBinaryOperator accumulatorFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSetRaw(offset, prev, next));
return prev;
}

返回位置i上的值,并对位置i上的值应用二元函数accumulatorFunction,设置在位置i上。这里同样要求二元函数具有幂等性,因为CAS操作可能会失败。

accumulateAndGet(int, int, IntBinaryOperator)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Atomically updates the element at index {@code i} with the
* results of applying the given function to the current and
* given values, returning the updated value. The function should
* be side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function is
* applied with the current value at index {@code i} as its first
* argument, and the given update as the second argument.
*
* @param i the index
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the updated value
* @since 1.8
*/
public final int accumulateAndGet(int i, int x,
IntBinaryOperator accumulatorFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSetRaw(offset, prev, next));
return next;
}

对位置i上的值应用二元函数accumulatorFunction,设置在位置i上,然后返回位置i上的值。这里同样要求二元函数具有幂等性,因为CAS操作可能会失败。

toString()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Returns the String representation of the current values of array.
* @return the String representation of the current values of array
*/
public String toString() {
int iMax = array.length - 1;
if (iMax == -1)
return "[]";

StringBuilder b = new StringBuilder();
b.append('[');
for (int i = 0; ; i++) {
b.append(getRaw(byteOffset(i)));
if (i == iMax)
return b.append(']').toString();
b.append(',').append(' ');
}
}

返回[value, value,value],这样的字符串。

应用

下面就是用AtomicIntegerArray修改在多线程环境下存在问题的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class UseAtomicIntegerArray implements Runnable {
private static AtomicIntegerArray integerArray = new AtomicIntegerArray(2);

@Override
public void run() {
for (int i = 0;i < 10000;i++) {
integerArray.addAndGet(0, 1);
integerArray.addAndGet(1, 1);
}
}

public static void main(String[] args) {
UseAtomicIntegerArray unsafe = new UseAtomicIntegerArray();

for (int i = 0;i < 10;i++) {
Thread thread = new Thread(unsafe);
thread.start();
}

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(integerArray.get(0));
System.out.println(integerArray.get(1));
}
}

再执行多少次都不会影响结果了。