CAP理论和BASE理论

This commit is contained in:
罗祥
2020-05-06 17:43:59 +08:00
parent 29a6cc946b
commit 6ba5a17420
14 changed files with 503 additions and 654 deletions

View File

@@ -1,43 +1,64 @@
# ZooKeeper 分布式锁原理
<nav>
<a href="#一实现原理">一、实现原理</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#11-临时节点方案">1.1 临时节点方案</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#12-临时有序节点方案">1.2 临时有序节点方案</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#13-读写锁">1.3 读写锁</a><br/>
<a href="#二-Apache-Curator">二、 Apache Curator</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-基本使用">2.1 基本使用</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-源码解析">2.2 源码解析</a><br/>
</nav>
## 一、实现原理
JDK 原生的锁可以让不同**线程**之间以互斥的方式来访问共享资源,但如果想要在不同**进程**之间以互斥的方式来访问共享资源JDK 原生的锁就无能为力。此时可以使用 Redis 或 Zookeeper 来实现分布式锁。
JDK 原生的锁可以让不同**线程**之间以互斥的方式来访问共享资源,但如果想要在不同**进程**之间以互斥的方式来访问共享资源JDK 原生的锁就无能为力。此时可以使用 Zookeeper 来实现分布式锁。具体分为以下两种方案:
### 1.1 临时节点方案
![zookeeper_分布式锁_临时节点方法](../pictures/zookeeper_分布式锁_临时节点方法.png)
<div align="center"> <img src="../pictures/zookeeper_分布式锁_临时节点方法.png"/> </div>
临时节点方案的原理如下:
+ 让多个进程(或线程)竞争性地去创建同一个临时节点,由于 ZooKeeper 不允许存在两个完全相同节点,因此必然只有一个进程能够抢先创建成功
+ 假设进程 A 成功创建,则它获得该分布式锁。此时其他进程需要在 parent_node 上注册监听,监听其下所有子节点的变化,并挂起当前线程;
+ 让多个进程(或线程)竞争性地去创建同一个临时节点,由于 ZooKeeper 不允许存在两个完全相同节点,因此必然只有一个进程能够抢先创建成功
+ 假设进程 A 成功创建了节点,则它获得该分布式锁。此时其他进程需要在 parent_node 上注册监听,监听其下所有子节点的变化,并挂起当前线程;
+ 当 parent_node 下有子节点发生变化时候,它会通知所有在其上注册了监听的进程。这些进程需要判断是否是对应的锁节点上的删除事件。如果是,则让挂起的线程继续执行,并尝试再次获取锁。
这里之所以使用临时节点是为了避免死锁:进程 A 正常执行完业务逻辑后,会主动地去删除该节点,释放锁。但如果进程 A 意外宕机了,由于声明的是临时节点,因此该节点也会被移除,而避免死锁。
这里之所以使用临时节点是为了避免死锁:进程 A 正常执行完业务逻辑后,会主动地去删除该节点,释放锁。但如果进程 A 意外宕机了,由于声明的是临时节点,因此该节点也会被移除,而避免死锁。
临时节点方案的实现比较简单,但是其缺点也比较明显:
+ **缺点一**:当 parent_node 下其他锁变动或者被删除时,进程 BCD 也会收到通知,但是显然它们并不关心其他锁的释放情况。如果 parent_node 下存在大量的锁,并且程序处于高并发状态下,则 ZooKeeper 集群就需要频繁地通知客户端进程,这会带来大量的网络开销;
+ **缺点一**:当 parent_node 下其他锁变动或者被删除时,进程 BCD 也会收到通知,但是显然它们并不关心其他锁的释放情况。如果 parent_node 下存在大量的锁,并且程序处于高并发状态下,则 ZooKeeper 集群就需要频繁地通知客户端,这会带来大量的网络开销;
+ **缺点二**:采用临时节点方案创建的锁是非公平的,也就是说在进程 A 释放锁后,进程 BCD 发起重试的顺序与其收到通知的时间有关,而与其第一次尝试获取锁的时间无关,即与等待时间的长短无关。
当程序并发量不高时,可以采用该方案来实现,因为其实现比较简单。而如果程序并发量很高,则需要采用下面的临时有序节点方案:
### 1.2 临时有序节点方案
![zookeeper_分布式锁_临时有序节点方案](../pictures/zookeeper_分布式锁_临时有序节点方案.png)
<div align="center"> <img src="../pictures/zookeeper_分布式锁_临时有序节点方案.png"/> </div>
采用临时有序节点时,对应的流程如下:
+ 每个进程(或线程)都会尝试在 parent_node 下创建临时有序节点,根据临时有序节点的特性,所有的进程都会创建成功;
+ 然后每个进程需要获取 parent_node 下该锁的所有临时节点的信息,并判断自己是否是最小的一个节点,如果是,则代表获得该锁;
+ 然后每个进程需要获取当前 parent_node 下该锁的所有临时节点的信息,并判断自己是否是最小的一个节点,如果是,则代表获得该锁;
+ 如果不是,则挂起当前线程。并对其前一个节点注册监听(这里可以通过 exists 方法传入需要触发 Watch 事件);
+ 如上图所示,当进程 A 处理完成后,会触发进程 B 注册的 Watch 事件,此时进程 B 就知道自己获得了锁,从而可以将挂起的线程继续,开始业务的处理。
+ 如上图所示,当进程 A 处理完成后,会触发进程 B 注册的 Watch 事件,此时进程 B 就知道自己获得了锁,从而可以将挂起的线程继续,开始业务的处理。
这里需要注意的是:如果进程 B 创建了临时节点,并且通过比较后知道自己不是最小的一个节点,但还没有注册监听;而此时 A 进程恰好处理完成并删除了 01 节点,此时调用 exist 方法时会抛出 IllegalArgumentException 异常。这虽然是一个异常,但是却代表进程 B 获得了锁,因此进程 B 可以开始执行业务逻辑。
这里需要注意的是一种特殊的情况,其过程如下:
临时有序节点方案正好解决了临时节点方案的两个缺点:
+ 如果进程 B 创建了临时节点,并且通过比较后知道自己不是最小的一个节点,但还没有注册监听;
+ 而 A 进程此时恰好处理完成并删除了 01 节点;
+ 接着进程 B 再调用 exist 方法注册监听就会抛出 IllegalArgumentException 异常。这虽然是一个异常,通常代表前一个节点已经不存在了。
在这种情况下进程 B 应该再次尝试获取锁,如果获取到锁,则就可以开始业务的处理。下文讲解 Apache Curator 源码时也会再次说明这一点。
通过上面对的介绍,可以看出来临时有序节点方案正好解决了临时节点方案的两个缺点:
+ 每个临时有序节点只需要关心它的上一个节点,而不需要关心其他的额外节点和额外事件;
+ 实现的锁是公平的,先到达的进程创建的临时有序节点的值越小,能更快地获得锁。
+ 实现的锁是公平的,先到达的进程创建的临时有序节点的值越小,因此能更快地获得锁。
临时有序节点方案的另外一个优点是其能够实现共享锁,比如读写锁中的读锁。
@@ -48,7 +69,8 @@ JDK 原生的锁可以让不同**线程**之间以互斥的方式来访问共享
+ 对于读锁节点而言,其只需要关心前一个写锁节点的释放。如果前一个写锁释放了,则多个读锁节点对应的线程可以并发地读取数据;
+ 对于写锁节点而言,其只需要关心前一个节点的释放,而不需要关心前一个节点是写锁节点还是读锁节点。因为为了保证有序性,写操作必须要等待前面的读操作或者写操作执行完成。
![zookeeper_分布式读写锁](../pictures/zookeeper_分布式读写锁.png)
<div align="center"> <img src="../pictures/zookeeper_分布式读写锁.png"/> </div>
@@ -56,56 +78,57 @@ JDK 原生的锁可以让不同**线程**之间以互斥的方式来访问共享
### 2.1 基本使用
Apache Curator 是 ZooKeeper 的 Java 客户端,它基于临时有序节点方案实现了分布式锁、分布式读写锁等功能。基本使用如下
Apache Curator 是 ZooKeeper 的 Java 客户端,它基于临时有序节点方案实现了分布式锁、分布式读写锁等功能。使用前需要先导入 Apache Curator 和 ZooKeeper 相关的依赖,并保证 ZooKeeper 版本与服务器上 ZooKeeper 的版本一致
```xml
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
```
基本使用如下:
```java
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.0.105:2181")
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("mySpace").build();
.connectString("192.168.0.105:2181")
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("mySpace").build();
client.start();
// 1. 创建分布式锁
InterProcessMutex lock = new InterProcessMutex(client, "/distributed/myLock");
// 2.尝试获取分布式锁
if (lock.acquire(10, TimeUnit.SECONDS)) {
try {
System.out.println("模拟业务耗时");
Thread.sleep(3 * 1000);
} finally {
// 3.释放锁
lock.release();
}
try {
System.out.println("模拟业务耗时");
Thread.sleep(3 * 1000);
} finally {
// 3.释放锁
lock.release();
}
}
client.close();
```
这里需要事先导入 Apache Curator 和 ZooKeeper 相关的依赖,并保证 ZooKeeper 版本与服务器上 ZooKeeper 的版本一致
之后就可以启动多个程序进程来进行测试,此时 ZooKeeper 上的数据结构如下
```xml
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
```
<div align="center"> <img src="../pictures/zookeeper_分布式锁_cli.png"/> </div>
之后就可以启动多个程序进程来进程测试ZooKeeper 上的数据结构如下:
![zookeeper_分布式锁_cli](../pictures/zookeeper_分布式锁_cli.png)
在我们指定的路径下,会依次创建多个临时有序节点,而当业务逻辑处理完成后,这些节点就会被移除。这里我们使用的是单机版本的 ZooKeeper ,而集群环境下也是一样,和 Redis 主从模式下的延迟复制会导致数据不一致的情况不同ZooKeeper 各个节点上的数据一致性可以由其自身来进行保证。
在我们指定的路径下,会依次创建多个临时有序节点,而当业务逻辑处理完成后,这些节点就会被移除。这里我们使用的是单机版本的 ZooKeeper ,而集群环境下也是一样,和 Redis 主从模式下的延迟复制会导致数据不一致的情况不同ZooKeeper 集群各个节点上的数据一致性可以由其自身来进行保证。
@@ -115,7 +138,7 @@ Apache Curator 底层采用的是临时有序节点的实现方案,下面我
#### 1. 获取锁源码解析
上面最核心的获取锁的方法 `acquire()` ,其定义如下:
上面最核心的方法是获取锁的 `acquire()` 方法 ,其定义如下:
```java
@Override
@@ -124,7 +147,7 @@ public boolean acquire(long time, TimeUnit unit) throws Exception{
}
```
它在内部调用了 `internalLock()` 方法:
可以看到,它在内部调用了 `internalLock()` 方法internalLock 方法的源码如下
```java
// threadData是一个线程安全的Map其中Thread是持有锁的线程,LockData是锁数据
@@ -151,7 +174,7 @@ private boolean internalLock(long time, TimeUnit unit) throws Exception{
}
```
这里面真正去尝试创建锁的方法是 `attemptLock()`
面真正去尝试获取锁的方法是 `attemptLock()`
```java
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{
@@ -163,7 +186,7 @@ String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Except
boolean hasTheLock = false;
boolean isDone = false;
// 当出现NoNodeException异常时候依靠该循环进行重试
// 当出现NoNodeException异常时候依靠该循环进行重试
while ( !isDone ){
isDone = true;
try{
@@ -184,7 +207,7 @@ String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Except
}
}
// 如果获取到锁,则跳出循环,并返回锁的路径
// 如果获取到锁,则跳出循环,并返回锁的路径
if ( hasTheLock ){
return ourPath;
}
@@ -212,71 +235,71 @@ public String createsTheLock(CuratorFramework client, String path, byte[] lockNo
}
```
这里创建好的临时节点的路径会作为参数传递给 `internalLockLoop()` 方法。在文章开头介绍原理时,我们说过每个线程创建好临时有序节点后,还需要判断它所创建的临时有序节点是否是当前最小的节点,`internalLockLoop()` 方法主要做的就是这事:
这里返回的临时有序节点的路径会作为参数传递给 `internalLockLoop()` 方法。在文章开头介绍原理时,我们说过每个线程创建好临时有序节点后,还需要判断它所创建的临时有序节点是否是当前最小的节点,`internalLockLoop()` 方法主要做的就是这事:
```java
private boolean internalLockLoop ( long startMillis, Long millisToWait, String ourPath) throws Exception {
// 是否持有锁
// 是否持有锁
boolean haveTheLock = false;
boolean doDelete = false;
try {
if (revocable.get() != null) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
boolean doDelete = false;
try {
if (revocable.get() != null) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
// 如果连接ZooKeeper客户端处于启动状态也就是想要获取锁的进程仍然处于运行状态并且还没有获取到锁则循环继续
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
// 对所当前所有的子节点按照从小到大进行排序
List<String> children = getSortedChildren();
List<String> children = getSortedChildren();
// 将createsTheLock方法获得的临时有序节点的路径进行截取只保留节点名的部分
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 判断当前节点是否是最小的一个节点
PredicateResults predicateResults = driver.
PredicateResults predicateResults = driver.
getsTheLock(client, children, sequenceNodeName, maxLeases);
// 如果当前节点是最小的一个节点(排他锁情况),则此时就获得了锁
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
// 如果当前节点是最小的一个节点(排他锁情况),则此时就获得了锁
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
// 如果当前节点不是最小的一个节点,先拼接并获取前一个节点完整的路径
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized (this) {
try {
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized (this) {
try {
// 然后对前一个节点进行监听
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
// 如果设置了等待时间
if (millisToWait != null) {
if (millisToWait != null) {
// 将等待时间减去到目前为止所耗费的时间
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
// 如果等待时间小于0则说明我们耗费的时间已经超过了等待时间此时获取的锁无效需要删除它
if (millisToWait <= 0) {
if (millisToWait <= 0) {
//设置删除标志位,并退出循环
doDelete = true;
break;
}
// 如果还有剩余时间,则等待获取锁
wait(millisToWait);
} else {
doDelete = true;
break;
}
// 如果还有剩余时间,则在剩余时间内继续等待获取锁
wait(millisToWait);
} else {
// 如果没有设置等待时间,则持续等待获取锁
wait();
}
} catch (KeeperException.NoNodeException e) {
// 这个异常抛出时,代表对前一个节点设置监听时,前一个节点已经不存在(被释放),此时捕获该异常,
wait();
}
} catch (KeeperException.NoNodeException e) {
// 这个异常抛出时,代表对前一个节点设置监听时,前一个节点已经不存在(被释放),此时捕获该异常,
// 但不需要进行任何额外操作,因为循环会继续,就可以再次尝试获取锁
}
}
}
}
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
} finally {
// 如果抛出了异常或者超时都删除掉上一个方法createsTheLock创建的临时有序节点以便后面的进程进行锁的获取
if (doDelete) {
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
}
}
}
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
} finally {
// 如果抛出了异常或者超时,则代表该进程创建的锁无效,需要将已创建的锁删除。以便后面的进程继续尝试创建锁
if (doDelete) {
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
```
@@ -286,24 +309,24 @@ private boolean internalLockLoop ( long startMillis, Long millisToWait, String o
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
```
和上文介绍的一样,判断当前节点是否是持有锁的节点,在不同锁类型(如读写锁和互斥锁)的判断是不同的,因此 getsTheLock 方法有着不同的实现。这里以StandardLockInternalsDriver 为例,它使用的是互斥锁的判断规则:只要当前节点是最小的一个节点,就能持有锁:
和上文介绍的一样,判断当前节点是否是持有锁的节点,在不同锁类型(如读写锁和互斥锁)的判断是不同的,因此 getsTheLock 方法有着不同的实现。这里以StandardLockInternalsDriver 为例,它使用的是互斥锁的判断规则:只要当前节点是最小的一个节点,就能持有锁:
```java
public PredicateResults getsTheLock(CuratorFramework client, List<String> children,
String sequenceNodeName, int maxLeases) throws Exception {
// 获取当前节点在已经排好序的节点中的下标index
// 获取当前节点在已经排好序的节点中的下标index
int ourIndex = children.indexOf(sequenceNodeName);
// 如果ourIndexx小于0则抛出NoNodeException的异常
// 如果ourIndex小于0则抛出NoNodeException的异常
validateOurIndex(sequenceNodeName, ourIndex);
// 如果ourIndex小于maxLeases(默认值是1)则代表它就是0也就是从小到大排好序的集合中的第一个也就是最小的一个
// 如果ourIndex小于maxLeases(默认值是1)则代表它就是0也就是从小到大排好序的集合中的第一个也就是最小的一个
boolean getsTheLock = ourIndex < maxLeases;
// 如果是最小的一个,此时就已经获取到锁,不需要返回前一个节点的名称,否则需要返回前一个节点的名称,用于后续的监听操作
// 如果是最小的一个,此时就已经获取到锁,不需要返回前一个节点的名称,否则需要返回前一个节点的名称,用于后续的监听操作
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
```
这里解释一下 maxLease 这个参数的意义:默认值为 1就是互斥锁如果默认值大于 1假设 maxLease 的值是 5 5 个临时有序节点都可以认为是能持有锁的节点,此时最多可以有 5 个线程并发访问临界区, 在功能上类似于 Java 中Semaphore信号量机制 。
这里解释一下 maxLease 这个参数的意义:默认值为 1就是互斥锁如果默认值大于 1假设 maxLease 的值是 5最小的 5 个临时有序节点都可以认为是能持有锁的节点,此时最多可以有 5 个线程并发访问临界区, 在功能上类似于 Java 中Semaphore信号量机制 。
@@ -313,29 +336,29 @@ PredicateResults predicateResults = driver.getsTheLock(client, children, sequenc
```java
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
Thread currentThread = Thread.currentThread();
// 根据当前线程来获取锁信息
InterProcessMutex.LockData lockData = threadData.get(currentThread);
// 如果获取不到,则当前线程不是锁的持有者,此时抛出异常
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
InterProcessMutex.LockData lockData = threadData.get(currentThread);
// 如果获取不到,则当前线程不是锁的持有者,此时抛出异常
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
// 因为Zookeeper实现的锁具有重入性所以将其计数器减少1
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount > 0) {
return;
}
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount > 0) {
return;
}
// 如果计数器的值小于0代表解锁次数大于加锁次数此时抛出异常
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
// 如果到达这一步则说明计数器的值正好等于0此时可以真正将节点删除,释放锁
internals.releaseLock(lockData.lockPath);
} finally {
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
// 如果到达这一步则说明计数器的值正好等于0此时可以将节点真正的删除,释放锁
internals.releaseLock(lockData.lockPath);
} finally {
// 将锁信息从threadData移除
threadData.remove(currentThread);
}
threadData.remove(currentThread);
}
}
```