java并发

This commit is contained in:
罗祥
2019-11-29 17:58:46 +08:00
parent 3f371f110d
commit 538f89ebc9
14 changed files with 913 additions and 114 deletions

View File

@ -31,6 +31,7 @@ public class J5_AtomicIntegerFieldUpdater {
CountDownLatch latch = new CountDownLatch(number); CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10); ExecutorService executorService = Executors.newFixedThreadPool(10);
Candidate candidate = new Candidate("候选人", 0); Candidate candidate = new Candidate("候选人", 0);
// 使用字段更新型来保证其线程安全
AtomicIntegerFieldUpdater<Candidate> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score"); AtomicIntegerFieldUpdater<Candidate> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
for (int i = 0; i < number; i++) { for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch, candidate, fieldUpdater)); executorService.execute(new Task(latch, candidate, fieldUpdater));

View File

@ -0,0 +1,22 @@
package com.heibaiying.createThread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class J3_Method03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<>(task);
new Thread(futureTask).start();
System.out.println("获得线程返回值:" + futureTask.get());
}
}
class Task implements Callable<Integer> {
@Override
public Integer call() {
return 100;
}
}

View File

@ -0,0 +1,22 @@
package com.heibaiying.future;
import java.util.concurrent.*;
public class J0_Callable {
static class Task implements Callable<Integer> {
@Override
public Integer call() throws InterruptedException {
Thread.sleep(3000);
return 100;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executors = Executors.newSingleThreadExecutor();
Future<Integer> future = executors.submit(new Task());
System.out.println("计算结果为:" + future.get());
executors.shutdown();
}
}

View File

@ -1,43 +0,0 @@
package com.heibaiying.future;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class J1_Future {
static class Task implements Callable<Integer> {
private int operator;
Task(Integer operator) {
this.operator = operator;
}
@Override
public Integer call() throws Exception {
Thread.sleep(500);
return operator * 10;
}
}
public static void main(String[] args) {
ExecutorService executors = Executors.newFixedThreadPool(20);
List<Future<Integer>> futureList = new ArrayList<>();
for (int i = 0; i <= 100; i++) {
Future<Integer> submit = executors.submit(new Task(i));
futureList.add(submit);
}
// 获取所有线程的返回值并计算
Integer reduce = futureList.stream().map(x -> {
try {
return x.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return 0;
}).reduce(0, Integer::sum);
System.out.println("计算结果为:" + reduce);
executors.shutdown();
}
}

View File

@ -0,0 +1,26 @@
package com.heibaiying.future;
import java.util.concurrent.*;
public class J1_FutureTask {
static class Task implements Callable<Integer> {
@Override
public Integer call() throws InterruptedException {
Thread.sleep(3000);
return 100;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask01 = new FutureTask<>(new Task());
FutureTask<Integer> futureTask02 = new FutureTask<>(new Task());
new Thread(futureTask01).start();
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(futureTask02);
System.out.println("futureTask01 计算结果为:" + futureTask01.get());
System.out.println("futureTask02 计算结果为:" + futureTask01.get());
executorService.shutdown();
}
}

View File

@ -26,15 +26,16 @@ public class J2_CompletableFuture {
} }
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
int intermediateResult;
CompletableFuture<Integer> future = new CompletableFuture<>(); CompletableFuture<Integer> future = new CompletableFuture<>();
System.out.println("主线程开始计算"); // 启动子线程
new Thread(new Compute(future)).start(); new Thread(new Compute(future)).start();
int i = 0; System.out.println("启动主线程");
for (int j = 0; j < 100; j++) {
i = i + j;
}
Thread.sleep(2000); Thread.sleep(2000);
System.out.println("主线程计算完成"); System.out.println("主线程计算完成");
future.complete(i); // 假设主线程计算结果为 100
intermediateResult = 100;
// 传递主线程的计算结果给子线程
future.complete(intermediateResult);
} }
} }

View File

@ -6,17 +6,13 @@ import java.util.concurrent.ExecutionException;
public class J3_SupplyAsync { public class J3_SupplyAsync {
private static Integer compute() { private static Integer compute() {
int i = 0;
for (int j = 0; j < 100; j++) {
i = i + j;
}
try { try {
Thread.sleep(2000); Thread.sleep(2000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("子线程计算完成"); System.out.println("子线程计算完成");
return i; return 100;
} }
public static void main(String[] args) throws ExecutionException, InterruptedException { public static void main(String[] args) throws ExecutionException, InterruptedException {

View File

@ -38,6 +38,6 @@ public class J4_StreamingCall {
.thenApply(J4_StreamingCall::multi) .thenApply(J4_StreamingCall::multi)
.thenAccept(J4_StreamingCall::accept) //值在这一步被消费掉了 .thenAccept(J4_StreamingCall::accept) //值在这一步被消费掉了
.thenAccept(x -> System.out.println("运算结果:" + x)); .thenAccept(x -> System.out.println("运算结果:" + x));
future.get(); //惰性求值,如果缺少这一步,不会有任何输出 future.get(); //类似于流式计算的惰性求值,如果缺少这一步,不会有任何输出
} }
} }

View File

@ -0,0 +1,33 @@
package com.heibaiying.lockOptimization;
import java.util.Date;
public class Employee {
private String name;
private int age;
private Date birthday;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Date getBirthday() {
return birthday;
}
public void setBirthday(Date birthday) {
this.birthday = birthday;
}
}

View File

@ -0,0 +1,17 @@
package com.heibaiying.lockOptimization;
/**
* 锁消除
*/
public class LockElision {
private String toJson(Employee employee) {
StringBuffer buffer = new StringBuffer();
buffer.append("name:").append(employee.getName());
buffer.append("age:").append(employee.getAge());
buffer.append("birthday:").append(employee.getBirthday());
return buffer.toString();
}
}

View File

@ -23,9 +23,11 @@ public class J2_ThreadSafe {
@Override @Override
public void run() { public void run() {
try { try {
// 如果当前线程中不存在该值,则创建一个
if (threadLocal.get() == null) { if (threadLocal.get() == null) {
threadLocal.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); threadLocal.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
} }
// 使用线程私有的SimpleDateFormat
Date parse = threadLocal.get().parse("2018-08-08 08:08:08"); Date parse = threadLocal.get().parse("2018-08-08 08:08:08");
System.out.println(parse); System.out.println(parse);
atomicInteger.incrementAndGet(); atomicInteger.incrementAndGet();

View File

@ -0,0 +1,23 @@
package com.heibaiying.threadPool;
import java.util.concurrent.*;
public class J5_GetThreadResult {
static class CustomThread implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return "线程返回结果";
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> submit = executorService.submit(new CustomThread());
String result = submit.get();
System.out.println(result);
executorService.shutdown();
}
}

View File

@ -1,10 +1,10 @@
# Java 并发编程基础 # Java 并发编程基础
## 一、线程基础 ## 一、线程
### 1.1 创建线程 ### 1.1 创建线程
创建线程通常有以下种方式: 创建线程通常有以下种方式:
- 实现 Runnable 接口,并重写其 run 方法: - 实现 Runnable 接口,并重写其 run 方法:
@ -44,29 +44,51 @@ class CustomThread extends Thread {
} }
``` ```
+ 以上两种方式都无法获取线程的返回值,如果想要获取线程的返回值,需要实现 Callable 接口:
```java
public class J3_Method03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<>(task);
new Thread(futureTask).start();
System.out.println("获得线程返回值:" + futureTask.get());
}
}
class Task implements Callable<Integer> {
@Override
public Integer call() {
return 100;
}
}
// 输出:
获得线程返回值100
```
### 1.2 线程属性 ### 1.2 线程属性
**编号 (ID)** :用于标识线程的唯一编号,只读属性。 + **编号 (ID)** :用于标识线程的唯一编号,只读属性。
**名称 (Name)**:用于定义线程名称,可读可写。 + **名称 (Name)**:用于定义线程名称,可读可写。
**线程类别 (Daemon)**:通过线程的 `setDaemon(boolean on)` 方法进行设置,为 true 表示设置为守护线程,否则为用户线程。用户线程会阻止 Java 虚拟机正常停止,守护线程则不会。通常可以把一些不重要的线程设置为守护线程,比如监控其他线程状态的监控线程,当其他工作线程停止后,虚拟机就可以正常退出。 + **线程类别 (Daemon)**:通过线程的 `setDaemon(boolean on)` 方法进行设置,为 true 表示设置为守护线程,否则为用户线程。用户线程会阻止 Java 虚拟机正常停止,守护线程则不会。通常可以把一些不重要的线程设置为守护线程,比如监控其他线程状态的监控线程,当其他工作线程停止后,虚拟机就可以正常退出。
**优先级 (Priority)**Java 线程支持 110 十个优先级,默认值为 5 。Java 线程的优先级本质上只是给线程调度器一个提示信息,它并不能保证线程一定按照优先级的高低顺序运行,所以它是不可靠的,需要谨慎使用。在 Java 平台中,子线程的优先级默认与其父线程相同。 + **优先级 (Priority)**Java 线程支持 110 十个优先级,默认值为 5 。Java 线程的优先级本质上只是给线程调度器一个提示信息,它并不能保证线程一定按照优先级的高低顺序运行,所以它是不可靠的,需要谨慎使用。在 Java 平台中,子线程的优先级默认与其父线程相同。
### 1.3 线程状态 ### 1.3 线程状态
Java 线程的生命周期分为以下五类状态: Java 线程的生命周期分为以下五类状态:
**RUNABLE**该状态包括两个子状态READY 和 RUNING 。处于 READY 状态的线程被称为活跃线程,被线程调度器选中后开始运行,转化为 RUNING 状态。 + **RUNABLE**该状态包括两个子状态READY 和 RUNING 。处于 READY 状态的线程被称为活跃线程,被线程调度器选中后开始运行,转化为 RUNING 状态。
**BLOCKED**:一个线程发起一个阻塞式 IO 操作后(如文件读写或者阻塞式 Socket 读写),或者申请一个由其他线程持有的独占资源(比如锁)时,相应的线程就会处于该状态。 + **BLOCKED**:一个线程发起一个阻塞式 IO 操作后(如文件读写或者阻塞式 Socket 读写),或者申请一个由其他线程持有的独占资源(比如锁)时,相应的线程就会处于该状态。
**WAITING**:线程处于无时间限制的等待状态。 + **WAITING**:线程处于无时间限制的等待状态。
**TIMED_WAITING**:有时间限制的等待状态,如果在指定时间内并没有执行的特定的操作,则该线程自动转换为 RUNABLE。 + **TIMED_WAITING**:有时间限制的等待状态,如果在指定时间内并没有执行的特定的操作,则该线程自动转换为 RUNABLE。
**TERMINATED**`Thread.run()`正常返回或者由于抛出异常而提前终止,则对应的线程都会处于终止状态。 + **TERMINATED**`Thread.run()`正常返回或者由于抛出异常而提前终止,则对应的线程都会处于终止状态。
各个状态之间的转换关系如下图: 各个状态之间的转换关系如下图:
@ -74,7 +96,7 @@ Java 线程的生命周期分为以下五类状态:
### 1.4 线程终止 ### 1.4 线程终止
通常线程会随着代码的运行完成而终止,但如果你在线程中进行了循环操作,此时就需要考虑如何安全地停止?虽然 Thread 类提供了 `stop()` 方法,但是其已经被标识为废弃,因为 `stop()` 只是暴力的停止线程, 此时线程中的操作可能处于中间状态,这会导致错误的结果。想要安全的停止线程,可以通过改变终止标志位的方式来进行实现: 通常线程会随着代码的运行完成而终止,但如果你在线程中进行了循环操作,此时就需要考虑如何安全地停止线程?虽然 Thread 类提供了 `stop()` 方法,但是其已经被标识为废弃,因为 `stop()` 只是暴力的停止线程, 此时线程中的操作可能处于中间状态,此时暴力地停止就可能会产生非预期的结果。想要安全的停止线程,可以通过改变终止标志位的方式来实现:
```java ```java
public class ThreadStop { public class ThreadStop {
@ -102,13 +124,13 @@ public class ThreadStop {
### 1.5 线程中断 ### 1.5 线程中断
JDK 中提供了以下方法用于实现线程中断: 除了终止线程外,JDK 中提供了以下方法用于实现线程中断:
+ `Thread.interrupt() `:用于给目标线程设置中断标志位,但实际上并不能中断线程; + `Thread.interrupt() `:用于给目标线程设置中断标志位,但实际上并不能中断线程;
+ `Thread.isInterrupted()`:通过检查中断标志位,判断当前线程是否被中断; + `Thread.isInterrupted()`:通过检查中断标志位,判断当前线程是否被中断;
+ `Thread.interrupted()`:用来判断当前线程的中断状态,同时会清除当前线程的中断标志位。 + `Thread.interrupted()`:用来判断当前线程的中断状态,同时会清除当前线程的中断标志位。
示例如下: 线程中断与线程终止的区别在于:线程中断只是告诉目标线程,我希望你停止运行,即设置标志位,而线程是否真的停止则是由其自行决定。示例如下:
```java ```java
/** /**
@ -165,13 +187,49 @@ public class J3_Interrupted {
## 二、状态变量与共享变量 ## 二、基本概念
### 2.1 变量分类
**状态变量 (State Variable)** :即类的实例变量,非共享的静态变量。 **状态变量 (State Variable)** :即类的实例变量,非共享的静态变量。
**共享变量 (Shared Variable)** : 即可以被多个线程共同访问的变量。 **共享变量 (Shared Variable)** : 即可以被多个线程共同访问的变量。
### 2.2 竞态
如果多个线程并发的操作共享变量,并且这些操作不全是只读操作,那么它们彼此之间就会存在竞争,这种状态就称为竞态。由于竞态的存在,此时可能存在一个类在单线程的环境下能够正常运转,但在多线程的环境下却运行出错,此时就称这个类是线程不安全的。
```java
public class J1_ThreadUnsafe {
private static int i = 0;
public static void main(String[] args) throws InterruptedException {
IncreaseTask task = new IncreaseTask();
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
thread1.start();
thread2.start();
// 等待线程结束再打印返回值
thread1.join();
thread2.join();
System.out.println(i); // 线程不安全输出总是小于200000
}
static class IncreaseTask implements Runnable {
@Override
public void run() {
for (int j = 0; j < 100000; j++) {
inc();
}
}
private void inc() {
i++;
}
}
}
```
## 三、原子性 ## 三、原子性
@ -184,11 +242,11 @@ public class J3_Interrupted {
### 3.2 非原子性协定 ### 3.2 非原子性协定
在 Java 语言中,除了 long 类型 和 double 类型以外的任何类型的变量的写操作都是具有原子性的,但对于没有使用 volatile 关键字修饰的 64 位的 long 类型和 double 类型,允许将其的读写操作划分为两次 32 位的操作来进行,这就是 long 和 double 的非原子性协定 ( Nonatomic Treatment of double and long Variables ) 。 在 Java 语言中,除了 long 类型 和 double 类型以外的任何类型的变量的写操作都是具有原子性的,但对于没有使用 volatile 关键字修饰的 64 位的 long 类型和 double 类型,允许将其的读写操作划分为两次 32 位的操作来进行,这就是 long 和 double 类型的非原子性协定 ( Nonatomic Treatment of double and long Variables ) 。
### 3.3 保证原子性 ### 3.3 保证原子性
通过 Java 虚拟机规范和非原子性协定, Java 语言可以保证对基本数据类型的访问具有有原子性,如果想要保证更大范围内的原子性(如多行操作的原子性),此时可以使用字节码指令 monitorenter 和 monitorexit 来隐式执行 lock 和 unlock 操作,从而将串行变成并行来保证原子性monitorenter 和 monitorexit 这两个字节码指令反映到 Java 代码中就是 Synchronized 关键字。 通过 Java 虚拟机规范和非原子性协定, Java 语言可以保证对基本数据类型的访问具有有原子性,如果想要保证更大范围内的原子性(如多行操作的原子性),此时可以使用字节码指令 monitorenter 和 monitorexit 来隐式执行 lock 和 unlock 操作,从而将串行变成并行来保证原子性monitorenter 和 monitorexit 这两个字节码指令反映到 Java 代码中就是 synchronized 关键字。
@ -244,20 +302,20 @@ public class J3_Interrupted {
### 4.6 保证可见性 ### 4.6 保证可见性
在 Java 语言中,保证可见性的典型实现是 Volatile 关键字,它在 Java 语言中一共有三种作用: 在 Java 语言中,保证可见性的典型实现是 volatile 关键字,它在 Java 语言中一共有三种作用:
+ **保证可见性**Java 虚拟机JIT 编译器)会在 volatile 变量写操作之后插入一个通用的 StoreLoad 屏障,它可以充当存储屏障来清空执行处理器的写缓冲器;同时 JIT 编译器还会在变量的读操作前插入一个加载屏障来清空无效化队列。 + **保证可见性**Java 虚拟机JIT 编译器)会在 volatile 变量写操作之后插入一个通用的 StoreLoad 屏障,它可以充当存储屏障来清空执行处理器的写缓冲器;同时 JIT 编译器还会在变量的读操作前插入一个加载屏障来清空无效化队列。
+ **禁止指令重排序**:通过内存屏障, Java 虚拟机可以 volatile 变量之前的任何读写操作都先于这个 volatile 写操作之前被提交,而 volatile 变量的读操作先于之后任何变量的读写操作被提交。 + **禁止指令重排序**:通过内存屏障, Java 虚拟机可以 volatile 变量之前的任何读写操作都先于这个 volatile 写操作之前被提交,而 volatile 变量的读操作先于之后任何变量的读写操作被提交。
+ 除了以上两类语义外Java 虚拟机规范还特别规定了对于使用 volatile 修饰的 64 的 long 类型和 double 类型的变量的读写操作具有原子性。 + 除了以上两类语义外Java 虚拟机规范还特别规定了对于使用 volatile 修饰的 64 的 long 类型和 double 类型的变量的读写操作具有原子性。
除了 Volatile 外,Synchronized 和 final 关键字都能保证可见性: 除了 volatile 外,synchronized 和 final 关键字都能保证可见性:
+ **Synchronized** Synchronized 关键字规定了对其所修饰的变量执行 unlock 操作前,必须先把此变量同步回主内存中。 + **synchronized** synchronized 关键字规定了对其所修饰的变量执行 unlock 操作前,必须先把此变量同步回主内存中。
+ **final** :被 final 修饰的字段在构造器中一旦初始化完成,并且构造器没有把 `this` 的引用逃逸到外部,那么其他线程中就能看到 final 字段的值。 + **final** :被 final 修饰的字段在构造器中一旦初始化完成,并且构造器没有把 `this` 的引用逃逸到外部,那么其他线程中就能看到 final 字段的值,即可见性得到保证
### 4.7 Java 内存模型 ### 4.7 Java 内存模型
以上主要介绍计算机的内存模型对可见性的影响,但是不同架构的处理器在内存模型和支持的指令集上都存在略微的差异。 Java 作为一种跨平台的语言,必须尽量屏蔽这种差异,而且还要尽量利用硬件的各种特性(如寄存器,高速缓存和指令集中的某些特有指令)来获取更好而定执行速度,这就是 Java 的内存模型: 以上主要介绍计算机的内存模型对可见性的影响,但是不同架构的处理器在内存模型和支持的指令集上都存在略微的差异。 Java 作为一种跨平台的语言,必须尽量屏蔽这种差异,而且还要尽量利用硬件的各种特性(如寄存器,高速缓存和指令集中的某些特有指令)来获取更好执行速度,这就是 Java 的内存模型:
+ **Main Memory**主内存Java 内存模型规定了所有的变量都存储在主内存中,主内存可以类比为计算机的主内存,但其只是虚拟机内存的一部分,并不能代表整个计算机内存。 + **Main Memory**主内存Java 内存模型规定了所有的变量都存储在主内存中,主内存可以类比为计算机的主内存,但其只是虚拟机内存的一部分,并不能代表整个计算机内存。
+ **Work Memory**工作内存Java 内存模型规定了每条线程都有自己的工作内存,工作内存可以类比为计算机的高速缓存。工作内存中保存了被该线程使用到的变量的拷贝副本。 + **Work Memory**工作内存Java 内存模型规定了每条线程都有自己的工作内存,工作内存可以类比为计算机的高速缓存。工作内存中保存了被该线程使用到的变量的拷贝副本。
@ -281,7 +339,7 @@ Java 语言中的顺序语义可以分为以下四类:
### 5.2 重排序类型 ### 5.2 重排序类型
通常我们认为以上顺序都是完全相同的,但编译器和处理器出于性能考虑,通常会改变实际代码的执行顺序,这种情况就称为重排序,具体分为以下两类: 编译器和处理器出于性能考虑,通常会改变代码的实际执行顺序,这种情况就称为重排序,具体分为以下两类:
<table> <table>
<tr> <tr>
@ -323,12 +381,10 @@ int c = a + b;
### 5.5 保证顺序性 ### 5.5 保证顺序性
在 Java 语言中,Volatile 和 Synchronized 都能够保证有序性: 在 Java 语言中,volatile 和 synchronized 都能够保证有序性:
+ **Volatile**:通过内存屏障来禁止指令重排序,通过加载屏障和存储屏障来冲刷写缓冲器和清空无效化队列,从而可以避免内存重排序的现象;
+ **Synchronized** :使用 Synchronized 修饰的变量在同一时刻只允许一个线程对其进行 lock 操作,这种限制决定了持有同一个锁的两个同步块只能串行执行,也就避免了乱序问题。
+ **volatile**:通过内存屏障来禁止指令重排序,通过加载屏障和存储屏障来冲刷写缓冲器和清空无效化队列,从而可以避免内存重排序的现象;
+ **synchronized** :使用 synchronized 修饰的变量在同一时刻只允许一个线程对其进行 lock 操作,这种限制决定了持有同一个锁的两个同步块只能串行执行,也就避免了乱序问题。
## 六、锁机制 ## 六、锁机制
@ -406,7 +462,7 @@ public class J2_SynchronizedSafe {
} }
``` ```
通常我们把被修饰的方法体和代码块称为临界区,需要注意的是必须保证多线程锁住的是同一个临界区,否则依然是线程不安全的。如果将上面创建线程的方法修改为如下所示,此时 synchronized 锁住的是不同对象的 `inc()` 方法,所以仍然是线程不安全的: 通常我们把被修饰的方法体和代码块称为临界区,需要注意的是必须保证多线程锁住的是同一个临界区,否则依然是线程不安全的。如果将上面创建线程的方法修改为如下所示,此时 synchronized 锁住的是不同对象的 `inc()` 方法,所以仍然是线程不安全的:
```java ```java
Thread thread1 = new Thread(new IncreaseTask()); Thread thread1 = new Thread(new IncreaseTask());
@ -445,7 +501,7 @@ public class J3_SynchronizedSafe {
} }
``` ```
如果想要调用不同的 `IncreaseTask()` 实例,又想保证线程安全,此时可以使用同一个对象作为 synchronized 关键字的句柄为避免竞态,作为句柄的对象通常使用 `private final` 关键字进行修饰,示例如下: 如果想要调用不同的 `IncreaseTask()` 实例,又想保证线程安全,此时可以使用同一个对象作为 synchronized 关键字的句柄为避免竞态,作为句柄的对象通常使用 `private final` 关键字进行修饰,示例如下:
```java ```java
public class J4_SynchronizedSafe { public class J4_SynchronizedSafe {
@ -480,7 +536,7 @@ public class J4_SynchronizedSafe {
### 6.2 显示锁 ### 6.2 显示锁
显示锁是 `java.util.concurrent.locks.Lock` 接口的例,该接口对显示锁进行了抽象,定义了如下方法: 显示锁是 `java.util.concurrent.locks.Lock` 接口的例,该接口对显示锁进行了抽象,定义了如下方法:
+ **lock()**:获取锁; + **lock()**:获取锁;
+ **lockInterruptibly()**:如果当前线程未被中断,则获取锁; + **lockInterruptibly()**:如果当前线程未被中断,则获取锁;
@ -552,6 +608,8 @@ ReentrantLock 即支持公平锁也支持非公平锁,公平锁在调度时候
private static ReentrantLock fairLock = new ReentrantLock(true); private static ReentrantLock fairLock = new ReentrantLock(true);
``` ```
显示锁相比于内部锁提供了更高的灵活性,但容易存在锁泄露(某个线程持有锁后因为异常而导致锁无法被释放)等问题。而内部锁虽然灵活性不足,但不会存在锁泄露,并且虚拟机也会在编译时对内部锁进行适当的锁优化。
### 6.3 读写锁 ### 6.3 读写锁
由于锁的排它性,导致多个线程无法以安全的方式并发地读取共享变量,这不利于提高系统的并发能力,因此产生了读写锁: 由于锁的排它性,导致多个线程无法以安全的方式并发地读取共享变量,这不利于提高系统的并发能力,因此产生了读写锁:
@ -652,11 +710,248 @@ public class ReadWriteLock {
} }
``` ```
### 6.4 锁优化
**1. 锁消除**
## 七、线程间的协作 锁消除Lock Elision是 JIT 编译器对内部锁所做的一种优化。在编译时JIT 编译器会通过逃逸分析Escape Analysis来判断同步块所使用的锁对象是否只能被一个线程访问而没有逃逸到其他线程如果是则编译器在编译这个同步块时就不生成 synchronized 锁所对应的机器码,这种编译器优化就被称为锁消除。示例如下:
### 7.1 等待与通知 ```java
public class LockElision {
private String toJson(Employee employee) {
StringBuffer buffer = new StringBuffer();
buffer.append("name:").append(employee.getName());
buffer.append("age:").append(employee.getAge());
buffer.append("birthday:").append(employee.getBirthday());
return buffer.toString();
}
}
```
此时的 StringBuffer 实例对象只是一个局部变量,并且该对象并没有被发布到其他线程,因此其对应的内部锁会被消除。
**2. 锁粗化**
对于相邻的几个同步块,如果这些同步块锁使用的是同一个锁的实例,那么 JIT 编译器就会将这些同步块合并为一个大同步块,从而避免一个线程反复申请、释放同一个锁而导致的开销。
**3. 偏向锁**
Java 虚拟机在实现 monitorenter 字节码(申请锁)和 monitorexit 字节码释放锁时需要借助一个原子操作CAS操作这个操作是比较昂贵的因此内部锁在每次被线程获取时它都会将对应的线程记录为偏好线程Biased Thread之后此线程无论是再次申请该锁还是获取该锁都无须借助原先昂贵的原子操作从而减少了锁的申请与释放的开销。
因为锁在每次都获取时,都需要刷新偏好线程的值,这个过程也是需要额外开销的,所以偏向锁只适用于系统中大部分锁争用较少的情况,如果系统中大部分锁的竞争都比较激烈,此时可以考虑在 Java 程序的启动命令行中增加 `-XX:-UseBiasedLocking` 参数来关闭偏向锁。
**4. 适应性锁**
如果一个线程在申请锁时,该锁恰好被其他线程所持有,那么该线程就需要等待持有锁的线程释放该锁,此时常用的方案有两种:
+ 暂停该线程进行等待,但暂停操作会导致上下文切换,这会导致额外的开销;
+ 持续执行空循环进行忙等Busy Wait。这不会导致上下文的切换但是会持续消耗处理器的资源。
暂停策略适用于等待时间较长的场景此时可以抵消上下文切换带来的开销忙等策略适用于等待时间较短的场景此时可以避免持续消耗处理器的资源。Java 虚拟机会根据运行过程中收集到的信息来判断这个锁被线程持有的时间,从而选取最优的策略来进行等待,这就是适应性锁。
## 七、无锁并行
### 7.1 CAS
锁是一种悲观的策略,它假设每一次临界区内的访问都会存在冲突,因此它在同一时刻只允许一个线程进入临界区,而无锁则是一种乐观的策略,它假设临界区内资源的访问很少存在冲突,并采用 CAS 方案来避免这种偶发冲突而导致的线程不安全。CAS 的全称是 Compare And Swap (比较交换),其核心思想如下:
```java
boolean compareAndSwap (Variable V, Object A, Object B){
if(A == V.get()){ // 检查变量值是否被其他线程修改过
V.set(B); // 更新变量值
return true; // 更新成功
}
return false; //变量值已被其他线程修改,更新失败
}
```
如果变量 V 的当前值和调用 CAS 时所提供的变量值 A (即变量的旧值) 一致,那么就说明其他线程并没有修改过变量 V 的值,此时就可以进行更新操作,否则操作失败,整个 CAS 操作的原子性由处理器来进行保证。由于 CAS 操作并不需要频繁的线程调度,因此其通常有着更好的性能表现,为了充分利用 CAS 的特性JDK 提供了原子包来满足各种场景下的使用需求:
| 分组 | 类 |
| ---------- | ------------------------------------------------------------ |
| 基础数据型 | AtomicBoolean、AtomicInteger、AtomicLong |
| 引用型 | AtomicReference、AtomicStampedReference、AtomicMarkableReference |
| 数组型 | AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray |
| 字段更新型 | AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater |
### 7.2 引用型
除了使用原子包中提供的 AtomicBoolean、AtomicInteger、AtomicLong 来保证基本数据类型操作的原子性外,还可以使用 AtomicReference\<V> 来保证任意类型操作原子性,示例如下:
```java
public class J1_SimpleType {
private static int i = 0;
private static AtomicInteger j = new AtomicInteger(0);
/*使用AtomicReference对普通对象进行封装*/
private static AtomicReference<Integer> k = new AtomicReference<>(0);
static class Task implements Runnable {
private CountDownLatch latch;
Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
i++;
j.incrementAndGet();
k.getAndUpdate(x -> x + 1);
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
int number = 10000;
CountDownLatch latch = new CountDownLatch(number);
Semaphore semaphore = new Semaphore(10);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Task task = new Task(latch);
for (int i = 0; i < number; i++) {
semaphore.acquire();
executorService.execute(task);
semaphore.release();
}
latch.await();
System.out.println("输出i的值" + i);
System.out.println("输出j的值" + j.get());
System.out.println("输出K的值" + k.get());
executorService.shutdown();
}
}
```
在使用 CAS 的过程中,一个比较常见的隐患是 **A-B-A 问题**:如果其他线程在将共享变量的值修改为 B 后又立即修改回原值此时这次变更对于其他线程而言可能无法感知到。这对于计数等场景而言是没有问题的但在一些特别的场景下就会导致错误。想要解决这个问题可以在比较时候除了比较变量的值外还应进行时间戳的比较AtomicStampedReference 就是这种比较思路的一种实现。其更新值的方法定义如下:
```java
compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)
```
### 7.3 数组型
数组型可以保证对数据内元素的操作是线程安全的,示例如下:
```java
public class J3_ArrayElementThreadUnsafe {
private static int capacity = 10;
// 保证对集合内元素的操作具有原子性
private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(capacity);
// 对集合内元素的操作线程不安全
private static Vector<Integer> vector = new Vector<>(capacity);
// 对集合内元素的操作线程不安全
private static ArrayList<Integer> arrayList = new ArrayList<>(capacity);
static {
for (int i = 0; i < capacity; i++) {
arrayList.add(0);
vector.add(0);
}
}
static class Task implements Runnable {
private CountDownLatch latch;
Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
int num = i % capacity;
atomicIntegerArray.getAndIncrement(num);
vector.set(num, vector.get(num) + 1);
arrayList.set(num, arrayList.get(num) + 1);
}
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
int number = 1000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch));
}
latch.await();
System.out.println("集合内元素的线程安全:");
System.out.println("atomicIntegerArray size : " + atomicIntegerArray);
System.out.println("vector size : " + vector);
System.out.println("arrayList size : " + arrayList);
executorService.shutdown();
}
}
// 输出如下:
集合内元素的线程安全
atomicIntegerArray size : [100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000]
vector size : [69966, 81954, 78605, 79144, 66532, 75082, 77324, 78723, 78022, 76294]
arrayList size : [99045, 99173, 99251, 98609, 99248, 99191, 98848, 99181, 99212, 99083]
```
### 7.4 字段更新型
如果某个类的基本类型的字段在某一环境中存在线程安全,但该字段在多个环境中都有引用,此时直接修改该字段可能会导致多个环境都需要重新验证,在这种情况下可以使用字段更新型来保证其在特定环境下的线程安全:
```java
public class J5_AtomicIntegerFieldUpdater {
static class Task implements Runnable {
private Candidate candidate;
private CountDownLatch latch;
private AtomicIntegerFieldUpdater fieldUpdater;
Task(CountDownLatch latch, Candidate candidate, AtomicIntegerFieldUpdater fieldUpdater) {
this.candidate = candidate;
this.latch = latch;
this.fieldUpdater = fieldUpdater;
}
@Override
public void run() {
fieldUpdater.incrementAndGet(candidate);
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
int number = 100000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Candidate candidate = new Candidate("候选人", 0);
// 使用字段更新型来保证其线程安全
AtomicIntegerFieldUpdater<Candidate> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch, candidate, fieldUpdater));
}
latch.await();
System.out.println(candidate.getName() + "获得票数:" + candidate.getScore());
executorService.shutdown();
}
private static class Candidate {
private String name;
// 1. 不能声明为private 2. 必须用volatile关键字修饰
public volatile int score;
.....
}
}
```
需要注意的是由于 CAS 只能保证可见性,不能保证原子性,所以该变量必须使用 volatile 关键字修饰,并且由于 FieldUpdater 是采用反射机制来获取该变量的值,所以其也不能声明为 private 。另外 FieldUpdater 也不能用于 static 类型的变量。
## 八、线程间的协作
### 8.1 等待与通知
为了支持多线程之间的协作JDK 中提供了两个非常重要的方法:`wait()``notify()` ,这两个方法定义在 `Object` 类中,这意味着任何 Java 对象都可以调用者两个方法。如果一个线程调用了 `object.wait()` 方法,那么它就会进入该对象的等待队列中,这个队列中可能包含了多个线程,此时代表多个线程都在等待同一个对象;当 `object.notify()` 方法被调用时,它就会从这个等待队列中**随机**唤醒一个线程。 为了支持多线程之间的协作JDK 中提供了两个非常重要的方法:`wait()``notify()` ,这两个方法定义在 `Object` 类中,这意味着任何 Java 对象都可以调用者两个方法。如果一个线程调用了 `object.wait()` 方法,那么它就会进入该对象的等待队列中,这个队列中可能包含了多个线程,此时代表多个线程都在等待同一个对象;当 `object.notify()` 方法被调用时,它就会从这个等待队列中**随机**唤醒一个线程。
@ -756,7 +1051,7 @@ object.notify();
object.notify(); object.notify();
``` ```
### 7.2 条件变量 ### 8.2 条件变量
综上所述可以使用 `wait()``notify()` 配合内部锁 synchronized 可以实现线程间的等待与唤醒,如果你使用的是显示锁而不是内部锁,此时可以使用 Condition 来实现同样的效果。Condition 接口中定义了如下方法: 综上所述可以使用 `wait()``notify()` 配合内部锁 synchronized 可以实现线程间的等待与唤醒,如果你使用的是显示锁而不是内部锁,此时可以使用 Condition 来实现同样的效果。Condition 接口中定义了如下方法:
@ -810,7 +1105,7 @@ Thread-0线程等待通知...
Thread-0线程后续操作 Thread-0线程后续操作
``` ```
### 7.3 Join ### 8.3 Join
`Thread.join()` 可以让当前线程等待目标线程结束后再开始运行,示例如下: `Thread.join()` 可以让当前线程等待目标线程结束后再开始运行,示例如下:
@ -845,7 +1140,7 @@ public class J2_Join {
// 此时主线程需要等待子线程运行完成输出结果为100000 // 此时主线程需要等待子线程运行完成输出结果为100000
``` ```
### 7.4 CountDownLatch ### 8.4 CountDownLatch
`Thread.join()` 可以让当前线程等待目标线程结束后再开始运行,但大多数时候,你只需要等待目标线程完成特定的操作,而不必等待其完全终止。此时可以使用条件变量 Condition 来实现,也可以使用更为简单的工具类 CountDownLatch 。CountDownLatch 会在内部维护一个计数器,每次完成一个任务,则计数器减 1当计数器为 0 时,则唤醒所有的等待线程,示例如下: `Thread.join()` 可以让当前线程等待目标线程结束后再开始运行,但大多数时候,你只需要等待目标线程完成特定的操作,而不必等待其完全终止。此时可以使用条件变量 Condition 来实现,也可以使用更为简单的工具类 CountDownLatch 。CountDownLatch 会在内部维护一个计数器,每次完成一个任务,则计数器减 1当计数器为 0 时,则唤醒所有的等待线程,示例如下:
@ -915,7 +1210,7 @@ public class J2_CountDown {
// 使用CountDownLatch 时主线程需要等待所有的子线程计算完成后再输出计算结果为100 // 使用CountDownLatch 时主线程需要等待所有的子线程计算完成后再输出计算结果为100
``` ```
### 7.5 CyclicBarrier ### 8.5 CyclicBarrier
CyclicBarrier 和 CountDownLatch 类似,都是用于等待一个或者多个线程完成特定的任务后再执行某项操作,但不同的是它可以循环使用,示例如下: CyclicBarrier 和 CountDownLatch 类似,都是用于等待一个或者多个线程完成特定的任务后再执行某项操作,但不同的是它可以循环使用,示例如下:
@ -967,9 +1262,9 @@ public class J1_CyclicBarrier {
基于 CyclicBarrier 的特性,通常可以用于在测试环境来模仿高并发,如每次等待一万个线程启动后再让其并发执行某项压力测试。 基于 CyclicBarrier 的特性,通常可以用于在测试环境来模仿高并发,如每次等待一万个线程启动后再让其并发执行某项压力测试。
### 7.6 Semaphore ### 8.6 Semaphore
信号量Semaphore可以看做是锁的扩展由于锁的排它性所以一次只允许一个线程来访问某个特定的资源 而 Semaphore 则允许多个线程并发的访问某个特定的资源,并且可以通过配来限制并发访问的线程的数量,因此其可以用于流量控制等场景中: 信号量Semaphore可以看做是锁的扩展由于锁的排它性所以一次只允许一个线程来访问某个特定的资源 而 Semaphore 则允许多个线程并发的访问某个特定的资源,并且可以通过配置许可证的数量来限制并发访问的线程,因此其可以用于流量控制等场景中:
```java ```java
public class J1_Semaphore { public class J1_Semaphore {
@ -1014,11 +1309,48 @@ public class J1_Semaphore {
.... ....
``` ```
### 8.7 LockSupport
LockSupport 可以在线程内的任意位置实现阻塞。它采用和 Semaphore 类似的信号量机制:它为每个线程准备一个许可,如果许可可用,则 `park()` 方法会立即返回,并且消费掉这个许可,让许可不可用;此时因为许可不可用,相应的线程就会被阻塞。而 `unpark()` 则会使得一个许可从不可用变为可用。但和 Semaphore 不同的是:它的许可不能累加,你不可能拥有超过一个许可,它永远只有一个:
```java
public class J1_LockSupport {
static class Task implements Runnable {
@Override
public void run() {
long id = Thread.currentThread().getId();
System.out.println("线程" + id + "开始阻塞");
LockSupport.park();
System.out.println("线程" + id + "解除阻塞");
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread01 = new Thread(new Task());
Thread thread02 = new Thread(new Task());
thread01.start();
thread02.start();
Thread.sleep(3000);
System.out.println("主线程干预");
LockSupport.unpark(thread01);
LockSupport.unpark(thread02);
}
}
// 输出:
线程13开始阻塞
线程14开始阻塞
主线程干预
线程13解除阻塞
线程14解除阻塞
```
## 八、线程池
### 8.1 线程池分类 ## 九、线程池
### 9.1 线程池分类
为了避免重复创建和销毁线程而导致额外的性能开销JDK 提供了线程池功能来实现线程的复用,具体分为以下几类: 为了避免重复创建和销毁线程而导致额外的性能开销JDK 提供了线程池功能来实现线程的复用,具体分为以下几类:
+ **newFixedThreadPool()**:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新任务提交时,如果线程池中存在空闲的线程,则立即执行;如果没有,则新任务会被暂时存在一个任务队列中,待有线程空闲时再进行处理。 + **newFixedThreadPool()**:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新任务提交时,如果线程池中存在空闲的线程,则立即执行;如果没有,则新任务会被暂时存在一个任务队列中,待有线程空闲时再进行处理。
@ -1057,7 +1389,7 @@ public class J1_ThreadPool {
} }
``` ```
### 8.2 定时任务 ### 9.2 定时任务
上面线程池分类中的 `newSingleThreadScheduledExecutor()``newScheduledThreadPool()` 都可以用于创建支持定时任务的线程池,它们返回的都是 ScheduledExecutorService 接口的实例。ScheduledExecutorService 接口中定义了如下三类定时方法: 上面线程池分类中的 `newSingleThreadScheduledExecutor()``newScheduledThreadPool()` 都可以用于创建支持定时任务的线程池,它们返回的都是 ScheduledExecutorService 接口的实例。ScheduledExecutorService 接口中定义了如下三类定时方法:
@ -1123,13 +1455,380 @@ public class J2_ScheduledTask {
} }
``` ```
### 8.3 线程池内部实现 ### 9.3 线程池内部实现
## 九、并发容器 不管是使用 `newFixedThreadPool()` 还是使用 `newCachedThreadPool()` 来创建线程池,其最终调用的都是 ThreadPoolExecutor 的构造器,定义如下:
```java
public ThreadPoolExecutor(int corePoolSize, //核心线程数量
int maximumPoolSize, //最大线程数量
long keepAliveTime, //超过核心线程数的线程的存活时间
TimeUnit unit, //存活时间的单位
BlockingQueue<Runnable> workQueue, //任务队列
ThreadFactory threadFactory, //线程工厂
RejectedExecutionHandler handler) //拒绝策略
```
**1. 线程工厂**
ThreadFactory 用于指定线程的创建方式,示例如下:
```java
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 将所有线程都设置为守护线程
thread.setDaemon(true);
System.out.println("create" + thread.getName());
return thread;
}
}
```
**2. 拒绝策略**
当线程池中可用线程的数量为 0并且等待队列已满的情况下线程池需要按照 RejectedExecutionHandler 指定的拒绝策略来决定如何处理后续提交任务JDK 中默认提供了以下四种拒绝策略:
+ **ThreadPoolExecutor.AbortPolicy**:直接拒绝新提交的任务,并抛出异常;
+ **ThreadPoolExecutor.DiscardPolicy**静默拒绝新提交的任务,并不抛出异常;
+ **ThreadPoolExecutor.DiscardOldestPolicy**:丢弃等待时间最长的任务,然后再尝试执行新提交的任务;
+ **ThreadPoolExecutor.CallerRunsPolicy**:直接在调用 execute 方法的线程中运行新提交的任务。
### 9.4 线程池扩展
ThreadPoolExecutor 除了提供丰富的参数来满足多样化的需求外,还支持重载其生命周期方法来进行更加灵活的扩展:
```java
ExecutorService executorService = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("线程" + t.getName() + "准备执行");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("线程" + r + "执行结束");
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
```
### 9.5 线程池大小
线程池的大小可以通过以下公式进行估算:
<div align="center">
N<sub>cpu</sub> = CPU的数量 </br>
U<sub>cpu</sub> = 目标CPU的使用率 0 <= U<sub>cpu</sub> <= 1 </br>
W/C = 等待时间与计算时间的比率 </br></br>
为保证处理器达到期望的使用率,最优的线程池的大小等于: </br>
N<sub>threads</sub> = N<sub>cpu</sub> x U<sub>cpu</sub> x (1+W/C) </br>
</div>
## 十、Future
### 10.1 Future
如果你在创建线程时,使用的是 Runnable 接口,那么此时你是无法获取线程执行结果的,如果想要获取线程的执行结果,需要实现 Callable 接口,示例如下:
```java
public class J0_Callable {
static class Task implements Callable<Integer> {
@Override
public Integer call() throws InterruptedException {
Thread.sleep(3000);
return 100;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executors = Executors.newSingleThreadExecutor();
Future<Integer> submit = executors.submit(new Task());
System.out.println("计算结果为:" + submit.get());
executors.shutdown();
}
}
```
此时通过 `ExecutorService.submit()` 进行提交,得到的是一个 Future 对象,它包含了线程的执行结果,当你调用其 `get()` 方法时,它会阻塞直至获取到线程的返回结果。
### 10.2 FutureTask
使用 Callable 接口的限制是:其只能使用线程池提交,而不能使用单独的线程进行提交。如果想要使用单独的线程提交,可以使用 FutureTask 对其进行包装FutureTask 是 Runnable 接口的实现类,可以用于任何场景下的提交,示例如下:
```java
static class Task implements Callable<Integer> {
@Override
public Integer call() throws InterruptedException {
Thread.sleep(3000);
return 100;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask01 = new FutureTask<>(new Task());
FutureTask<Integer> futureTask02 = new FutureTask<>(new Task());
// 使用独立的线程执行
new Thread(futureTask01).start();
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 使用线程池提交
executorService.submit(futureTask02);
System.out.println("futureTask01 计算结果为:" + futureTask01.get());
System.out.println("futureTask02 计算结果为:" + futureTask01.get());
executorService.shutdown();
}
```
### 10.3 CompletableFuture
CompletableFuture 是 JDK 8 提供的增强后 Future ,它支持流式调用,等待唤醒等一系列新的功能:
**1. 等待唤醒**
```java
public class J2_CompletableFuture {
static class Compute implements Runnable {
private CompletableFuture<Integer> future;
Compute(CompletableFuture<Integer> future) {
this.future = future;
}
@Override
public void run() {
try {
System.out.println("子线程等待主线程运算完成····");
Integer integer = future.get();
System.out.println("子线程完成后续运算:" + integer * integer);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
int intermediateResult;
CompletableFuture<Integer> future = new CompletableFuture<>();
// 启动子线程
new Thread(new Compute(future)).start();
System.out.println("启动主线程");
Thread.sleep(2000);
System.out.println("主线程计算完成");
// 假设主线程计算结果为 100
intermediateResult = 100;
// 传递主线程的计算结果给子线程
future.complete(intermediateResult);
}
}
// 输出
启动主线程
子线程等待主线程运算完成····
主线程计算完成
子线程完成后续运算:10000
```
**2. supplyAsync**
CompletableFuture 的 supplyAsync 可以将一个正常的方法以异步的方式来执行:
```java
public class J3_SupplyAsync {
private static Integer compute() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程计算完成");
return 100;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(J3_SupplyAsync::compute);
System.out.println("主线程等待子线程计算完成");
Integer integer = supplyAsync.get();
System.out.println("主线程计算完成:" + integer * integer);
}
}
```
**3. 流式调用**
CompletableFuture 支持大部分流式处理的特性,示例如下:
```java
public class J4_StreamingCall {
private static Integer compute() {
System.out.println("compute所在线程" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
}
private static Integer multi(Integer integer) {
try {
System.out.println("multi所在线程" + Thread.currentThread().getId());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * integer;
}
private static void accept(Integer integer) {
System.out.println("accept所在线程" + Thread.currentThread().getId());
System.out.println("accept方法消费掉计算结果:" + integer);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(J4_StreamingCall::compute)
.thenApply(J4_StreamingCall::multi)
.thenAccept(J4_StreamingCall::accept) //值在这一步被消费掉了
.thenAccept(x -> System.out.println("运算结果:" + x));
future.get(); //类似于流式计算的惰性求值,如果缺少这一步,不会有任何输出
}
}
```
**4. 组合多个 CompletableFuture**
除了使用单个的 CompletableFuture还可以通过 thenCompose 或 thenCombineAsync 来组合多个 CompletableFuture
```java
public class J6_Combination {
private static Integer compute() {
System.out.println("compute 所在线程:" + Thread.currentThread().getId());
return 100;
}
private static Integer multi(Integer integer) {
System.out.println("epr 所在线程:" + Thread.currentThread().getId());
return integer * integer;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 组合实现方式1 thenCompose 一个计算的输入依赖另外一个计算的结果
CompletableFuture<Void> future01 = CompletableFuture.supplyAsync(J6_Combination::compute)
.thenCompose(x -> CompletableFuture.supplyAsync(() -> multi(x)))
.thenAccept(x -> System.out.println("运算结果:" + x)); // 运算结果:10000
future01.get();
System.out.println();
// 组合实现方式2 thenCombineAsync 两个计算之间不依赖
CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(J6_Combination::compute);
CompletableFuture<Integer> future03 = CompletableFuture.supplyAsync(() -> J6_Combination.multi(100));
CompletableFuture<Integer> futureAll = future02.thenCombineAsync(future03, (x, y) -> x + y);
System.out.println("运算结果:" + futureAll.get()); // 运算结果:10100
}
}
```
## 十一、ThreadLocal
ThreadLocal 是以增加资源的方式来避免竞态,它会为每一个线程创建一份私有的资源,从而避免对公共资源的竞争。实例如下:
```java
/**
* 线程不安全的SimpleDateFormat
*/
public class J1_ThreadUnsafe {
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static int sum = 1000;
private static CountDownLatch countDownLatch = new CountDownLatch(sum);
private static AtomicInteger atomicInteger = new AtomicInteger(0);
static class Task implements Runnable {
@Override
public void run() {
try {
Date parse = sdf.parse("2018-08-08 08:08:08");
System.out.println(parse);
atomicInteger.incrementAndGet();
} catch (ParseException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < sum; i++) {
executorService.execute(new Task());
}
countDownLatch.await();
System.out.println("格式化成功次数为:" + atomicInteger.get());
}
}
```
因为 SimpleDateFormat 是线程不安全的,因此其格式化成功的次数总是小于 100 次,此时可以使用 ThreadLocal 进行改写,让每个线程都持有自己独立的格式化器,具体如下:
```java
public class J2_ThreadSafe {
private static ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<>();
private static int sum = 1000;
private static CountDownLatch countDownLatch = new CountDownLatch(sum);
private static AtomicInteger atomicInteger = new AtomicInteger(0);
static class Task implements Runnable {
@Override
public void run() {
try {
// 如果当前线程中不存在该值,则创建一个
if (threadLocal.get() == null) {
threadLocal.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
// 使用线程私有的SimpleDateFormat
Date parse = threadLocal.get().parse("2018-08-08 08:08:08");
System.out.println(parse);
atomicInteger.incrementAndGet();
} catch (ParseException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < sum; i++) {
executorService.execute(new Task());
}
countDownLatch.await();
System.out.println("格式化成功次数为:" + atomicInteger.get());
executorService.shutdown();
}
}
```
## 十、无锁并行
## 十一、Future

View File

@ -1400,7 +1400,6 @@ public class Test {
##### 3.合理优化线程池的数量 ##### 3.合理优化线程池的数量
<div align="center"> <img src="../pictures/合理线程池数量.png"/> </div></br> <div align="center"> <img src="../pictures/合理线程池数量.png"/> </div></br>
#### 3.2.3 Fork/Join 框架 #### 3.2.3 Fork/Join 框架
ForkJoin主要提供了两个主要的执行任务的接口。RecurisiveAction与RecurisiveTask ForkJoin主要提供了两个主要的执行任务的接口。RecurisiveAction与RecurisiveTask
@ -1626,7 +1625,8 @@ CAS的全称是Compare And Swap 即比较交换,其算法核心思想如下
如果V值等于E值则将V的值设为N。若V值和E值不同则说明已经有其他线程做了更新则当前线程什么都不做。通俗的理解就是CAS操作需要我们提供一个期望值当期望值与当前线程的变量值相同时说明还没线程修改该值当前线程可以进行修改也就是执行CAS操作但如果期望值与当前线程不符则说明该值已被其他线程修改此时不执行更新操作但可以选择重新读取该变量再尝试再次修改该变量也可以放弃操作。 如果V值等于E值则将V的值设为N。若V值和E值不同则说明已经有其他线程做了更新则当前线程什么都不做。通俗的理解就是CAS操作需要我们提供一个期望值当期望值与当前线程的变量值相同时说明还没线程修改该值当前线程可以进行修改也就是执行CAS操作但如果期望值与当前线程不符则说明该值已被其他线程修改此时不执行更新操作但可以选择重新读取该变量再尝试再次修改该变量也可以放弃操作。
<div align="center"> <img src="https://github.com/heibaiying/LearningNotes/blob/master/pictures/原子包.png"/> </div></br> <div align="center"> <img src="../pictures/原子包.png"/> </div></br>
```java ```java
// 原子类 // 原子类
public class Test { public class Test {