分布式锁的几种实现方式

分布式锁的几种实现方式

为什么需要分布式锁

在许多场景中,有些操作是只能被执行一次的,比如,用户消费之后的通知短信,发送多了,会招致投诉。在单机场景下,可以使用并发API或者锁解决,但是在分布式场景中,就没有那么简单了。

分布式场景是多进程场景,相同服务的不同实例可能都不知道对方的存在,这个时候要确保只有一个实例执行了某个操作,方法之一就是使用分布式锁。

分布式锁的特性

根据分布式场景的特性,分布式锁需要具备以下一些特性:

  • 互斥性:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。
  • 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁。
  • 锁超时:和本地锁一样支持锁超时,防止死锁。
  • 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。
  • 支持阻塞和非阻塞:和ReentrantLock一样支持lock和trylock以及tryLock(long timeOut)。
  • 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。

常见的分布式锁

下面我们看看常用的实现分布式锁的方法

数据库实现分布式锁

使用数据库表做分布式锁

要通过数据库实现分布式锁,最简单的就是给数据库里面插入一张锁表。

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE `lock` (
`id` int(11) unsigned not null AUTO_INCREMENT,
`resource_name` varchar(128) not null default '',
`node_info` varchar(128) default null.
`count` int(11) not null default 0,
`desc` varchar(128) default null,
`update_time` timestamp null default null,
`create_time` timestamp null default null
PRIMARY KEY(`id`),
UNIQUE KEY `unique_resource` (`resource_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

当我们向锁住某个资源的时候,就插入一条记录,以资源名称和插入线程的名称(NodeInfo)为值。因为对resource_name做了唯一性约束,第二个请求达到数据库的时候先判断是否存在resource_name的记录,并且记录的node_info是不是当前线程的,是的话,就增加计数count,否则拒绝。

Lock示例

释放锁的时候,删除记录就行了

缺点:

数据库锁存在以下几个问题:

  1. 强烈依赖数据库的可用性,一旦单点数据库宕机,业务就会不可用
  2. 锁没有失效时间,一旦解锁失败就会导致锁记录一直存在,其他线程无法再次获得锁
  3. 锁只能是非阻塞的,insert失败就会马上失败,没有等待队列一说
  4. 非公平锁

基于数据库排他锁做分布式锁

在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。

如果执行失败,就会一直阻塞,直至成功。使用排他锁可以解决阻塞和无法释放锁的问题。

锁定之后宕机,数据库会自己释放锁。

加锁操作可以做成如下这种:

1
2
connection.setAutoCommit(false);
select * from xxx for update;

解锁操作就可以是

1
connection.commit()

但是,如果这个排他锁一直不提交,就会占用数据库连接,长久会撑爆数据库的连接池。

Zookeeper实现分布式锁

基于Zookeeper实现分布式锁,需要具备相关的Zookeeper的信息。

  • Zookeeper 一般由多个节点构成(单数),采用 zab 一致性协议。因此可以将 Zookeeper 看成一个单点结构,对其修改数据其内部自动将所有节点数据进行修改而后才提供查询服务。
  • Zookeeper的数据以目录树的形式,每个目录称为 znode, znode 中可存储数据(一般不超过 1M),还可以在其中增加子节点。
  • 子节点有三种类型。序列化节点,每在该节点下增加一个节点自动给该节点的名称上自增。临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除。最后就是普通节点。
  • Watch 机制,client 可以监控每个节点的变化,当产生变化会给 client 产生一个事件。

原理

上锁,改为创建临时有序节点,每个上锁的动作都能创建成功,只是他们的序号不同。只有序号最小的节点才可以用有锁,如果这个节点不是最小的序号,那么就watch序号比它小1号的前一个节点。因此,基于这个原理的锁,是一个公平锁。

步骤:

  1. 新建一个/lock目录
  2. 每当需要竞争获取锁的时候,在/lock目录下创建一个有序的临时节点(EPHEMERAL_SEQUENTIAL)
  3. 判断节点序号是不是所有节点中最小的,是的话,获取锁成功。否则,watch序号比本身小1的前一个节点
  4. watch事件到来后,再次判断是否是最小的序号,成功获取锁后,执行代码,最后释放资源,删除当前节点。

Redis实现分布式锁

简单实现

借助setNx命令。

如果不存在则更新,其可以很好的用来实现我们的分布式锁。对于某个资源加锁我们只需要

setNx resourceName value

这里有个问题,加锁了之后如果机器宕机那么这个锁就不会得到释放所以会加入过期时间,加入过期时间需要和setNx同一个原子操作,在Redis2.8之前我们需要使用Lua脚本达到我们的目的,但是redis2.8之后redis支持nx和ex操作是同一原子操作。

set resourceName value ex 5 nx

Redission

redisson是在redis基础上实现的一套开源解决方案,不仅提供了一系列的分布式的java常用对象,还提供了许多分布式服务,宗旨是促进使用者对redis的关注分离,更多的关注业务逻辑的处理上。

Redission实现了Java的Lock接口

简单的使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static void main() {

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://xxxx1:xxx1")
.setPassword("xxxx1")
.setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer()
.setAddress("redis://xxxx2:xxx2")
.setPassword("xxxx2")
.setDatabase(0);

RedissonClient redissonClient2 = Redisson.create(config2);

Config config3 = new Config();
config3.useSingleServer().
setAddress("redis://xxxx3:xxx3")
.setPassword("xxxx3")
.setDatabase(0);

RedissonClient redissonClient3 = Redisson.create(config3);

String lockName = "redlock-test";
RLock lock1 = redissonClient1.getLock(lockName);
RLock lock2 = redissonClient2.getLock(lockName);
RLock lock3 = redissonClient3.getLock(lockName);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
isLock = redLock.tryLock(500, 30000, TimeUnit.MILLISECONDS);
System.out.println("isLock = " + isLock);
if (isLock) {
// lock success, do something;
Thread.sleep(30000);
}
} catch (Exception e) {

} finally {
// 无论如何, 最后都要解锁
redLock.unlock();
System.out.println("unlock success");
}
}

Redission加锁过程

Redission解锁过程

RedLock

RedLock是基于redis实现的分布式锁,它能够保证以下特性:

  1. 互斥性:在任何时候,只能有一个客户端能够持有锁;
  2. 避免死锁:当客户端拿到锁后,即使发生了网络分区或者客户端宕机,也不会发生死锁;(利用key的存活时间)
  3. 容错性:只要多数节点的redis实例正常运行,就能够对外提供服务,加锁或者释放锁;
RedLock算法

假设有N个redis的master节点,这些节点是相互独立的(不需要主从或者其他协调的系统)。N推荐为奇数

客户端在获取锁时,需要做以下操作:

  • 获取当前时间戳,以微秒为单位
  • 使用相同的lockName和lockValue,尝试从N个节点获取锁。
  • 在获取锁时,要求等待获取锁的时间远小于锁的释放时间,如锁的lease_time为10s,那么wait_time应该为5-50毫秒;避免因为redis实例挂掉,客户端需要等待更长的时间才能返回,即需要让客户端能够fast_fail;如果一个redis实例不可用,那么需要继续从下个redis实例获取锁
  • 当从N个节点获取锁结束后,如果客户端能够从多数节点(N/2 + 1)中成功获取锁,且获取锁的时间小于失效时间,那么可认为,客户端成功获得了锁。(获取锁的时间=当前时间戳 - 步骤1的时间戳)
  • 客户端成功获得锁后,那么锁的实际有效时间 = 设置锁的有效时间 - 获取锁的时间。
  • 客户端获取锁失败后,N个节点的redis实例都会释放锁,即使未能加锁成功。
  • 当客户端无法获取到锁时,应该随机延时后进行重试,防止多个客户端在同一时间抢夺同一资源的锁(会导致脑裂,最终都不能获取到锁)。客户端获得超过半数节点的锁花费的时间越短,那么脑裂的概率就越低。所以,理想的情况下,客户端最好能够同时(并发)向所有redis发出set命令。
  • 当客户端从多数节点获取锁失败时,应该尽快释放已经成功获取的锁,这样其他客户端不需要等待锁过期后再获取。(如果存在网络分区,客户端已经无法和redis进行通信,那么此时只能等待锁过期后自动释放)
  • 向所有redis实例发送释放锁命令即可,不需要关心redis实例有没有成功上锁。

基于Consul实现的分布式锁

基于K/V

Consul的KV支持acquire和release操作。acquire/release操作实现了一种类似 Check-And-Set操作,这两个操作使用 Consul Session 进行操作:

  • acquire操作只有当 Key 的锁不存在持有者(Session)时才会返回 true,同时执行操作的 Session 会持有对该 Key 的锁;否则就返回false;
  • release操作则是使用指定的 Session 来释放某个Key的锁,如果指定的 Session无效,那么会返回 false,否则就会set设置Value值,并返回 true。

由于同一时间只有一个 Session 可以占有一个 Key 的锁,因此可以将一个 Key 当做一把锁,在访问临界资源时调用acquire操作实现 Lock 操作;在访问结束后调用release操作实现 Unlock 操作。

基于KV和Session的分布式锁

需要注意的是,上面的这个锁,如果一直没有释放的话,就永远释放不了了。因此,在创建session的时候需要给这个session加一个ttl,时间到了就自动释放锁。

样例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class Lock {
private static final String prefix = "lock/"; // 同步锁参数前缀
private ConsulClient consulClient;
private String sessionName;
private String sessionId = null;
private String lockKey;
/**
*
* @param consulClient
* @param sessionName 同步锁的session名称
* @param lockKey 同步锁在consul的KV存储中的Key路径,会自动增加prefix前缀,方便归类查询
*/
public Lock(ConsulClient consulClient, String sessionName, String lockKey) {
this.consulClient = consulClient;
this.sessionName = sessionName;
this.lockKey = prefix + lockKey;
}
/**
* 获取同步锁
*
* @param block 是否阻塞,直到获取到锁为止
* @return
*/
public Boolean lock(boolean block) {
if (sessionId != null) {
throw new RuntimeException(sessionId + " - Already locked!");
}
sessionId = createSession(sessionName);
while(true) {
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {
return true;
} else if(block) {
continue;
} else {
return false;
}
}
}
/**
* 释放同步锁
*
* @return
*/
public Boolean unlock() {
PutParams putParams = new PutParams();
putParams.setReleaseSession(sessionId);
boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();
consulClient.sessionDestroy(sessionId, null);
return result;
}
/**
* 创建session
* @param sessionName
* @return
*/
private String createSession(String sessionName) {
NewSession newSession = new NewSession();
newSession.setName(sessionName);
return consulClient.sessionCreate(newSession, null).getValue();
}
}

基于Consul的分布式信号量

上一个小章节中,利用Consul做了分布式锁,基于Consul的KV,但是只用到了K,没有用到V。如果把V也用上的话,我们就可以实现一个分布式信号量。

当然,也是借助acquire和release操作

  • 信号量存储:semaphore/key

  • acquired操作:

    • 创建session
    • 锁定key竞争者:semaphore/key/session
    • 查询信号量:semaphore/key/.lock,可以获得如下内容(如果是第一次创建信号量,将获取不到,这个时候就直接创建)
    • 如果持有者已达上限,返回false,如果阻塞模式,就继续尝试acquired操作
    • 如果持有者未达上限,更新semaphore/key/.lock的内容,将当前线程的sessionId加入到holders中。注意:更新的时候需要设置cas,它的值是“查询信号量”步骤获得的“ModifyIndex”值,该值用于保证更新操作的基础没有被其他竞争者更新。如果更新成功,就开始执行具体逻辑。如果没有更新成功,说明有其他竞争者抢占了资源,返回false,阻塞模式下继续尝试acquired操作
    1
    2
    3
    4
    5
    6
    7
    {
    "limit": 3,
    "holders": [
    "90c0772a-4bd3-3a3c-8215-3b8937e36027",
    "93e5611d-5365-a374-8190-f80c4a7280ab"
    ]
    }
  • release操作:

    • 从semaphore/key/.lock的holders中移除当前sessionId
    • 删除semaphore/key/session
    • 删除当前的session

基于KV和Session的分布式信号量

实际应用的时候,必须加入TTL的session清理以及对.lock资源中的无效holder进行清理的机制。

参考资料

分布式锁的几种实现方式

分布式锁看这篇就够了

基于Consul的分布式锁实现

Distributed Semaphores with Sessions

0%