AtomicLongArray源码解析

AtomicLongArray源码解析

顾名思义,AtomicLongArray就是JDK提供的提供原子操作能力的长整型数组。之所以提供这个类是因为长整型数组在多线程环境下是线程不安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class UseLongArrayUnsafe implements Runnable {
private static long[] value = new long[2];

@Override
public void run() {
for (int i = 0;i < 10000;i++) {
value[0]++;
value[1]++;
}
}

public static void main(String[] args) {
UseLongArrayUnsafe unsafe = new UseLongArrayUnsafe();

for (int i = 0;i < 10;i++) {
Thread thread = new Thread(unsafe);
thread.start();
}

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(value[0]);
System.out.println(value[1]);
}
}

上述类UseLongArrayUnsafe,实现了Runnable接口,在其实现的run方法中,对私有数组变量的两个值分别自增1.

而后在main方法中,起了10个线程,分别执行。

运行结果1

运行结果2

运行结果3

AtomicLongArray类介绍

JDK提供了AtomicLongArray类用于原子的更新long类型的数组。不同于其他的Atomic×××,AtomicLongArray并没有使用CAS+Volatile关键字来实现原子操作。

类图

AtomicLongArray类图

主要属性

1
2
3
4
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(long[].class);
private static final int shift;
private final long[] array;

AtomicLongArray主要有上面4个属性:

unsafe:Sun提供的CAS算法的相关操作,AtomicLongArray底层就是通过它来完成值的修改的;

base:数组中第一个元素的内存地址

shift:左移偏移量。基于对象的数组,shift是根据不同对象来决定不同值的用来计算,数组中的元素在整个AtomicLongArray对象中的偏移量。数组中第i个元素的地址就是 i << shift + base.

array:数组对象,保存数值

主要方法

初始化

1
2
3
4
5
6
static {
int scale = unsafe.arrayIndexScale(long[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
shift = 31 - Integer.numberOfLeadingZeros(scale);
}

首先看下这个静态代码块,在这个代码块中,首先定义一个int变量scale,然后赋值为int[].class占用的字节数。

如果占用的字节数不是2的幂次,抛出异常。

最后再通过scale计算出来左移的偏移量。

例如,int一般占用4个字节,那么scale就等于4,而shift就等于31减去4变为二进制之后的前导0的个数,也就是2.

那么,数组第一个位置的元素就是base + 0 << 2, A[base],第二个就是base + 1 << 2.即A[base + 4],第三个就是A[base + 8]

checkedByteOffset(int)

1
2
3
4
5
6
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);

return byteOffset(i);
}

校验索引i是否越界,越界抛出异常,否则调用byteOffset方法计算出偏移量返回。

byteOffset(int)

1
2
3
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}

将索引i左移shift位,加上base返回目标位置的内存地址

AtomicLongArray(int)

1
2
3
4
5
6
7
8
9
/**
* Creates a new AtomicLongArray of the given length, with all
* elements initially zero.
*
* @param length the length of the array
*/
public AtomicLongArray(int length) {
array = new long[length];
}

创建一个指定数组长度的AtomicLongArray对象

AtomicLongArray(int[])

1
2
3
4
5
6
7
8
9
10
11
/**
* Creates a new AtomicLongArray with the same length as, and
* all elements copied from, the given array.
*
* @param array the array to copy elements from
* @throws NullPointerException if array is null
*/
public AtomicLongArray(long[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}

根据指定的数组,创建一个AtomicLongArray对象

length()

1
2
3
4
5
6
7
8
/**
* Returns the length of the array.
*
* @return the length of the array
*/
public final int length() {
return array.length;
}

返回数组的长度

get(int)

1
2
3
4
5
6
7
8
9
/**
* Gets the current value at position {@code i}.
*
* @param i the index
* @return the current value
*/
public final long get(int i) {
return getRaw(checkedByteOffset(i));
}

获取数组指定位置的值。调用checkedByteOffset检查越界情况,并在方法内部调用byteOffset计算出位置i的内存偏移量。传给getRaw方法,获取值

getRaw(long offset)

1
2
3
private long getRaw(long offset) {
return unsafe.getLongVolatile(array, offset);
}

直接调用unsafe的方法,获取对应内存偏移量上的值

set(int , long)

1
2
3
4
5
6
7
8
9
/**
* Sets the element at position {@code i} to the given value.
*
* @param i the index
* @param newValue the new value
*/
public final void set(int i, long newValue) {
unsafe.putLongVolatile(array, checkedByteOffset(i), newValue);
}

调用unsafe的方法直接将位置i上的值设置为给定的新值

lazySet(int, long)

1
2
3
4
5
6
7
8
9
10
/**
* Eventually sets the element at position {@code i} to the given value.
*
* @param i the index
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(int i, long newValue) {
unsafe.putOrderedLong(array, checkedByteOffset(i), newValue);
}

调用unsafe的方法直接将位置i上的值设置为给定的新值。但是这个方法有别于上面的set方法,它在一段时间内是有可能让其他线程查到原来的旧值,而不是立马查到新值。因此,它可以算是一个“最终一致性”的方法

getAndSet(int, long)

1
2
3
4
5
6
7
8
9
10
11
/**
* Atomically sets the element at position {@code i} to the given value
* and returns the old value.
*
* @param i the index
* @param newValue the new value
* @return the previous value
*/
public final long getAndSet(int i, long newValue) {
return unsafe.getAndSetLong(array, checkedByteOffset(i), newValue);
}

返回位置i上的旧值,并将新值赋值给位置i

compareAndSet(int, long, long)

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Atomically sets the element at position {@code i} to the given
* updated value if the current value {@code ==} the expected value.
*
* @param i the index
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int i, long expect, long update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

如果位置i上的旧值等于期望值expect,那么就将位置i上的值赋值为update,并返回true,否则返回false

compareAndSetRaw(long, long, long)

1
2
3
private boolean compareAndSetRaw(long offset, long expect, long update) {
return unsafe.compareAndSwapLong(array, offset, expect, update);
}

compareAndSet方法实际调用的方法,直接调用unsafe设置对应偏移位置上的值

weakCompareAndSet(int, long, long)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Atomically sets the element at position {@code i} to the given
* updated value if the current value {@code ==} the expected value.
*
* <p><a href="package-summary.html#weakCompareAndSet">May fail
* spuriously and does not provide ordering guarantees</a>, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param i the index
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
*/
public final boolean weakCompareAndSet(int i, long expect, long update) {
return compareAndSet(i, expect, update);
}

这个方法的本意是,调用weakCompareAndSet方法时不能保证指令重排的发生,因此,这个方法有时候会毫无理由地失败。

但是从实现上来看,这个方法还是和compareAndSet一模一样的。不过建议在使用的时候,谨慎对待,保不定什么时候,方法实现就修改了。

getAndIncrement(int)

1
2
3
4
5
6
7
8
9
/**
* Atomically increments by one the element at index {@code i}.
*
* @param i the index
* @return the previous value
*/
public final long getAndIncrement(int i) {
return getAndAdd(i, 1);
}

获取位置i上的值返回,并对该值自增.

getAndDecrement(int)

1
2
3
4
5
6
7
8
9
/**
* Atomically decrements by one the element at index {@code i}.
*
* @param i the index
* @return the previous value
*/
public final long getAndDecrement(int i) {
return getAndAdd(i, -1);
}

获取位置i上的值返回,并对该值自减.

getAndAdd(int, long)

1
2
3
4
5
6
7
8
9
10
/**
* Atomically adds the given value to the element at index {@code i}.
*
* @param i the index
* @param delta the value to add
* @return the previous value
*/
public final long getAndAdd(int i, long delta) {
return unsafe.getAndAddLong(array, checkedByteOffset(i), delta);
}

返回位置i上对应的值,并加上delta

incrementAndGet(int)

1
2
3
4
5
6
7
8
9
/**
* Atomically increments by one the element at index {@code i}.
*
* @param i the index
* @return the updated value
*/
public final long incrementAndGet(int i) {
return getAndAdd(i, 1) + 1;
}

对位置i上的值增1,并返回

decrementAndGet(int)

1
2
3
4
5
6
7
8
9
/**
* Atomically decrements by one the element at index {@code i}.
*
* @param i the index
* @return the updated value
*/
public final long decrementAndGet(int i) {
return getAndAdd(i, -1) - 1;
}

对位置i上的值减1,并返回

addAndGet(int, long)

1
2
3
4
5
6
7
8
9
10
/**
* Atomically adds the given value to the element at index {@code i}.
*
* @param i the index
* @param delta the value to add
* @return the updated value
*/
public long addAndGet(int i, long delta) {
return getAndAdd(i, delta) + delta;
}

对位置i上的值加上delta,并返回最新的值

getAndUpdate(int, LongUnaryOperator)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Atomically updates the element at index {@code i} with the results
* of applying the given function, returning the previous value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param i the index
* @param updateFunction a side-effect-free function
* @return the previous value
* @since 1.8
*/
public final long getAndUpdate(int i, LongUnaryOperator updateFunction) {
long offset = checkedByteOffset(i);
long prev, next;
do {
prev = getRaw(offset);
next = updateFunction.applyAsLong(prev);
} while (!compareAndSetRaw(offset, prev, next));
return prev;
}

返回位置i上的值,并对位置i上的值应用单元函数updateFunction,更新位置i上的值。这里要求函数updateFunction是幂等的,即,多次执行结果是一致的,因为CAS操作可能失败。

updateAndGet(int, LongUnaryOperator)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Atomically updates the element at index {@code i} with the results
* of applying the given function, returning the updated value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param i the index
* @param updateFunction a side-effect-free function
* @return the updated value
* @since 1.8
*/
public final long updateAndGet(int i, LongUnaryOperator updateFunction) {
long offset = checkedByteOffset(i);
long prev, next;
do {
prev = getRaw(offset);
next = updateFunction.applyAsLong(prev);
} while (!compareAndSetRaw(offset, prev, next));
return next;
}

对位置i上的值应用单元函数updateFunction,并将应用后的值更新到位置i。要求单元函数是幂等的,因为CAS操作可能失败

getAndAccumulate(int, long, LongBinaryOperator)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Atomically updates the element at index {@code i} with the
* results of applying the given function to the current and
* given values, returning the previous value. The function should
* be side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function is
* applied with the current value at index {@code i} as its first
* argument, and the given update as the second argument.
*
* @param i the index
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the previous value
* @since 1.8
*/
public final long getAndAccumulate(int i, long x,
LongBinaryOperator accumulatorFunction) {
long offset = checkedByteOffset(i);
long prev, next;
do {
prev = getRaw(offset);
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSetRaw(offset, prev, next));
return prev;
}

返回位置i上的值,并对位置i上的值应用二元函数accumulatorFunction,设置在位置i上。这里同样要求二元函数具有幂等性,因为CAS操作可能会失败。

accumulateAndGet(int, long, LongBinaryOperator)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Atomically updates the element at index {@code i} with the
* results of applying the given function to the current and
* given values, returning the updated value. The function should
* be side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function is
* applied with the current value at index {@code i} as its first
* argument, and the given update as the second argument.
*
* @param i the index
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the updated value
* @since 1.8
*/
public final long accumulateAndGet(int i, long x,
LongBinaryOperator accumulatorFunction) {
long offset = checkedByteOffset(i);
long prev, next;
do {
prev = getRaw(offset);
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSetRaw(offset, prev, next));
return next;
}

对位置i上的值应用二元函数accumulatorFunction,设置在位置i上,然后返回位置i上的值。这里同样要求二元函数具有幂等性,因为CAS操作可能会失败。

toString()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Returns the String representation of the current values of array.
* @return the String representation of the current values of array
*/
public String toString() {
int iMax = array.length - 1;
if (iMax == -1)
return "[]";

StringBuilder b = new StringBuilder();
b.append('[');
for (int i = 0; ; i++) {
b.append(getRaw(byteOffset(i)));
if (i == iMax)
return b.append(']').toString();
b.append(',').append(' ');
}
}

返回**[value, value,value]**,这样的字符串。

应用

下面就是用AtomicLongArray修改在多线程环境下存在问题的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class UseAtomicLongArray implements Runnable {
private static AtomicLongArray longArray = new AtomicLongArray(2);

@Override
public void run() {
for (int i = 0;i < 10000;i++) {
longArray.addAndGet(0, 1);
longArray.addAndGet(1, 1);
}
}

public static void main(String[] args) {
UseAtomicLongArray unsafe = new UseAtomicLongArray();

for (int i = 0;i < 10;i++) {
Thread thread = new Thread(unsafe);
thread.start();
}

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(longArray.get(0));
System.out.println(longArray.get(1));
}
}

再执行多少次都不会影响结果了。

0%