ReentrantReadWriteLock解析

ReentrantReadWriteLock解析

前文提到的synchronized关键和和ReentrantLock,它们都是独占式锁,排他锁。在同一时刻只能有一个线程获取多。这个就非常不适合那种读多写少的场景。

如果有多个线程需要读取共享数据,极少数甚至只有一个线程写共享数据的话,就非常不划算了。读操作对数据没有影响,完全可以并发进行。

于是Java提供了另外一个实现了Lock接口的ReentrantReadWriteLock(可重入读写锁)。使用这个锁时,多个读线程可以在同一个时刻访问共享资源。但是在写线程访问的时候,所有的读线程和其他写线程都会被阻塞。

它还有以下特点:

  1. 支持公平锁和非公平锁,默认非公平锁
  2. 可重入。不管是读锁还是写锁,线程在获取之后,还能再次获取。写锁在成功获取之后,也能获取读锁。
  3. 锁降级:遵循获取写锁,获取读锁,然后释放写锁的次序,写锁就能降级为读锁。

类结构

请见下图

ReentrantReadWriteLock类结构图

在分别分析读写锁之前,我们看下读写锁在Sync中时怎么计数的。

在ReentrantReadWriteLock的内部抽象静态类中有这么几句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/

static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

其中方法sharedCount是用作获取读锁被获取的次数。它将同步状态c右移16位,取它的高16位。

方法exclusiveCount是用作获取写锁被获取的次数。EXCLUSIVE_MASK为1左移16位,然后减1,即为0x0000FFFF。然后和当前的同步状态C相与,获取同步状态的低16位。

示意图如下:

同步状态高低位示意

写锁

写锁的获取

其他流程已经在AQS里面实现了,我们具体看一下写锁的tryAcquire方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

大体流程是:

  1. 如果同步状态c不为0,写锁被获取次数为0,说明此时有线程已经获取到了读锁,获取失败
  2. 亦或同步状态c不为0,当前线程不是获取写锁的线程,获取失败
  3. 亦或持有的写锁次数应超过最大可持有数目了。这里写锁只可有一个线程持有,但是可以重入MAX_COUNT次
  4. 如果上述情况均没有,则当前线程可获取写锁,设置同步状态,设置独占线程为线程本身

注意到还有一个writerShouldBlock方法,这个方法在公平锁和非公平锁中的实现逻辑是不一样的。

在公平锁中

1
2
3
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}

它是以队列里面是否有正在等候的线程来判断的。

而非公平锁中直接返回false,因为非公平锁是支持抢占的

1
2
3
final boolean writerShouldBlock() {
return false; // writers can always barge
}

写锁的释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/

protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

流程基本上和ReentrantLock差不多,因为写锁是同步状态的低16位表示的,所以,直接用getState()-releases就行了。

读锁

读锁和写锁不一样,它不是独占的,排他的,它是一种共享锁。同一时刻可以被多个线程获取。按照上一篇AQS文章中的介绍,读锁需要重写AQS中的tryAcquireShared和tryReleaseShared方法。

读锁的获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

JDK源码自带的注释已经说得很清楚了。

  1. 如果写锁被其他线程获取了,获取读锁失败
  2. 否则,获取读锁成功,更新同步状态,只更新同步状态c的高16位的值
  3. 无论是CAS失败或者同一线程再次获取读锁时,都会调用fullTryAcquireShared方法

fullTryAcquireShared方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

代码和tryAcquireShared类似。就是一个dead loop,不断去尝试设置同步状态。

读锁的释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

锁降级

锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。

可以看看官方文档对锁降级的示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}

try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}

在释放写锁前,需要先获得读锁,然后再释放写锁。如果不先获取读锁,那么其他线程在这个线程释放写锁后可能会修改data,而这种修改对于这个线程是不可见的,从而在之后的use(data)中使用的是错误的值 。

0%