Java_函数式编程
183
SREADME.md
Normal file
@ -0,0 +1,183 @@
|
||||
# Full-Stack-Notes
|
||||
|
||||
<div align="center"> <img width="380px" src="pictures/full-stack-notes-logo.png"/> </div>
|
||||
<br/>
|
||||
|
||||
<div align="center">
|
||||
<img src="pictures/芽.png"/>
|
||||
<strong>一个处于萌芽阶段的知识库,用于持续分享自己的所见、所学、所思!</strong>
|
||||
</div>
|
||||
<br/>
|
||||
|
||||
<div align="right">
|
||||
<a href="">点击切换详细目录</a>
|
||||
</div>
|
||||
|
||||
## :coffee: JAVA
|
||||
|
||||
1. [Java 反射与注解](notes/Java_反射与注解.md)
|
||||
2. [Java 并发编程](notes/Java_并发编程.md)
|
||||
3. [Java 设计模式](notes/Java_设计模式.md)
|
||||
4. [Java 虚拟机](notes/Java_虚拟机.md)
|
||||
5. [JVM 性能监控之命令行工具](notes/JVM_性能监控之命令行工具.md)
|
||||
6. [JVM 性能监控之可视化工具](notes/JVM_性能监控之可视化工具.md)
|
||||
7. [Java NIO 核心组件详解](notes/Java_NIO.md)
|
||||
8. 函数式编程
|
||||
9. [Tomcat 架构解析](notes/Tomcat_架构解析.md)
|
||||
10. Java 集合类源码解析
|
||||
|
||||
<br/>
|
||||
|
||||
## :globe_with_meridians: 计算机与网络基础
|
||||
|
||||
1. [计算机网络模型](notes/计算机网络.md)
|
||||
|
||||
2. HTTP 协议详解
|
||||
|
||||
3. HTTPS 协议详解
|
||||
|
||||
4. 抓包神器 Wireshark
|
||||
|
||||
5. 计算机组成原理
|
||||
|
||||
<br/>
|
||||
|
||||
## :computer: 前端基础
|
||||
|
||||
1. [JavaScript 基础](notes/JavaScript_基础.md)
|
||||
|
||||
2. [ECMAScript 6.0 基础](notes/ES6_基础.md)
|
||||
|
||||
3. CSS 基础
|
||||
|
||||
4. JavaScript 设计模式
|
||||
|
||||
<br/>
|
||||
|
||||
## :dolphin: 数据库
|
||||
|
||||
### MySQL
|
||||
|
||||
1. [MySQL 核心概念](notes/MySQL_基础.md)
|
||||
|
||||
2. [MySQL 备份详解](notes/MySQL_备份.md)
|
||||
|
||||
3. [MySQL 复制详解](notes/MySQL_复制.md)
|
||||
|
||||
4. [MySQL 高可用架构之 PXC 集群](notes/MySQL_PXC集群.md)
|
||||
|
||||
5. [MyCat 读写分离与分库分表](notes/MySQL_Mycat中间件.md)
|
||||
|
||||
6. [MySQL 查询性能分析之 Explain](notes/MySQL_EXPLAIN.md)
|
||||
|
||||
### Redis
|
||||
|
||||
1. [Redis 基本数据类型和常用命令](notes/Redis_数据类型和常用命令.md)
|
||||
|
||||
2. [Redis AOF 和 RDB 持久化策略原理](notes/Redis_持久化.md)
|
||||
|
||||
3. [Redis 哨兵模式](notes/Redis_哨兵模式.md)
|
||||
|
||||
4. [Redis 集群模式](notes/Redis_集群模式.md)
|
||||
|
||||
5. 使用 Redis 实现分布式锁
|
||||
|
||||
|
||||
### MongoDB
|
||||
|
||||
1. [MongoDB 基础](notes/MongoDB_基础.md)
|
||||
|
||||
2. [MongoDB 索引](notes/MongoDB_索引.md)
|
||||
|
||||
3. [MongoDB 聚合](notes/MongoDB_聚合.md)
|
||||
|
||||
4. [MongoDB 复制](notes/MongoDB_复制.md)
|
||||
|
||||
5. [MongoDB 分片](notes/MongoDB_分片.md)
|
||||
|
||||
<br/>
|
||||
|
||||
## :whale: 系统与容器
|
||||
|
||||
1. [Linux 常用 Shell 命令](notes/Linux_常用Shell命令.md)
|
||||
|
||||
2. [Sehll 脚本编程基础](notes/Shell_基础.md)
|
||||
|
||||
3. [Docker 基础](notes/Docker_基础.md)
|
||||
|
||||
<br/>
|
||||
|
||||
## :package: 常用技术栈
|
||||
|
||||
|
||||
### RabbitMQ
|
||||
|
||||
1. [RabbitMQ 核心概念](notes/RabbitMQ_基础.md)
|
||||
|
||||
2. [RabbitMQ 客户端开发](notes/RabbitMQ_客户端开发.md)
|
||||
|
||||
3. [HAProxy + KeepAlived 搭建 RabbitMQ 高可用集群](notes/RabbitMQ_高可用集群架构.md)
|
||||
|
||||
### Nginx
|
||||
|
||||
1. [Nginx 基础之静态网站部署,负载均衡,动静分离](notes/Nginx_基础.md)
|
||||
2. HTTP 模块详解
|
||||
3. Nginx 性能优化
|
||||
|
||||
|
||||
### Kafka
|
||||
|
||||
1. [Kafka 简介](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka简介.md)
|
||||
2. [基于 Zookeeper 搭建 Kafka 高可用集群](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/基于Zookeeper搭建Kafka高可用集群.md)
|
||||
3. [Kafka 生产者详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka生产者详解.md)
|
||||
4. [Kafka 消费者详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka消费者详解.md)
|
||||
5. [深入理解 Kafka 副本机制](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka深入理解分区副本机制.md)
|
||||
|
||||
|
||||
### ZooKeeper
|
||||
|
||||
1. [ZooKeeper 简介及核心概念](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Zookeeper简介及核心概念.md)
|
||||
2. [ZooKeeper 单机环境和集群环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Zookeeper单机环境和集群环境搭建.md)
|
||||
3. [ZooKeeper 常用 Shell 命令](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Zookeeper常用Shell命令.md)
|
||||
4. [ZooKeeper Java 客户端](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Zookeeper_Java客户端Curator.md)
|
||||
5. [ZooKeeper ACL 权限控制](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Zookeeper_ACL权限控制.md)
|
||||
6. 使用 ZooKeeper 实现分布式锁
|
||||
|
||||
## ElasticSearch
|
||||
|
||||
TODO
|
||||
|
||||
<br/>
|
||||
|
||||
## :rocket: 测试与运维
|
||||
|
||||
1. 性能测试之 Jmeter
|
||||
2. 性能测试之 LoadRunner
|
||||
3. Jenkins 持续交付与自动化部署
|
||||
|
||||
<br/>
|
||||
|
||||
## :bullettrain_side: 微服务与分布式
|
||||
|
||||
1. 分布式锁的实现
|
||||
2. 分布式选举算法
|
||||
3. 分布式事务实现原理
|
||||
4. 分布式全局 ID 的生成
|
||||
5. CAP 理论和 BASE 理论
|
||||
|
||||
<br/>
|
||||
|
||||
## :hammer_and_wrench: 常用软件安装
|
||||
|
||||
1. [Redis 单机环境安装](notes/installation/Redis单机环境搭建.md)
|
||||
2. [RabbitMQ 单机环境安装](notes/installation/RabbitMQ单机环境搭建.md)
|
||||
3. [Nginx 单机环境安装](notes/installation/Nginx编译方式安装.md)
|
||||
4. [MySQL 单机环境安装](notes/installation/MySQL单机环境搭建.md)
|
||||
5. [MongoDB 单机环境安装](notes/installation/MongoDB单机环境搭建.md)
|
||||
6. [ElasticSearch + Kibana 单机环境安装](notes/installation/ElasticSearch+Kibana单机环境搭建.md)
|
||||
|
||||
<br>
|
||||
|
||||
<div align="center"> <img width="200px" src="pictures/blog-logo.png"/> </div>
|
||||
|
||||
<div align="center"> <a href = "https://blog.csdn.net/m0_37809146"> 欢迎关注我的博客:https://blog.csdn.net/m0_37809146</a> </div>
|
54
code/Java/stream-tutorial/pom.xml
Normal file
@ -0,0 +1,54 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>org.example</groupId>
|
||||
<artifactId>stream-tutorial</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>8</source>
|
||||
<target>8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
<dependencies>
|
||||
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
|
||||
<dependency>
|
||||
<groupId>org.redisson</groupId>
|
||||
<artifactId>redisson</artifactId>
|
||||
<version>3.12.5</version>
|
||||
</dependency>
|
||||
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
|
||||
<dependency>
|
||||
<groupId>redis.clients</groupId>
|
||||
<artifactId>jedis</artifactId>
|
||||
<version>3.2.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-framework</artifactId>
|
||||
<version>4.3.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-recipes</artifactId>
|
||||
<version>4.3.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
<version>3.4.14</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -0,0 +1,41 @@
|
||||
package com.heibaiying;
|
||||
|
||||
import org.apache.curator.RetryPolicy;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
|
||||
import org.apache.curator.retry.RetryNTimes;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DistributedLock {
|
||||
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
|
||||
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
|
||||
CuratorFramework client = CuratorFrameworkFactory.builder()
|
||||
.connectString("192.168.0.105:2181")
|
||||
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
|
||||
.namespace("mySpace").build();
|
||||
client.start();
|
||||
|
||||
// 1. 创建分布式锁
|
||||
InterProcessMutex lock = new InterProcessMutex(client, "/distributed/myLock");
|
||||
|
||||
// 2.尝试获取分布式锁
|
||||
if (lock.acquire(10, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
System.out.println("模拟业务耗时");
|
||||
Thread.sleep(3 * 1000);
|
||||
} finally {
|
||||
// 3.释放锁
|
||||
lock.release();
|
||||
}
|
||||
}
|
||||
|
||||
client.close();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,70 @@
|
||||
package com.heibaiying;
|
||||
|
||||
public class Employee {
|
||||
private String name;
|
||||
private String gender;
|
||||
private String company;
|
||||
private int age;
|
||||
private boolean isOfficial;
|
||||
|
||||
public Employee(String name, String gender, String company, int age) {
|
||||
this.name = name;
|
||||
this.gender = gender;
|
||||
this.company = company;
|
||||
this.age = age;
|
||||
}
|
||||
|
||||
Employee(String name, int age,boolean isOfficial) {
|
||||
this.name = name;
|
||||
this.age = age;
|
||||
this.isOfficial = isOfficial;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Employee{" +
|
||||
"name='" + name + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
public boolean isOfficial() {
|
||||
return isOfficial;
|
||||
}
|
||||
|
||||
public void setOfficial(boolean official) {
|
||||
isOfficial = official;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getGender() {
|
||||
return gender;
|
||||
}
|
||||
|
||||
public void setGender(String gender) {
|
||||
this.gender = gender;
|
||||
}
|
||||
|
||||
public String getCompany() {
|
||||
return company;
|
||||
}
|
||||
|
||||
public void setCompany(String company) {
|
||||
this.company = company;
|
||||
}
|
||||
|
||||
public int getAge() {
|
||||
return age;
|
||||
}
|
||||
|
||||
public void setAge(int age) {
|
||||
this.age = age;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package com.heibaiying;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.function.BinaryOperator;
|
||||
import java.util.function.IntConsumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class StreamTest {
|
||||
public static void main(String[] args) {
|
||||
System.out.println(UUID.randomUUID() + ":" + Thread.currentThread().getId());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 进行求和
|
||||
*
|
||||
* @param list
|
||||
* @param initValue
|
||||
* @param binaryOperator
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public static <T> T reduce(List<T> list, T initValue, BinaryOperator<T> binaryOperator) {
|
||||
for (T t : list) {
|
||||
initValue = binaryOperator.apply(initValue, t);
|
||||
}
|
||||
return initValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* 集合过滤
|
||||
*
|
||||
* @param list 待过滤的集合
|
||||
* @param predicate 函数式接口
|
||||
* @param <T> 集合中元素的类型
|
||||
* @return 满足条件的元素的集合
|
||||
*/
|
||||
public static <T> List<T> filter(List<T> list, CustomPredicate<T> predicate) {
|
||||
ArrayList<T> result = new ArrayList<>();
|
||||
for (T t : list) {
|
||||
if (predicate.test(t)) result.add(t);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 定义接口
|
||||
*
|
||||
* @param <T> 参数类型
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface CustomPredicate<T> {
|
||||
// 判断是否满足过滤标准
|
||||
boolean test(T t);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,7 @@
|
||||
log4j.rootLogger=INFO, SYSLOG
|
||||
|
||||
log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
|
||||
log4j.appender.SYSLOG.syslogHost=127.0.0.1
|
||||
log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.SYSLOG.layout.conversionPattern=%d{ISO8601} %-5p [%t] %c{2} %x - %m%n
|
||||
log4j.appender.SYSLOG.Facility=LOCAL1
|
40
notes/Https.md
Normal file
@ -0,0 +1,40 @@
|
||||
# HTTPS
|
||||
|
||||
## 一、核心概念
|
||||
|
||||
### 1.1 SSL
|
||||
|
||||
安全套接层(英语:Secure Sockets Layer,缩写:SSL)是一种安全协议,目的是为互联网通信提供安全保障,最早由网景公司(Netscape)推出。SSL 协议有三个版本,分别是 SSL v1、SSL v2、SSL v3:
|
||||
|
||||
- v1 版本从未公开过,因为存在严重的安全漏洞。
|
||||
- v2 版本在1995年2月发布,但因为存在多个严重的安全漏洞而被 v3 版本替代。
|
||||
- v3 版本在1996年发布,是由网景公司完全重新设计的。
|
||||
|
||||
### 1.2 TLS
|
||||
|
||||
1966 年,TETF(Internet Engineering Task Force)组织在 SLL v3 的基础进一步进行了标准化,微软为这个新的协议取名为 TLS v1.0,这也就是TLS(Transport Layer Security)的由来。之后 TLS 继续发布了 v1.1,v1.2,v1.3 版本协议,当前主流的版本为 v1.2。
|
||||
|
||||
### 1.3 OpenSSL
|
||||
|
||||
OpenSSL 是一个开源的底层密码库,封装了所有的密码学算法,并为 TLS/SSL 提供了功能完善的工具库,因此它是 TLS/SSL 协议的具体实现。
|
||||
|
||||
### 1.4 HTTPS
|
||||
|
||||
HTTPS (Hyper Text Transfer Protocol over SecureSocket Layer)是在 HTTP 的基础上通过 SSL/TLS 层来进行传输加密和身份认证,进而保证通讯的安全性。除此之外它的报文结构、请求方法、连接管理等都完全沿用 HTTP 原有的模式,因此可以很方便地将原有 HTTP 服务转换为 HTTPS 服务。
|
||||
|
||||
|
||||
|
||||
## 二、数据安全
|
||||
|
||||
HTTPS 的数据安全主要是通过 SSL/TLS 协议来进行实现的,SSL/TLS 则主要采用了以下方式来保证传输的安全:
|
||||
|
||||
### 2.1 非对称加密
|
||||
|
||||
|
||||
|
||||
### 2.2 对称加密
|
||||
|
||||
|
||||
|
||||
## 三、握手过程
|
||||
|
510
notes/Java_函数式编程.md
Normal file
@ -0,0 +1,510 @@
|
||||
# Java 函数式编程
|
||||
|
||||
## 一、Lambda
|
||||
|
||||
### 1.1 格式
|
||||
|
||||
Java 从 1.8 版本开始支持 Lambda 表达式,通过 Lambda 表达式我们可以将一个函数作为参数传入方法中。在 JDK 1.8 之前,我们只能通过匿名表达式来完成类似的功能,但是匿名表达式比较繁琐,存在大量的冗余代码,不利于将行为参数化,而采用 Lamdba 则能很好的解决这个问题。Lambda 表达式的基本语法如下:
|
||||
|
||||
```java
|
||||
(parameters) -> expression
|
||||
```
|
||||
|
||||
或采用花括号的形式:
|
||||
|
||||
```java
|
||||
(parameters) -> { statements; }
|
||||
```
|
||||
|
||||
Lambda 表达式具有如下特点:
|
||||
|
||||
- **可选的参数:**不需要声明参数类型,编译器会从上下文自动进行推断;
|
||||
- **可选的参数圆括号:**当且仅当只有一个参数时,圆括号可以省略;
|
||||
- **可选的花括号:**如果主体只有一个表达式,则无需使用花括号;
|
||||
- **可选的返回关键字:**如果主体只有一个表达式,则该表达式的值就是整个 Lambda 表达式的返回值,此时不需要使用 return 关键字进行显式的返回。
|
||||
|
||||
### 1.2 行为参数化
|
||||
|
||||
上面我们说过,Lambda 表达式主要解决的是行为参数化的问题,而什么是行为参数化?下面给出一个具体的示例:
|
||||
|
||||
```java
|
||||
/**
|
||||
* 定义函数式接口
|
||||
* @param <T> 参数类型
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface CustomPredicate<T> {
|
||||
boolean test(T t);
|
||||
}
|
||||
```
|
||||
|
||||
```java
|
||||
/**
|
||||
* 集合过滤
|
||||
* @param list 待过滤的集合
|
||||
* @param predicate 函数式接口
|
||||
* @param <T> 集合中元素的类型
|
||||
* @return 满足条件的元素的集合
|
||||
*/
|
||||
public static <T> List<T> filter(List<T> list, CustomPredicate<T> predicate) {
|
||||
ArrayList<T> result = new ArrayList<>();
|
||||
for (T t : list) {
|
||||
// 将满足条件的元素添加到返回集合中
|
||||
if (predicate.test(t)) result.add(t);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
针对不同类型的集合,我们可以通过传入不同的 Lambda 表达式作为参数来表达不同的过滤行为,这就是行为参数化:
|
||||
|
||||
```java
|
||||
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
|
||||
filter(integers, x -> x % 2 == 0); // 过滤出所有偶数
|
||||
|
||||
List<Employee> employees = Arrays.asList(
|
||||
new Employee("张某", 21, true),
|
||||
new Employee("李某", 30, true),
|
||||
new Employee("王某", 45, false));
|
||||
filter(employees, employee -> employee.getAge() > 25); // 过滤出所有年龄大于25的员工
|
||||
```
|
||||
|
||||
需要注意的是上面我们声明接口时,使用了 `@FunctionalInterface` 注解,它表示当前的接口是一个函数式接口。函数式接口就是只含有一个抽象方法的接口;即一个接口不论含有多少个默认方法和静态方法,只要它只有一个抽象方法,它就是一个函数式接口。使用 `@FunctionalInterface` 修饰后,当该接口有一个以上的抽象方法时,编译器就会进行提醒。
|
||||
|
||||
任何使用到函数式接口的地方,都可以使用 Lambda 表达式进行简写。例如 Runnable 接口就是一个函数式接口,我们可以使用 Lambda 表达式对其进行简写:
|
||||
|
||||
```java
|
||||
new Thread(() -> {
|
||||
System.out.println("hello");
|
||||
});
|
||||
```
|
||||
|
||||
### 1.3 方法引用和构造器引用
|
||||
|
||||
紧接上面的例子,如果我们需要过滤出所有的正式员工,除了可以写成下面的形式外:
|
||||
|
||||
```java
|
||||
filter(employees, employee -> employee.isOfficial());
|
||||
```
|
||||
|
||||
还可以使用方法引用的形式进行简写:
|
||||
|
||||
```java
|
||||
filter(employees, Employee::isOfficial);
|
||||
```
|
||||
|
||||
除了方法引用外,还可以对构造器进行引用,示例如下:
|
||||
|
||||
```java
|
||||
Stream<Integer> stream = Stream.of(1, 3, 5, 2, 4);
|
||||
stream.collect(Collectors.toCollection(ArrayList::new)); //等价于 toCollection(()->new ArrayList<>())
|
||||
```
|
||||
|
||||
方法引用和构造器引用的目的都是为了让代码更加的简洁。
|
||||
|
||||
|
||||
|
||||
## 二、函数式接口
|
||||
|
||||
通常我们不需要自定义函数式接口,JDK 中内置了大量函数式接口,基本可以满足大多数场景下的使用需求,最基本的四种如下:
|
||||
|
||||
**1. Consumer\<T>**:消费型接口,消费输入的变量,没有返回值:
|
||||
|
||||
```java
|
||||
@FunctionalInterface
|
||||
public interface Consumer<T> {
|
||||
void accept(T t);
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
**2. Consumer\<T>**:供给型接口,供给变量:
|
||||
|
||||
```java
|
||||
@FunctionalInterface
|
||||
public interface Supplier<T> {
|
||||
T get();
|
||||
}
|
||||
```
|
||||
|
||||
**3. Function<T, R>**:对输入类型为 T 的变量执行特定的转换操作,并返回类型为 R 的返回值:
|
||||
|
||||
```java
|
||||
@FunctionalInterface
|
||||
public interface Function<T, R> {
|
||||
R apply(T t);
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
**4. Predicate\<T>**:判断类型为 T 的变量是否满足特定的条件,如果满足则返回 true,否则返回 flase:
|
||||
|
||||
```java
|
||||
@FunctionalInterface
|
||||
public interface Predicate<T> {
|
||||
boolean test(T t);
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
其他函数式接口都是这四种基本类型的延伸和扩展。以 BiFunction 和 BinaryOperator 接口为例:
|
||||
|
||||
+ **BiFunction<T, U, R>**:是函数型接口 Function<T, R> 的扩展,Function 只能接收一个入参;而 BiFunction 可以用于接收两个不同类型的入参;
|
||||
+ **BinaryOperator\<T>**:是 BiFunction 的一种特殊化情况,即两个入参和返回值的类型均相同,通常用于二元运算:
|
||||
|
||||
```java
|
||||
@FunctionalInterface
|
||||
public interface BiFunction<T, U, R> {
|
||||
R apply(T t, U u);
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface BinaryOperator<T> extends BiFunction<T,T,T> {
|
||||
....
|
||||
}
|
||||
```
|
||||
|
||||
使用示例如下:
|
||||
|
||||
```java
|
||||
public static void main(String[] args) {
|
||||
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
|
||||
reduce(integers, 0, (a, b) -> a + b); // 求和 输出:15
|
||||
reduce(integers, 1, (a, b) -> a * b); // 求积 输出:120
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 执行归约操作
|
||||
*/
|
||||
public static <T> T reduce(List<T> list, T initValue, BinaryOperator<T> binaryOperator) {
|
||||
for (T t : list) {
|
||||
initValue = binaryOperator.apply(initValue, t);
|
||||
}
|
||||
return initValue;
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 三、创建流
|
||||
|
||||
JDK 1.8 中最主要的变化是引入了流,通过流、Lamda 表达式以及函数式接口,可以高效地完成数据的处理。创建流通常有以下四种方法:
|
||||
|
||||
**1. 由值创建**
|
||||
|
||||
使用静态方法 `Stream.of()` 由指定的值进行创建:
|
||||
|
||||
```java
|
||||
Stream<String> stream = Stream.of("a", "b ", "c", "d");
|
||||
```
|
||||
|
||||
**2. 由集合或数组创建**
|
||||
|
||||
使用静态方法 `Arrays.stream()` 由指定的数组进行创建:
|
||||
|
||||
```java
|
||||
String[] strings={"a", "b ", "c", "d"};
|
||||
Stream<String> stream = Arrays.stream(strings);
|
||||
```
|
||||
|
||||
调用集合类的 `stream()` 方法进行创建:
|
||||
|
||||
```shell
|
||||
List<String> strings = Arrays.asList("a", "b ", "c", "d");
|
||||
Stream<String> stream = strings.stream();
|
||||
```
|
||||
|
||||
`stream()` 方法定义在 `Collection` 接口中,它是一个默认方法,因此大多数的集合都可以通过该方法转换为流:
|
||||
|
||||
```java
|
||||
public interface Collection<E> extends Iterable<E> {
|
||||
default Stream<E> stream() {
|
||||
return StreamSupport.stream(spliterator(), false);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**3. 由文件创建**
|
||||
|
||||
```java
|
||||
try (Stream<String> lines = Files.lines(Paths.get("pom.xml"), StandardCharsets.UTF_8)) {
|
||||
lines.forEach(System.out::println);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
```
|
||||
|
||||
**4. 由函数创建**
|
||||
|
||||
除了以上方法外,还可以通过 `Stream.iterate()` 和 `Stream.generate()` 方法来来创建无限流:
|
||||
|
||||
+ `Stream.iterate()` 接受两个参数:第一个是初始值,第二个参数是一个输入值和输出值相同的函数型接口。它主要用于迭代式的产生新的元素,示例如下:
|
||||
|
||||
```java
|
||||
// 依次输出1到9
|
||||
Stream.iterate(0, x -> x + 1).limit(10).forEach(System.out::print);
|
||||
```
|
||||
|
||||
+ `Stream.generate()` 接收一个供应型函数作为参数,用于按照该函数产生新的元素:
|
||||
|
||||
```java
|
||||
// 依次输出随机数
|
||||
Stream.generate(Math::random).limit(10).forEach(System.out::print);
|
||||
```
|
||||
|
||||
## 四、操作流
|
||||
|
||||
### 4.1 基本操作
|
||||
|
||||
当流创建后,便可以利用 Stream 类的各种方法对其上数据进行各种处理,常用的方法如下:
|
||||
|
||||
| 操作 | 作用 | 返回类型 | 使用的类型/函数式接口 |
|
||||
| --------- | ------------------------------ | ------------ | ---------------------- |
|
||||
| filter | 过滤符合条件的元素 | Stream\<T> | Predicate\<T> |
|
||||
| distinct | 过滤重复元素 | Stream\<T> | |
|
||||
| skip | 跳过指定数量的元素 | Stream\<T> | long |
|
||||
| limit | 限制元素的数量 | Stream\<T> | long |
|
||||
| map | 对元素执行特定转换操作 | Stream\<T> | Function<T,R> |
|
||||
| flatMap | 将元素扁平化后执行特定转换操作 | Stream\<T> | Function<T,Stream\<R>> |
|
||||
| sorted | 对元素进行排序 | Stream\<T> | Comparator\<T> |
|
||||
| anyMatch | 是否存在指定元素满足特定条件 | boolean | Predicate\<T> |
|
||||
| noneMatch | 是否所有元素都不满足特定条件 | boolean | Predicate\<T> |
|
||||
| allMatch | 是否所有元素都满足特定条件 | boolean | Predicate\<T> |
|
||||
| findAny | 返回任意一个满足指定条件的元素 | Optional\<T> | |
|
||||
| findFirst | 返回第一个满足指定条件的元素 | Optional\<T> | |
|
||||
| forEach | 对所有元素执行特定的操作 | void | Cosumer\<T> |
|
||||
| collect | 对所有元素指定特定的收集操作 | R | Collector<T, A, R> |
|
||||
| reduce | 对元素依次执行归约操作 | Optional\<T> | BinaryOperator\<T> |
|
||||
| count | 计算流中元素的数量 | long | |
|
||||
|
||||
> 注:上表中返回类型为 Stream\<T> 的操作都是中间操作,代表还可以继续调用其它方法对流进行处理。返回类型为其它的操作都是终止操作,代表处理过程到此为止。
|
||||
|
||||
使用示例如下:
|
||||
|
||||
```java
|
||||
Stream.iterate(0, x -> x + 1) // 构建流
|
||||
.limit(20) // 限制元素的个数
|
||||
.skip(10) // 跳过前10个元素
|
||||
.filter(x -> x % 2 == 0) // 过滤出所有偶数
|
||||
.map(x -> "偶数:" + x) // 对元素执行转换操作
|
||||
.forEach(System.out::println); // 打印出所有元素
|
||||
```
|
||||
|
||||
输出结果如下:
|
||||
|
||||
```shell
|
||||
偶数:10
|
||||
偶数:12
|
||||
偶数:14
|
||||
偶数:16
|
||||
偶数:18
|
||||
```
|
||||
|
||||
上表的 `flatMap()` 方法接收一个参数,它是一个函数型接口 `Function<? super T, ? extends Stream<? extends R>> mapper`,该接口用于将流中的元素转换为 `Stream` ,从而可以将原有的元素进行扁平化:
|
||||
|
||||
```java
|
||||
String[] strings = {"hello", "world"};
|
||||
|
||||
Arrays.stream(strings)
|
||||
.map(x -> x.split("")) // 拆分得到: ['h','e','l','l','o'],['w','o','r','l','d']
|
||||
.flatMap(x -> Arrays.stream(x)) // 进行扁平化处理得到:'h','e','l','l','o','w','o','r','l','d'
|
||||
.forEach(System.out::println);
|
||||
```
|
||||
|
||||
上表的 `reduce()` 方法接收两个参数:第一个参数表示执行归约操作的初始值;第二个参数是上文我们介绍过的函数式接口 `BinaryOperator<T>` ,使用示例如下:
|
||||
|
||||
```java
|
||||
Stream.iterate(0, x -> x + 1).limit(10)
|
||||
.reduce(0, (a, b) -> a + b); //进行求和操作
|
||||
```
|
||||
|
||||
### 4.2 数值流
|
||||
|
||||
上面的代码等效于对 Stream 中的所有元素执行了求和操作,因此我们还可以调用简便方法 `sum()` 来进行实现,但是需要注意的是上面 `Stream.iterate()` 生成流中的元素类型都是包装类型:
|
||||
|
||||
```java
|
||||
Stream<Integer> stream = Stream.iterate(0, x -> x + 1); //包装类型Integer
|
||||
```
|
||||
|
||||
而 `sum()` 方法则是定义在 IntStream 上,此时需要将流转换为具体的数值流,对应的方法是 `mapToInt()`:
|
||||
|
||||
````java
|
||||
Stream.iterate(0, x -> x + 1).limit(10).mapToInt(x -> x).sum();
|
||||
````
|
||||
|
||||
类似的方法还有 `mapToLong()` 和 `mapToDouble()` 。如果你想要将数值流转换为原有的流,相当于对其中的元素进行装箱操作,此时可以调用 `boxed()` 方法:
|
||||
|
||||
```java
|
||||
IntStream intStream = Stream.iterate(0, x -> x + 1).limit(10).mapToInt(x -> x);
|
||||
Stream<Integer> boxed = intStream.boxed();
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 五、收集器
|
||||
|
||||
Stream 中最强大一个终止操作是 `collect()` ,它接收一个收集器 Collector 作为参数,可以将流中的元素收集到集合中,或进行分组、分区等操作。Java 中内置了多种收集器的实现,可以通过 Collectors 类的静态方法进行调用,常用的如下:
|
||||
|
||||
| 工厂方法 | 返回类型 | 用于 |
|
||||
| ----------------- | --------------------- | ------------------------------------------------------------ |
|
||||
| toList | List\<T> | 把流中所有元素收集到 List 中 |
|
||||
| toSet | Set\<T> | 把流中所有元素收集到 Set 中 |
|
||||
| toCollection | Collection\<T> | 把流中所有元素收集到指定的集合中 |
|
||||
| counting | Long | 计算流中所有元素的个数 |
|
||||
| summingInt | Integer | 将流中所有元素转换为整数,并计算其总和 |
|
||||
| averagingInt | Double | 将流中所有元素转换为整数,并计算其平均值 |
|
||||
| summarizingInt | IntSummaryStatistics | 将流中所有元素转换为整数,并返回值统计值,包含最大值、最小值、<br/>总和与平均值等信息 |
|
||||
| joining | String | 将流中所有元素转换为字符串,并使用给定连接符进行连接 |
|
||||
| maxBy | Optional\<T> | 查找流中最大元素的 Optional |
|
||||
| minBy | Optional\<T> | 查找流中最小元素的 Optional |
|
||||
| reducing | 规约操作产生的类型 | 对流中所有元素执行归约操作 |
|
||||
| collectingAndThen | 转换返回的类型 | 把流中所有元素收集到指定的集合中,再对集合执行特定转换操作 |
|
||||
| groupingBy | Map<K,List\<T>> | 对流中所有元素执行分组操作 |
|
||||
| partitionBy | Map<Boolean,List\<T>> | 对流中所有元素执行分区操作 |
|
||||
|
||||
使用示例如下:
|
||||
|
||||
```java
|
||||
Stream<Integer> stream = Stream.of(1, 2, 3, 4, 4, 5, 6);
|
||||
|
||||
stream.collect(Collectors.toSet()); // [1, 2, 3, 4, 5, 6]
|
||||
stream.collect(Collectors.toList()); // [1, 2, 3, 4, 4, 5, 6]
|
||||
stream.collect(Collectors.toCollection(ArrayList::new)); // [1, 2, 3, 4, 4, 5, 6]
|
||||
stream.collect(Collectors.counting()); // 7 等效于 stream.count();
|
||||
stream.collect(Collectors.summarizingInt(x -> x)); // IntSummaryStatistics{count=7, sum=25, min=1, average=3.571429, max=6}
|
||||
stream.collect(Collectors.maxBy((Integer::compareTo))); // Optional[6]
|
||||
stream.collect(Collectors.reducing(1, (a, b) -> a * b)); // 等效于 stream.reduce(1, (a, b) -> a * b);
|
||||
collect(Collectors.collectingAndThen(Collectors.toSet(), Set::size)); // 先把所有元素收集到Set中,再计算Set的大小
|
||||
```
|
||||
|
||||
> 注意:以上每个终止操作只能单独演示,因为对一个流只能执行一次终止操作。并且执行完终止操作后,就不能再对这个流执行任何操作,否则将抛出 `java.lang.IllegalStateException: stream has already been operated upon or closed` 异常。
|
||||
|
||||
### 5.2 分组
|
||||
|
||||
分组收集器可以实现类似数据库 groupBy 子句的功能。假设存在如下员工信息:
|
||||
|
||||
```java
|
||||
Stream<Employee> stream = Stream.of(new Employee("张某", "男", "A公司", 20),
|
||||
new Employee("李某", "女", "A公司", 30),
|
||||
new Employee("王某", "男", "B公司", 40),
|
||||
new Employee("田某", "女", "B公司", 50));
|
||||
```
|
||||
|
||||
```java
|
||||
public class Employee {
|
||||
|
||||
private String name;
|
||||
private String gender;
|
||||
private String company;
|
||||
private int age;
|
||||
|
||||
@Override
|
||||
public String toString() {return "Employee{" + "name='" + name + '\'' + '}';
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
此时如果需要按照公司进行分组,则可以使用 `groupingBy()` 收集器:
|
||||
|
||||
```java
|
||||
stream.collect(Collectors.groupingBy(Employee::getCompany));
|
||||
|
||||
对应的分组结果如下:
|
||||
{
|
||||
B公司=[Employee{name='王某'}, Employee{name='田某'}],
|
||||
A公司=[Employee{name='张某'}, Employee{name='李某'}]
|
||||
}
|
||||
```
|
||||
|
||||
如果想要计算分组后每家公司的人数,还可以为 `groupingBy()` 传递一个收集器 Collector 作为其第二个参数,调用其重载方法:
|
||||
|
||||
```java
|
||||
stream.collect(Collectors.groupingBy(Employee::getCompany, Collectors.counting()));
|
||||
|
||||
对应的结果如下:
|
||||
{
|
||||
B公司=2,
|
||||
A公司=2
|
||||
}
|
||||
```
|
||||
|
||||
因为第二个参数是一个 Collector,这意味着你可以再传入一个分组收集器来完成多级分组,示例如下:
|
||||
|
||||
```java
|
||||
stream.collect(Collectors.groupingBy(Employee::getCompany, Collectors.groupingBy(Employee::getGender)));
|
||||
|
||||
对应的分组结果如下:
|
||||
{
|
||||
B公司={女=[Employee{name='田某'}], 男=[Employee{name='王某'}]},
|
||||
A公司={女=[Employee{name='李某'}], 男=[Employee{name='张某'}]}
|
||||
}
|
||||
```
|
||||
|
||||
除此之外,也可以通过代码块来自定义分组条件,示例如下:
|
||||
|
||||
```java
|
||||
Map<String, List<Employee>> collect = stream.collect(Collectors.groupingBy(employee -> {
|
||||
if (employee.getAge() <= 30) {
|
||||
return "青年员工";
|
||||
} else if (employee.getAge() < 50) {
|
||||
return "中年员工";
|
||||
} else {
|
||||
return "老年员工";
|
||||
}
|
||||
}));
|
||||
|
||||
对应的分组结果如下:
|
||||
{
|
||||
中年员工=[Employee{name='王某'}],
|
||||
青年员工=[Employee{name='张某'}, Employee{name='李某'}],
|
||||
老年员工=[Employee{name='田某'}]
|
||||
}
|
||||
```
|
||||
|
||||
### 5.3 分区
|
||||
|
||||
分区是分组的一种特殊情况,即将满足指定条件的分为一组,将不满足指定条件的分为另外一组,两者在使用上基本类似,示例如下:
|
||||
|
||||
```java
|
||||
stream.collect(Collectors.partitioningBy(x -> "A公司".equals(x.getCompany())));
|
||||
|
||||
对应的分区结果如下:
|
||||
{
|
||||
false=[Employee{name='王某'}, Employee{name='田某'}],
|
||||
true=[Employee{name='张某'}, Employee{name='李某'}]
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 六、并行流
|
||||
|
||||
想要将普通流转换为并行流非常简单,只需要调用 Stream 的 `parallel()` 方法即可:
|
||||
|
||||
```java
|
||||
stream.parallel();
|
||||
```
|
||||
|
||||
此时流中的所有元素会被均匀的分配到多个线程上进行处理。并行流内部使用的是 ForkJoinPool 线程池,它默认的线程数量就是处理器数量,可以通过 `Runtime.getRuntime().availableProcessors()` 来查看该值,通常不需要更改。
|
||||
|
||||
同时当前也无法为某个具体的流指定线程数量,只能通过修改系统属性 `java.util.concurrent.ForkJoinPool.common.parallelism` 的值来改变线程池大小,进而改变所有并行流的线程大小,示例如下:
|
||||
|
||||
```java
|
||||
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
|
||||
```
|
||||
|
||||
如果想将并行流改回普通的串行流,则只需要调用 Stream 的 `sequential()` 方法即可:
|
||||
|
||||
```she
|
||||
stream.sequential();
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
## 参考文档
|
||||
|
||||
厄马(Raoul-Gabriel Urma) / 弗斯科(Mario Fusco) / 米克罗夫特(Alan Mycroft) .**《Java 8实战》**. 人民邮电出版社 . 2016-04-01
|
322
notes/Redis_分布式锁原理.md
Normal file
@ -0,0 +1,322 @@
|
||||
# Redis 分布式锁
|
||||
|
||||
## 一、实现原理
|
||||
|
||||
### 1.1 基本原理
|
||||
|
||||
JDK 原生的锁可以让不同**线程**之间以互斥的方式来访问共享资源,但如果想要在不同**进程**之间以互斥的方式来访问共享资源,JDK 原生的锁就无能为力。此时可以使用 Redis 或 Zookeeper 来实现分布式锁。
|
||||
|
||||
Redis 实现分布式锁的核心命令如下:
|
||||
|
||||
```shell
|
||||
SETNX key value
|
||||
```
|
||||
|
||||
SETNX 命令的作用是如果指定的 key 不存在,则创建并将为其设置值,此时返回状态码为 1,否则为 0。如果状态码为 1,代表获得该锁;此时其他进程再次尝试创建时,都回返回 0 ,代表锁已经被占用。当获得锁的进程处理完成业务后,再通过 `del` 命令将该 key 删除,其他进程就可以再次竞争性地进行创建,获得该锁。
|
||||
|
||||
通常为了避免死锁,我们会为锁设置一个超时时间,在 Redis 中可以通过 `expire` 命令来进行实现:
|
||||
|
||||
```shell
|
||||
EXPIRE key seconds
|
||||
```
|
||||
|
||||
这里我们将两者结合起来,并使用 Jedis 客户端来进行实现,其代码如下:
|
||||
|
||||
```java
|
||||
Long result = jedis.setnx("lockKey", "lockValue");
|
||||
if (result == 1) {
|
||||
// 如果此处程序被异常终止(如直接kill -9进程),则设置超时的操作就无法进行,该锁就会出现死锁
|
||||
jedis.expire("lockKey", 3);
|
||||
}
|
||||
```
|
||||
|
||||
上面的代码存在原子性问题,即 setnx + expire 操作是非原子性的,如果在设置超时时间前,程序被异常终止,则程序就会出现死锁。此时可以将 SETNX 和 EXPIRE 两个命令写在同一个 Lua 脚本中,然后通过调用 Jedis 的 `eval()` 方法来执行,并由 Redis 来保证整个 Lua 脚本操作的原子性。这种方式实现比较繁琐,因此官方文档中推荐了另外一种更加优雅的实现方法:
|
||||
|
||||
### 1.2 官方推荐
|
||||
|
||||
[官方文档]( Distributed locks with Redis) 中推荐直接使用 set 命令来进行实现:
|
||||
|
||||
```shell
|
||||
SET key value [EX seconds|PX milliseconds] [NX|XX] [KEEPTTL]
|
||||
```
|
||||
|
||||
这里我们主要关注以下四个参数:
|
||||
|
||||
- **EX** :设置超时时间,单位是秒;
|
||||
- **PX** :设置超时时间,单位是毫秒;
|
||||
- **NX** :当且仅当对应的 Key 不存在时候才进行设置;
|
||||
- **XX**:当且仅当对应的 Key 不存在时候才进行设置。
|
||||
|
||||
这四个参数从 Redis 2.6.12 版本开始支持,因为当前大多数在用的 Redis 都已经高于这个版本,所以推荐直接使用该命令来实现分布式锁。对应的 Jedis 代码如下:
|
||||
|
||||
```java
|
||||
jedis.set("lockKey", "lockValue", SetParams.setParams().nx().ex(3));
|
||||
```
|
||||
|
||||
此时一条命令就可以完成值和超时时间的设置,并且因为只有一条命令,因此其原子性也得到了保证。但因为引入了超时时间来避免死锁,同时也存在了两个其他问题:
|
||||
|
||||

|
||||
|
||||
+ **问题一**:当业务处理的时间超过过期时间后,由于锁已经被释放,此时其他进程就可以获得该锁,这意味着同时有两个进程进入了临界区,此时分布式锁就失效了;
|
||||
+ **问题二**:如上图所示,当进程 A 业务处理完成后,此时删除的是进程 B 的锁,进而导致分布式锁又一次失效,进程 B 和 进程 C 同时进入了临界区。
|
||||
|
||||
针对问题二,我们可以在创建锁时为其指定一个唯一的标识作为 Key 的 Value,这里假设我们采用 `UUID + 线程ID` 来作为唯一标识:
|
||||
|
||||
```java
|
||||
String identifier = UUID.randomUUID() + ":" + Thread.currentThread().getId();
|
||||
jedis.set("LockKey", identifier, SetParams.setParams().nx().ex(3));
|
||||
```
|
||||
|
||||
然后在删除锁前,先将该唯一标识与锁的 Value 值进行比较,如果不相等,证明该锁不属于当前的操作对象,此时不执行删除操作。为保证判断操作和删除操作整体的原子性,这里需要使用 Lua 脚本来执行:
|
||||
|
||||
```shell
|
||||
if redis.call("get",KEYS[1]) == ARGV[1] then
|
||||
return redis.call("del",KEYS[1])
|
||||
else
|
||||
return 0
|
||||
end
|
||||
```
|
||||
|
||||
这段脚本的意思是如果 value 的值与给定的值相同,则执行删除命令,否则直接返回状态码 0 。对应使用 Jedis 实现的代码如下:
|
||||
|
||||
```java
|
||||
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
|
||||
jedis.eval(script,
|
||||
Collections.singletonList("LockKey"), // keys的集合
|
||||
Collections.singletonList(identifier) // args的集合
|
||||
);
|
||||
```
|
||||
|
||||
接着再看问题一,问题一最简单的解决方法是:你可以估计业务的最大处理时间,然后保证设置的过期时间大于最大处理时间。但是由于业务需要面临各种复杂的情况,因此可能无法保证业务每一次都能在规定的过期时间内处理完成,此时可以使用延长锁时效的策略。
|
||||
|
||||
### 1.3 延长锁时效
|
||||
|
||||
延长锁时效的方案如下:假设锁超时时间是 30 秒,此时程序需要每隔一段时间去扫描一下该锁是否还存在,扫描时间需要小于超时时间,通常可以设置为超时时间的 1/3,在这里也就是 10 秒扫描一次。如果锁还存在,则重置其超时时间恢复到 30 秒。通过这种方案,只要业务还没有处理完成,锁就会一直有效;而当业务一旦处理完成,程序也会马上删除该锁。
|
||||
|
||||
Redis 的 Java 客户端 Redisson 提供的分布式锁就支持延长锁时效的机制,称为 WatchDog,直译过来就是 “看门狗” 机制。
|
||||
|
||||
以上讨论的都是单机环境下的 Redis 分布式锁,而想要保证 Redis 分布式锁是高可用,首先 Redis 得是高可用的,Redis 的高可用模式主要有两种:哨兵模式和集群模式。
|
||||
|
||||
|
||||
|
||||
## 二、哨兵模式与分布式锁
|
||||
|
||||
哨兵模式是主从模式的升级版,能够在故障发生时自动进行故障切换,选举出新的主节点。但由于 Redis 的复制机制是异步的,因此在哨兵模式下实现的分布式锁是不可靠的,原因如下:
|
||||
|
||||
+ 由于主从之间的复制操作是异步的,当主节点上创建好锁后,此时从节点上的锁可能尚未创建。而如果此时主节点发生了宕机,从节点上将不会创建该分布式锁;
|
||||
+ 从节点晋升为主节点后,其他进程(或线程)仍然可以在该新主节点创建分布式锁,此时就存在多个进程(或线程)同时进入了临界区,分布式锁就失效了。
|
||||
|
||||
因此在哨兵模式下,无法避免锁失效的问题。因此想要实现高可用的分布式锁,可以采取 Redis 的另一个高可用方案 —— Redis 集群模式。
|
||||
|
||||
|
||||
|
||||
## 三、集群模式与分布式锁
|
||||
|
||||
### 3.1 RedLock 方案
|
||||
|
||||
想要在集群模式下实现分布式锁,Redis 提供了一种称为 RedLock 的方案,假设我们有 N 个 Redis 实例,此时客户端的执行过程如下:
|
||||
|
||||
+ 以毫秒为单位记录当前的时间,作为开始时间;
|
||||
+ 接着采用和单机版相同的方式,依次尝试在每个实例上创建锁。为了避免客户端长时间与某个故障的 Redis 节点通讯而导致阻塞,这里采用快速轮询的方式:假设创建锁时设置的超时时间为 10 秒,则访问每个 Redis 实例的超时时间可能在 5 到 50 毫秒之间,如果在这个时间内还没有建立通信,则尝试下一个实例;
|
||||
+ 如果在至少 N/2+1 个实例上都成功创建了锁。并且 `当前时间 - 开始时间 < 锁的超时时间` ,则认为已经获取了锁,锁的有效时间等于 `超时时间 - 花费时间`(如果考虑到不同 Redis 实例所在的服务器存在时钟漂移,则还需要减去时钟漂移);
|
||||
+ 如果少于 N/2+1 个实例,则认为创建分布式锁失败,此时需要删除这些实例上已创建的锁,以便其他客户端进行创建。
|
||||
+ 该客户端在失败后,可以等待一个随机的时间后,再次进行重试。
|
||||
|
||||
以上就是 RedLock 的实现方案,可以看到主要是由客户端来实现的,并不真正涉及到 Redis 集群相关的功能。因此这里的 N 个 Redis 实例并不要求是一个真正的 Redis 集群,它们彼此之间可以是完全独立的,但由于只需要半数节点获得锁就能真正获得锁,因此对于分布式锁功能而言,其仍然是高可用的。后面使用 Redisson 来演示 RedLock 时会再次验证这一点。
|
||||
|
||||
|
||||
|
||||
### 3.2 低延迟通讯和多路复用
|
||||
|
||||
实现 RedLock 方案的客户端与所有 Redis 实例进行通讯时,必须要保证低延迟,而且最好能使用多路复用技术来保证一次性将 SET 命令发送到所有 Redis 节点上,并获取到对应的执行结果。假设网络延迟高,此时客户端 A 和 B 分别尝试创建锁:
|
||||
|
||||
```shell
|
||||
SET key 随机数A EX 3 NX #A客户端
|
||||
SET key 随机数B EX 3 NX #B客户端
|
||||
```
|
||||
|
||||
假设客户端 A 在一半节点上创建了锁,而客户端 B 在另外一半节点上创建了锁,此时两个客户端都无法获取到锁。如果并发很高,则可能存在多个客户端分别在部分节点上创建了锁,而没有一个客户端的数量超过 N/2+1。这也就是上面过程的最后一步中,强调一旦客户端失败后,需要等待一个随机时间后再进行重试的原因,如果是一个固定时间,则所有失败的客户端又同时发起重试,情况就还是一样。
|
||||
|
||||
因此最佳的实现就是客户端的 SET 命令能几乎同时到达所有节点,并几乎同时接受到所有执行结果。 想要保证这一点,低延迟的网络通信极为关键,下文介绍的 Redisson 就采用 Netty 来实现了这一功能。
|
||||
|
||||
|
||||
|
||||
### 3.3 持久化与高可用
|
||||
|
||||
为了保证高可用,所有 Redis 节点都需要开启持久化。假设不开启持久化,假设进程 A 获得锁后正在处理业务逻辑,此时节点宕机重启,因为锁数据丢失了,其他进程便可以再次获得该锁,因此所有 Redis 节点都需要开启 AOF 持久化方式。
|
||||
|
||||
AOF 默认的同步机制为 `everysec`,即每秒进程一次持久化,此时能够兼顾性能与数据安全,发生意外宕机的时,最多会丢失一秒的数据。但如果碰巧就是在这一秒的时间内进程 A 创建了锁,此时其他进程也可以获得该锁,锁的互斥性也就失效了。要解决这个问题有两种方式:
|
||||
|
||||
+ **方式一**:修改 Redis.conf 中 `appendfsync` 的值为 always,即每次命令后都进行持久化,此时降低了 Redis 性能,进而也会降低了分布式锁的性能,但锁的互斥性得到了绝对的保证;
|
||||
+ **方式二**:一旦节点宕机了,等到锁的超时时间过了之后才进行重启,此时相当于原有锁自然失效(你需要保证业务在自己设定的超时时间内能完成),这种方案称为延时重启。
|
||||
|
||||
|
||||
|
||||
## 四、Redisson
|
||||
|
||||
Redisson 是 Redis 的 Java 客户端,它提供了各种的 Redis 分布式锁的实现,如可重入锁、公平锁、RedLock、读写锁等等,并且在实现上考虑得也更加全面,适用于生产环境下使用。
|
||||
|
||||
### 4.1 分布式锁
|
||||
|
||||
使用 Redisson 来创建单机版本分布式锁非常简单,示例如下:
|
||||
|
||||
```java
|
||||
// 1.创建RedissonClient,如果与spring集成,可以将RedissonClient声明为Bean,在使用时注入即可
|
||||
Config config = new Config();
|
||||
config.useSingleServer().setAddress("redis://192.168.0.100:6379");
|
||||
RedissonClient redissonClient = Redisson.create(config);
|
||||
|
||||
// 2.创建锁实例
|
||||
RLock lock = redissonClient.getLock("myLock");
|
||||
try {
|
||||
//3.尝试获取分布式锁,第一个参数为等待时间,第二个参数为锁过期时间
|
||||
boolean isLock = lock.tryLock(10, 30, TimeUnit.SECONDS);
|
||||
if (isLock) {
|
||||
// 4.模拟业务处理
|
||||
System.out.println("处理业务逻辑");
|
||||
Thread.sleep(20 * 1000);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
//5.释放锁
|
||||
lock.unlock();
|
||||
}
|
||||
redissonClient.shutdown();
|
||||
```
|
||||
|
||||
此时对应在 Redis 中的数据结构如下:
|
||||
|
||||

|
||||
|
||||
可以看到 key 就是代码中设置的锁名,而 value 值的类型是 hash,其中键 `9280e909-c86b-43ec-b11d-6e5a7745e2e9:13` 的格式为 `UUID + 线程ID`,键对应的值为 1,代表加锁的次数。之所以要采用 hash 这种格式,主要是因为 Redisson 创建的锁是具有重入性的,即你可以多次进行加锁:
|
||||
|
||||
```java
|
||||
boolean isLock1 = lock.tryLock(0, 30, TimeUnit.SECONDS);
|
||||
boolean isLock2 = lock.tryLock(0, 30, TimeUnit.SECONDS);
|
||||
```
|
||||
|
||||
此时对应的值就会变成 2,代表加了两次锁:
|
||||
|
||||

|
||||
|
||||
当然和其他重入锁一样,需要保证加锁的次数和解锁的次数一样,才能完全解锁:
|
||||
|
||||
```java
|
||||
lock.unlock();
|
||||
lock.unlock();
|
||||
```
|
||||
|
||||
|
||||
|
||||
### 4.2 RedLock
|
||||
|
||||
Redisson 也实现了 Redis 官方推荐的 RedLock 方案,这里我们启动三个 Redis 实例进行演示,它们彼此之间可以是完全独立的,并不需要进行集群的相关配置:
|
||||
|
||||
```shell
|
||||
$ ./redis-server ../redis.conf
|
||||
$ ./redis-server ../redis.conf --port 6380
|
||||
$ ./redis-server ../redis.conf --port 6381
|
||||
```
|
||||
|
||||
对应的代码示例如下:
|
||||
|
||||
```java
|
||||
// 1.创建RedissonClient
|
||||
Config config01 = new Config();
|
||||
config01.useSingleServer().setAddress("redis://192.168.0.100:6379");
|
||||
RedissonClient redissonClient01 = Redisson.create(config01);
|
||||
Config config02 = new Config();
|
||||
config02.useSingleServer().setAddress("redis://192.168.0.100:6380");
|
||||
RedissonClient redissonClient02 = Redisson.create(config02);
|
||||
Config config03 = new Config();
|
||||
config03.useSingleServer().setAddress("redis://192.168.0.100:6381");
|
||||
RedissonClient redissonClient03 = Redisson.create(config03);
|
||||
|
||||
// 2.创建锁实例
|
||||
String lockName = "myLock";
|
||||
RLock lock01 = redissonClient01.getLock(lockName);
|
||||
RLock lock02 = redissonClient02.getLock(lockName);
|
||||
RLock lock03 = redissonClient03.getLock(lockName);
|
||||
|
||||
// 3. 创建 RedissonRedLock
|
||||
RedissonRedLock redLock = new RedissonRedLock(lock01, lock02, lock03);
|
||||
|
||||
try {
|
||||
boolean isLock = redLock.tryLock(10, 300, TimeUnit.SECONDS);
|
||||
if (isLock) {
|
||||
// 4.模拟业务处理
|
||||
System.out.println("处理业务逻辑");
|
||||
Thread.sleep(200 * 1000);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
//5.释放锁
|
||||
redLock.unlock();
|
||||
}
|
||||
|
||||
redissonClient01.shutdown();
|
||||
redissonClient02.shutdown();
|
||||
redissonClient03.shutdown();
|
||||
```
|
||||
|
||||
此时每个 Redis 实例上锁的情况如下:
|
||||
|
||||

|
||||
|
||||
可以看到每个实例上都获得了锁。
|
||||
|
||||
### 4.3 延长锁时效
|
||||
|
||||
最后,介绍一下 Redisson 的 WatchDog 机制,它可以用来延长锁时效,示例如下:
|
||||
|
||||
```java
|
||||
Config config = new Config();
|
||||
// 1.设置WatchdogTimeout
|
||||
config.setLockWatchdogTimeout(30 * 1000);
|
||||
config.useSingleServer().setAddress("redis://192.168.0.100:6379");
|
||||
RedissonClient redissonClient = Redisson.create(config);
|
||||
|
||||
// 2.创建锁实例
|
||||
RLock lock = redissonClient.getLock("myLock");
|
||||
try {
|
||||
//3.尝试获取分布式锁,第一个参数为等待时间
|
||||
boolean isLock = lock.tryLock(0, TimeUnit.SECONDS);
|
||||
if (isLock) {
|
||||
// 4.模拟业务处理
|
||||
System.out.println("处理业务逻辑");
|
||||
Thread.sleep(60 * 1000);
|
||||
System.out.println("锁剩余的生存时间:" + lock.remainTimeToLive());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
//5.释放锁
|
||||
lock.unlock();
|
||||
}
|
||||
redissonClient.shutdown();
|
||||
```
|
||||
|
||||
这里我们通过 `config.setLockWatchdogTimeout(30 * 1000)` 将 lockWatchdogTimeout 的值设置为 30000 毫秒(默认值也是 30000 毫秒)。lockWatchdogTimeout 只会对那些没有设置锁超时时间的锁生效,所以我们这里调用的是两个参数的 `tryLock()` 方法:
|
||||
|
||||
```java
|
||||
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
|
||||
```
|
||||
|
||||
而不是包含超时时间的三个参数的 `tryLock()` 方法:
|
||||
|
||||
```java
|
||||
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
|
||||
```
|
||||
|
||||
Redisson 的 WatchDog 机制会以 lockWatchdogTimeout 的 1/3 时长为周期(在这里就是 10 秒)对所有未设置超时时间的锁进行检查,如果业务尚未处理完成(也就是锁还没有被程序主动删除),Redisson 就会将锁的超时时间重置为 lockWatchdogTimeout 指定的值(在这里就是设置的 30 秒),直到锁被程序主动删除。因此在上面的例子中可以看到,不论将模拟业务的睡眠时间设置为多长,其锁都会存在一定的剩余生存时间,直至业务处理完成。
|
||||
|
||||
反之,如果明确的指定了锁的超时时间 leaseTime,则以 leaseTime 的时间为准,WatchDog 机制对明确指定超时时间的锁不会生效。
|
||||
|
||||
|
||||
|
||||
## 参考资料
|
||||
|
||||
+ [Distributed locks with Redis](https://redis.io/topics/distlock)
|
||||
+ [Redisson Distributed locks and synchronizers](https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers)
|
@ -34,10 +34,10 @@ RDB 机制是以指定的时间间隔将 Redis 中的数据生成快照并保存
|
||||
|
||||
除了手动使用命令触发外,在某些场景下也会自动触发 Redis 的 RDB 机制:
|
||||
|
||||
+ 在 `redis.conf` 中配置了 `save m n` ,表示如果在 m 秒内存在了 n 次修改操作时,则自动触发`bgsave`;
|
||||
+ 如果从节点执行全量复制操作,则主节点自动执行`bgsave`,并将生成的 RDB 文件发送给从节点;
|
||||
+ 执行 `debug reload` 命令重新加载 Redis 时,会触发`save`操作;
|
||||
+ 执行 `shutdown` 命令时候,如果没有启用 AOF 持久化则默认采用`bgsave`进行持久化。
|
||||
+ 在 `redis.conf` 中配置了 `save m n` ,表示如果在 m 秒内存在了 n 次修改操作时,则自动触发 `bgsave`;
|
||||
+ 如果从节点执行全量复制操作,则主节点自动执行 `bgsave`,并将生成的 RDB 文件发送给从节点;
|
||||
+ 执行 `debug reload` 命令重新加载 Redis 时,会触发 `save` 操作;
|
||||
+ 执行 `shutdown` 命令时候,如果没有启用 AOF 持久化则默认采用 `bgsave ` 进行持久化。
|
||||
|
||||
### 2.3 相关配置
|
||||
|
||||
@ -45,7 +45,7 @@ RDB 机制是以指定的时间间隔将 Redis 中的数据生成快照并保存
|
||||
|
||||
RDB 文件默认保存在 Redis 的工作目录下,默认文件名为 `dump.rdb`,可以通过静态或动态方式修改:
|
||||
|
||||
+ 静态配置:通过修改 `redis.conf` 中的工作目录`dir`和数据库存储文件名`dbfilename`两个配置;
|
||||
+ 静态配置:通过修改 `redis.conf` 中的工作目录 `dir` 和数据库存储文件名 `dbfilename` 两个配置;
|
||||
|
||||
+ 动态修改:通过在命令行中执行以下命令:
|
||||
|
||||
@ -72,7 +72,7 @@ AOF 是 Redis 提供的另外一种持久化的方式,它以独立日志的方
|
||||
|
||||
### 3.2 同步策略
|
||||
|
||||
Redis 提供了三种同步策略,用于控制 AOF 缓冲区同步数据到磁盘上的行为,由参数`appendfsync`控制:
|
||||
Redis 提供了三种同步策略,用于控制 AOF 缓冲区同步数据到磁盘上的行为,由参数 `appendfsync` 控制:
|
||||
|
||||
| 可选配置 | 说明 |
|
||||
| -------- | ------------------------------------------------------------ |
|
||||
@ -85,11 +85,11 @@ write 和 fsync 操作说明:
|
||||
- write 操作会触发延迟写机制,Linux 在内核提供页缓冲区用来提高硬盘的 IO 性能,write 操作在写入系统缓冲区后直接返回。同步操作依赖于系统调度机制,例如缓冲区页空间写满或达到特定时间周期。 同步文件之前,如果此时系统故障宕机,缓冲区内数据将丢失。
|
||||
- fsync 针对单个文件操作,做强制硬盘同步,fsync 操作将阻塞直到写入硬盘完成后返回,它保证了数据持久化的安全。
|
||||
|
||||
Redis 默认的同步机制为`everysec`,此时能够兼顾性能和保证数据安全,在发生意外宕机的时,最多会丢失一秒的数据。
|
||||
Redis 默认的同步机制为 `everysec`,此时能够兼顾性能和保证数据安全,在发生意外宕机的时,最多会丢失一秒的数据。
|
||||
|
||||
### 3.3 相关配置
|
||||
|
||||
想要使用 AOF 功能,需要配置 `appendonly `的值为`yes`,默认值为`no`。默认 AOF 的文件名为 `appendonly.aof`, 可以通过修改`appendfilename`的值进行修改,和 RDB 文件的保存位置一样,默认保存在 Redis 的工作目录下。
|
||||
想要使用 AOF 功能,需要配置 `appendonly ` 的值为 `yes`,默认值为 `no`。默认 AOF 的文件名为 `appendonly.aof`, 可以通过修改`appendfilename` 的值进行修改,和 RDB 文件的保存位置一样,默认保存在 Redis 的工作目录下。
|
||||
|
||||
## 四、对比分析
|
||||
|
||||
|
357
notes/ZooKeeper_分布式锁原理.md
Normal file
@ -0,0 +1,357 @@
|
||||
# ZooKeeper 分布式锁原理
|
||||
|
||||
## 一、实现原理
|
||||
|
||||
JDK 原生的锁可以让不同**线程**之间以互斥的方式来访问共享资源,但如果想要在不同**进程**之间以互斥的方式来访问共享资源,JDK 原生的锁就无能为力。此时可以使用 Redis 或 Zookeeper 来实现分布式锁。
|
||||
|
||||
### 1.1 临时节点方案
|
||||
|
||||

|
||||
|
||||
临时节点方案的原理如下:
|
||||
|
||||
+ 让多个进程(或线程)竞争性地去创建同一个临时节点,由于 ZooKeeper 不允许存在两个完全相同节点,因此必然只有一个进程能够抢先够创建成功 ;
|
||||
+ 假设进程 A 成功创建,则它获得了该分布式锁。此时其他进程需要在 parent_node 上注册监听,监听其下所有子节点的变化,并挂起当前线程;
|
||||
+ 当 parent_node 下有子节点发生变化时候,它会通知所有在其上注册了监听的进程。这些进程需要判断是否是对应的锁节点上的删除事件。如果是,则让挂起的线程继续执行,并尝试再次获取锁。
|
||||
|
||||
这里之所以使用临时节点是为了避免死锁:进程 A 正常执行完业务逻辑后,会主动地去删除该节点,释放锁。但如果进程 A 意外宕机了,由于声明的是临时节点,因此该节点也会被移除,从而避免死锁。
|
||||
|
||||
临时节点方案的实现比较简单,但是其缺点也比较明显:
|
||||
|
||||
+ **缺点一**:当 parent_node 下其他锁变动或者被删除时,进程 B,C,D 也会收到通知,但是显然它们并不关心其他锁的释放情况。如果 parent_node 下存在大量的锁,并且程序处于高并发状态下,则 ZooKeeper 集群就需要频繁地通知客户端进程,这会带来大量的网络开销;
|
||||
+ **缺点二**:采用临时节点方案创建的锁是非公平的,也就是说在进程 A 释放锁后,进程 B,C,D 发起重试的顺序与其收到通知的时间有关,而与其第一次尝试获取锁的时间无关,即与等待时间的长短无关。
|
||||
|
||||
### 1.2 临时有序节点方案
|
||||
|
||||

|
||||
|
||||
采用临时有序节点时,对应的流程如下:
|
||||
|
||||
+ 每个进程(或线程)都会尝试在 parent_node 下创建临时有序节点,根据临时有序节点的特性,所有的进程都会创建成功;
|
||||
+ 然后每个进程需要获取 parent_node 下该锁的所有临时节点的信息,并判断自己是否是最小的一个节点,如果是,则代表获得该锁;
|
||||
+ 如果不是,则挂起当前线程。并对其前一个节点注册监听(这里可以通过 exists 方法传入需要触发 Watch 事件);
|
||||
+ 如上图所示,当进程 A 处理完成后,会触发进程 B 注册的 Watch 事件,此时进程 B 就知道自己获得了锁,从而可以将挂起的线程继续,开始业务的处理。
|
||||
|
||||
这里需要注意的是:如果进程 B 创建了临时节点,并且通过比较后知道自己不是最小的一个节点,但还没有注册监听;而此时 A 进程恰好处理完成并删除了 01 节点,此时调用 exist 方法时会抛出 IllegalArgumentException 异常。这虽然是一个异常,但是却代表进程 B 获得了锁,因此进程 B 可以开始执行业务逻辑。
|
||||
|
||||
临时有序节点方案正好解决了临时节点方案的两个缺点:
|
||||
|
||||
+ 每个临时有序节点只需要关心它的上一个节点,而不需要关心其他的额外节点和额外事件;
|
||||
+ 实现的锁是公平的,先到达的进程创建的临时有序节点的值越小,能更快地获得锁。
|
||||
|
||||
临时有序节点方案的另外一个优点是其能够实现共享锁,比如读写锁中的读锁。
|
||||
|
||||
### 1.3 读写锁
|
||||
|
||||
如下图所示,可以将临时有序节点分为读锁节点和写锁节点:
|
||||
|
||||
+ 对于读锁节点而言,其只需要关心前一个写锁节点的释放。如果前一个写锁释放了,则多个读锁节点对应的线程可以并发地读取数据;
|
||||
+ 对于写锁节点而言,其只需要关心前一个节点的释放,而不需要关心前一个节点是写锁节点还是读锁节点。因为为了保证有序性,写操作必须要等待前面的读操作或者写操作执行完成。
|
||||
|
||||

|
||||
|
||||
|
||||
|
||||
## 二、 Apache Curator
|
||||
|
||||
### 2.1 基本使用
|
||||
|
||||
Apache Curator 是 ZooKeeper 的 Java 客户端,它基于临时有序节点方案实现了分布式锁、分布式读写锁等功能。基本使用如下:
|
||||
|
||||
```java
|
||||
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
|
||||
CuratorFramework client = CuratorFrameworkFactory.builder()
|
||||
.connectString("192.168.0.105:2181")
|
||||
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
|
||||
.namespace("mySpace").build();
|
||||
client.start();
|
||||
|
||||
// 1. 创建分布式锁
|
||||
InterProcessMutex lock = new InterProcessMutex(client, "/distributed/myLock");
|
||||
// 2.尝试获取分布式锁
|
||||
if (lock.acquire(10, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
System.out.println("模拟业务耗时");
|
||||
Thread.sleep(3 * 1000);
|
||||
} finally {
|
||||
// 3.释放锁
|
||||
lock.release();
|
||||
}
|
||||
}
|
||||
client.close();
|
||||
```
|
||||
|
||||
这里需要事先导入 Apache Curator 和 ZooKeeper 相关的依赖,并保证 ZooKeeper 版本与服务器上 ZooKeeper 的版本一致:
|
||||
|
||||
```xml
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-framework</artifactId>
|
||||
<version>4.3.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-recipes</artifactId>
|
||||
<version>4.3.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
<version>3.4.14</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
之后就可以启动多个程序进程来进程测试,ZooKeeper 上的数据结构如下:
|
||||
|
||||

|
||||
|
||||
在我们指定的路径下,会依次创建多个临时有序节点,而当业务逻辑处理完成后,这些节点就会被移除。这里我们使用的是单机版本的 ZooKeeper ,而集群环境下也是一样,和 Redis 主从模式下的延迟复制会导致数据不一致的情况不同,ZooKeeper 各个节点上的数据一致性可以由其自身来进行保证。
|
||||
|
||||
|
||||
|
||||
### 2.2 源码解析
|
||||
|
||||
Apache Curator 底层采用的是临时有序节点的实现方案,下面我们来看一下其源码中具体是如何实现的:
|
||||
|
||||
#### 1. 获取锁源码解析
|
||||
|
||||
上面最核心的获取锁的方法 `acquire()` ,其定义如下:
|
||||
|
||||
```java
|
||||
@Override
|
||||
public boolean acquire(long time, TimeUnit unit) throws Exception{
|
||||
return internalLock(time, unit);
|
||||
}
|
||||
```
|
||||
|
||||
它在内部调用了 `internalLock()` 方法:
|
||||
|
||||
```java
|
||||
// threadData是一个线程安全的Map,其中Thread是持有锁的线程,LockData是锁数据
|
||||
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
|
||||
|
||||
private boolean internalLock(long time, TimeUnit unit) throws Exception{
|
||||
Thread currentThread = Thread.currentThread();
|
||||
// 首先查看threadData中是否已经有当前线程对应的锁
|
||||
LockData lockData = threadData.get(currentThread);
|
||||
if ( lockData != null ){
|
||||
//如果锁已存在,则将其计数器加1,这一步是为了实现可重入锁
|
||||
lockData.lockCount.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
// 【核心方法:尝试获取锁】
|
||||
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
|
||||
// 如果获取到锁,则将其添加到threadData中
|
||||
if ( lockPath != null ){
|
||||
LockData newLockData = new LockData(currentThread, lockPath);
|
||||
threadData.put(currentThread, newLockData);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
```
|
||||
|
||||
这里面真正去尝试创建锁的方法是 `attemptLock()`:
|
||||
|
||||
```java
|
||||
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{
|
||||
final long startMillis = System.currentTimeMillis();
|
||||
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
|
||||
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
|
||||
int retryCount = 0; // 重试次数
|
||||
String ourPath = null;
|
||||
boolean hasTheLock = false;
|
||||
boolean isDone = false;
|
||||
|
||||
// 当出现NoNodeException异常时候依靠该循环进行重试
|
||||
while ( !isDone ){
|
||||
isDone = true;
|
||||
try{
|
||||
// 【核心方法:根据锁路径来创建对应的节点】
|
||||
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
|
||||
// 【核心方法:获取锁】
|
||||
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
|
||||
}
|
||||
catch ( KeeperException.NoNodeException e ){
|
||||
// 如果出现异常,并且还没有到达给ZooKeeper配置的最大重试时间或最大重试次数,则循环继续,并再次尝试获取锁
|
||||
if ( client.getZookeeperClient().getRetryPolicy()
|
||||
.allowRetry(retryCount++,System.currentTimeMillis() - startMillis,
|
||||
RetryLoop.getDefaultRetrySleeper()) ){
|
||||
isDone = false;
|
||||
}else{
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 如果获取到锁,则跳出循环,并返回锁的路径
|
||||
if ( hasTheLock ){
|
||||
return ourPath;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
```
|
||||
|
||||
这里两个核心的方法是 `createsTheLock()` 和 `internalLockLoop()` 。createsTheLock 的实现比较简单,就是根据我们指定的路径来创建临时节点有序节点:
|
||||
|
||||
```java
|
||||
@Override
|
||||
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception{
|
||||
String ourPath;
|
||||
// 如果lockNodeBytes不为空,则创建一个含数据的临时有序节点
|
||||
if ( lockNodeBytes != null ){
|
||||
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().
|
||||
withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
|
||||
}else{
|
||||
//否则则创建一个空的临时有序节点
|
||||
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().
|
||||
withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
|
||||
}
|
||||
// 返回创建好的节点路径
|
||||
return ourPath;
|
||||
}
|
||||
```
|
||||
|
||||
这里创建好的临时节点的路径会作为参数传递给 `internalLockLoop()` 方法。在文章开头介绍原理时候,我们说过每个线程创建好临时有序节点后,还需要判断它所创建的临时有序节点是否是当前最小的节点,`internalLockLoop()` 方法主要做的就是这事:
|
||||
|
||||
```java
|
||||
private boolean internalLockLoop ( long startMillis, Long millisToWait, String ourPath) throws Exception {
|
||||
// 是否持有锁
|
||||
boolean haveTheLock = false;
|
||||
boolean doDelete = false;
|
||||
try {
|
||||
if (revocable.get() != null) {
|
||||
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
|
||||
}
|
||||
// 如果连接ZooKeeper客户端处于启动状态,也就是想要获取锁的进程仍然处于运行状态,并且还没有获取到锁,则循环继续
|
||||
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
|
||||
// 对所当前所有的子节点按照从小到大进行排序
|
||||
List<String> children = getSortedChildren();
|
||||
// 将createsTheLock方法获得的临时有序节点的路径进行截取,只保留节点名的部分
|
||||
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
|
||||
// 判断当前节点是否是最小的一个节点
|
||||
PredicateResults predicateResults = driver.
|
||||
getsTheLock(client, children, sequenceNodeName, maxLeases);
|
||||
// 如果当前节点就是最小的一个节点(排他锁情况),则此时就获得了锁
|
||||
if (predicateResults.getsTheLock()) {
|
||||
haveTheLock = true;
|
||||
} else {
|
||||
// 如果当前节点不是最小的一个节点,先拼接并获取前一个节点完整的路径
|
||||
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
|
||||
synchronized (this) {
|
||||
try {
|
||||
// 然后对前一个节点进行监听
|
||||
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
|
||||
// 如果设置了等待时间
|
||||
if (millisToWait != null) {
|
||||
// 将等待时间减去到目前为止所耗费的时间
|
||||
millisToWait -= (System.currentTimeMillis() - startMillis);
|
||||
startMillis = System.currentTimeMillis();
|
||||
// 如果等待时间小于0,则说明我们耗费的时间已经超过了等待时间,此时获取的锁无效,需要删除它
|
||||
if (millisToWait <= 0) {
|
||||
//设置删除标志位,并退出循环
|
||||
doDelete = true;
|
||||
break;
|
||||
}
|
||||
// 如果还有剩余时间,则等待获取锁
|
||||
wait(millisToWait);
|
||||
} else {
|
||||
// 如果没有设置等待时间,则持续等待获取锁
|
||||
wait();
|
||||
}
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
// 这个异常抛出时,代表对前一个节点设置监听时,前一个节点已经不存在(被释放),此时捕获该异常,
|
||||
// 但不需要进行任何额外操作,因为循环会继续,就可以再次尝试获取锁
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
ThreadUtils.checkInterrupted(e);
|
||||
doDelete = true;
|
||||
throw e;
|
||||
} finally {
|
||||
// 如果抛出了异常或者超时了,都删除掉上一个方法createsTheLock创建的临时有序节点,以便后面的进程进行锁的获取
|
||||
if (doDelete) {
|
||||
deleteOurPath(ourPath);
|
||||
}
|
||||
}
|
||||
return haveTheLock;
|
||||
}
|
||||
```
|
||||
|
||||
这里对上面判断当前节点是否是持有锁的节点的 getsTheLock 方法进行一下说明:
|
||||
|
||||
```java
|
||||
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
|
||||
```
|
||||
|
||||
和上文介绍的一样,判断当前节点是否是持有锁的节点,在不同锁类型(如读写锁和互斥锁)的判断是不同的,因此 getsTheLock 方法有着不同的实现。这里以StandardLockInternalsDriver 为例,它使用的是互斥锁的判断规则:只要当前节点是最小的一个节点,就能持有锁:
|
||||
|
||||
```java
|
||||
public PredicateResults getsTheLock(CuratorFramework client, List<String> children,
|
||||
String sequenceNodeName, int maxLeases) throws Exception {
|
||||
// 获取当前节点在已经排好序的节点中的下标index
|
||||
int ourIndex = children.indexOf(sequenceNodeName);
|
||||
// 如果ourIndexx小于0,则抛出NoNodeException的异常
|
||||
validateOurIndex(sequenceNodeName, ourIndex);
|
||||
// 如果ourIndex小于maxLeases(默认值是1),则代表它就是0,也就是从小到大排好序的集合中的第一个,也就是最小的一个
|
||||
boolean getsTheLock = ourIndex < maxLeases;
|
||||
// 如果是最小的一个,此时就已经获取到锁,不需要返回前一个节点的名称,否则需要返回前一个节点的名称,用于后续的监听操作
|
||||
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
|
||||
return new PredicateResults(pathToWatch, getsTheLock);
|
||||
}
|
||||
```
|
||||
|
||||
这里解释一下 maxLease 这个参数的意义:默认值为 1,就是互斥锁;如果默认值大于 1,假设 maxLease 的值是 5,则前 5 个临时有序节点都可以认为是能持有锁的节点,此时最多可以有 5 个线程并发访问临界区, 在功能上类似于 Java 中Semaphore(信号量)机制 。
|
||||
|
||||
|
||||
|
||||
#### 2. 释放锁源码解析
|
||||
|
||||
以上就是所有获取锁的源码解析,而释放锁的过程就比较简单了。`release()` 方法的源码如下:
|
||||
|
||||
```java
|
||||
public void release() throws Exception {
|
||||
Thread currentThread = Thread.currentThread();
|
||||
// 根据当前线程来获取锁信息
|
||||
InterProcessMutex.LockData lockData = threadData.get(currentThread);
|
||||
// 如果获取不到,则当前线程不是锁的持有者,此时抛出异常
|
||||
if (lockData == null) {
|
||||
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
|
||||
}
|
||||
// 因为Zookeeper实现的锁具有重入性,所以将其计数器减少1
|
||||
int newLockCount = lockData.lockCount.decrementAndGet();
|
||||
if (newLockCount > 0) {
|
||||
return;
|
||||
}
|
||||
// 如果计数器的值小于0,代表解锁次数大于加锁次数,此时抛出异常
|
||||
if (newLockCount < 0) {
|
||||
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
|
||||
}
|
||||
try {
|
||||
// 如果到达这一步,则说明计数器的值正好等于0,此时可以真正将节点删除,释放锁
|
||||
internals.releaseLock(lockData.lockPath);
|
||||
} finally {
|
||||
// 将锁信息从threadData移除
|
||||
threadData.remove(currentThread);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
真正删除锁节点的方法存在于 `releaseLock()` 中,其源码如下:
|
||||
|
||||
```java
|
||||
final void releaseLock(String lockPath) throws Exception{
|
||||
client.removeWatchers();
|
||||
revocable.set(null);
|
||||
deleteOurPath(lockPath); //删除ZooKeeper上对应的节点
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 参考资料
|
||||
|
||||
+ 倪超 . 从 Paxos 到 Zookeeper——分布式一致性原理与实践 . 电子工业出版社 . 2015-02-01
|
||||
+ https://curator.apache.org/curator-recipes/index.html
|
BIN
pictures/redis_分布式锁_cli1.png
Normal file
After Width: | Height: | Size: 7.8 KiB |
BIN
pictures/redis_分布式锁_cli2.png
Normal file
After Width: | Height: | Size: 5.9 KiB |
BIN
pictures/redis_分布式锁_cli3.png
Normal file
After Width: | Height: | Size: 19 KiB |
BIN
pictures/redis_分布式锁原理.png
Normal file
After Width: | Height: | Size: 30 KiB |
BIN
pictures/zookeeper_分布式读写锁.png
Normal file
After Width: | Height: | Size: 11 KiB |
BIN
pictures/zookeeper_分布式锁_cli.png
Normal file
After Width: | Height: | Size: 15 KiB |
BIN
pictures/zookeeper_分布式锁_临时有序节点方案.png
Normal file
After Width: | Height: | Size: 15 KiB |
BIN
pictures/zookeeper_分布式锁_临时节点方法.png
Normal file
After Width: | Height: | Size: 13 KiB |