Java并发

This commit is contained in:
罗祥 2019-11-27 16:54:51 +08:00
parent fdb2e9404c
commit 7a257140b7
8 changed files with 472 additions and 22 deletions

View File

@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class J2_CountDown {
private static int number = 100;
// 指定计数器的初始值
private static CountDownLatch latch = new CountDownLatch(number);
private static AtomicInteger integer = new AtomicInteger(0);
@ -19,7 +20,7 @@ public class J2_CountDown {
// 假设这是一个耗时的任务
Thread.sleep(3000);
integer.incrementAndGet();
// 计数减一
// 计数器减1
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
@ -33,8 +34,8 @@ public class J2_CountDown {
for (int i = 0; i < number; i++) {
executorService.submit(task);
}
// 等待计数器为0时唤醒所有等待的线程
latch.await();
// 会等待所有任务执行完成再输出
System.out.println("integer" + integer);
executorService.shutdown();
}

View File

@ -4,6 +4,7 @@ import java.util.concurrent.Semaphore;
public class J1_Semaphore {
// 限制并发访问的线程的数量为5
private static Semaphore semaphore = new Semaphore(5);
static class IncreaseTask implements Runnable {

View File

@ -0,0 +1,26 @@
package com.heibaiying.stop;
/**
* 线程终止
*/
public class ThreadStop {
private static volatile boolean stopFlag = true;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (stopFlag) {
try {
Thread.sleep(100);
System.out.println("持续输出");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
Thread.sleep(3 * 1000);
stopFlag = false;
System.out.println("线程终止");
}
}

View File

@ -11,15 +11,22 @@ public class J1_ThreadPool {
static class Task implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行");
try {
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + "正在执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
// 提交任务到线程池
executorService.submit(new Task());
}
// 关闭线程池此时不再接受新任务但仍会等待原有的任务执行完成如果想要立即关闭则可以使用shutdownNow()
executorService.shutdown();
}
}

View File

@ -33,12 +33,13 @@ public class J2_ScheduledTask {
}
public static void main(String[] args) {
// 为避免相互间的影响以下各种场景最好分别测试
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
// 只执行一次
pool.schedule(new Task("schedule"), 2, TimeUnit.SECONDS);
// 指定2秒为固定周期执行如果项目执行耗时5秒则项目结束后立马执行下一次任务所以输出的时间间隔为5秒
// 指定2秒为固定周期执行因为项目执行耗时5秒此时项目结束会立马执行下一次任务所以输出的时间间隔为5秒
pool.scheduleAtFixedRate(new Task("FixedRate"), 0, 2, TimeUnit.SECONDS);
// 总是在上一次项目结束后间隔指定周期执行所以项目耗时5秒还需要间隔2秒执行所以输出的时间间隔为7秒
// 总是在上一次项目结束后间隔指定周期执行因为项目耗时5秒还需要间隔2秒执行所以输出的时间间隔为7秒
pool.scheduleWithFixedDelay(new Task("WithFixedDelay"), 0, 2, TimeUnit.SECONDS);
// pool.shutdown();
}

View File

@ -7,7 +7,7 @@ public class J1_Normal {
private static int j = 0;
public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
j++;

View File

@ -72,12 +72,107 @@ Java 线程的生命周期分为以下五类状态:
![线程完整生命周期](../pictures/线程完整生命周期.jpg)
### 1.4 线程终止
通常线程会随着代码的运行完成而终止,但如果你在线程中进行了循环操作,此时就需要考虑如何安全地停止?虽然 Thread 类中提供了 `stop()` 方法,但是其已经被标识为废弃,因为 `stop()` 只是暴力的停止线程, 此时线程中的操作可能处于中间状态,这会导致错误的结果。想要安全的停止线程,可以通过改变终止标志位的方式来进行实现:
```java
public class ThreadStop {
private static volatile boolean stopFlag = true;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (stopFlag) {
try {
Thread.sleep(100);
System.out.println("持续输出");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
Thread.sleep(3 * 1000);
stopFlag = false;
System.out.println("线程终止");
}
}
```
### 1.5 线程中断
JDK 中提供了以下方法用于实现线程中断:
+ `Thread.interrupt() `:用于给目标线程设置中断标志位,但实际上并不能中断线程;
+ `Thread.isInterrupted()`:通过检查中断标志位,判断当前线程是否被中断;
+ `Thread.interrupted()`:用来判断当前线程的中断状态,同时会清除当前线程的中断标志位。
示例如下:
```java
/**
* interrupt() 只是设置中断标志位,并不能中断线程,所以子线程会持续打印
*/
public class J1_Interrupt {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
System.out.println("子线程打印");
}
});
thread.start();
Thread.sleep(10);
thread.interrupt();
}
}
/**
* isInterrupted() 用于检查当前线程是否存在中断标志位配合interrupt()使用可以中断线程
*/
public class J2_IsInterrupted {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("子线程打印");
}
});
thread.start();
Thread.sleep(10);
thread.interrupt();
}
}
/**
* interrupted() 用于判断当前线程的中断状态,并清除当前线程的中断标志位
*/
public class J3_Interrupted {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
// 此时由于标志位被清除,此时子线程依然会持续打印
while (!Thread.interrupted() || !Thread.currentThread().isInterrupted()) {
System.out.println("子线程打印");
}
});
thread.start();
Thread.sleep(10);
thread.interrupt();
}
}
```
## 二、状态变量与共享变量
**状态变量 (State Variable)** :即类的实例变量,非共享的静态变量。
**共享变量 (Shared Variable)** : 即可以被多个线程共同访问的变量。
## 三、原子性
### 3.1 定义
@ -95,6 +190,8 @@ Java 线程的生命周期分为以下五类状态:
通过 Java 虚拟机规范和非原子性协定, Java 语言可以保证对基本数据类型的访问具有有原子性,如果想要保证更大范围内的原子性(如多行操作的原子性),此时可以使用字节码指令 monitorenter 和 monitorexit 来隐式执行 lock 和 unlock 操作从而将串行变成并行来保证原子性。monitorenter 和 monitorexit 这两个字节码指令反映到 Java 代码中就是 Synchronized 关键字。
## 四、可见性
### 4.1 定义
@ -169,6 +266,8 @@ Java 线程的生命周期分为以下五类状态:
![java内存模型](../pictures/java内存模型.png)
## 五、有序性
### 5.1 顺序语义
@ -553,13 +652,15 @@ public class ReadWriteLock {
}
```
## 七、线程间的协作
### 7.1 等待与通知
为了支持多线程之间的协作JDK 中提供了两个非常重要的方法:`wait()``notify()` ,这两个方法定义在 `Object` 类中,这意味着任何 Java 对象都可以调用者两个方法。如果一个线程调用了 `object.wait()` 方法,那么它就会进入该对象的等待队列中,这个队列中可能包含了多个线程,此时代表多个线程都在等待同一个对象;当 `object.notify()` 方法被调用时,它就会从这个等待队列中**随机**唤醒一个线程。
需要特别注意的是在调用这两个方法时,它们都必须位于对应对象的 synchronzied 语句中,因为这两个对象在调用前都需要获得对应对象的监视器,过程如下:
需要特别注意的是在调用这两个方法时,它们都必须位于对应对象的 synchronzied 语句中,因为这两个方法在调用前都需要获得对应对象的监视器(内部锁),过程如下:
TODO
@ -657,12 +758,15 @@ object.notify();
### 7.2 条件变量
综上所述可以使用 `wait()``notify()` 配合内部锁 synchronized 实现线程间的等待与唤醒,如果你使用的是显示锁而不是内部锁,此时可以使用 Condition 来实现同样的等待唤醒效果。Condition 接口中定义了如下方法:
综上所述可以使用 `wait()``notify()` 配合内部锁 synchronized 可以实现线程间的等待与唤醒,如果你使用的是显示锁而不是内部锁,此时可以使用 Condition 来实现同样的效果。Condition 接口中定义了如下方法:
+ await()
+ awaitUninterruptibly()
+ **await()**:使得当前线程进入等待状态,类似于 `object.wait()`
+ **awaitUninterruptibly()**:与 `await()` 类似,但它不会在等待过程中响应中断;
+ **awaitNanos(long nanosTimeout) & await(long time, TimeUnit unit) & awaitUntil(Date deadline)**:有时间限制的等待;
+ **signal()**:用于随机唤醒一个等待;
+ **signalAll()**:用于唤醒所有等待。
使用示例如下:
和 object 的 `wait()\notify()\notifyAll()` 一样,在使用 condition 的 `await()\signal()\signalAll()` 前,也要求线程必须持有相关的重入锁, 示例如下:
```java
public class AwaitAndSignal {
@ -706,11 +810,326 @@ Thread-0线程等待通知...
Thread-0线程后续操作
```
### 7.3 CountDownLatch
### 7.3 Join
`Thread.join()` 可以让当前线程等待目标线程结束后再开始运行,示例如下:
```java
public class J1_Normal {
private static int j = 0;
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
j++;
}
});
thread.start();
System.out.println(j);
}
}
// 此时主线程不等待子线程运行完成通常输出结果为0
public class J2_Join {
private static int j = 0;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
j++;
}
});
thread.start();
thread.join();
System.out.println(j);
}
}
// 此时主线程需要等待子线程运行完成输出结果为100000
```
### 7.4 CountDownLatch
`Thread.join()` 可以让当前线程等待目标线程结束后再开始运行,但大多数时候,你只需要等待目标线程完成特定的操作,而不必等待其完全终止。此时可以使用条件变量 Condition 来实现,也可以使用更为简单的工具类 CountDownLatch 。CountDownLatch 会在内部维护一个计数器,每次完成一个任务,则计数器减 1当计数器为 0 时,则唤醒所有的等待线程,示例如下:
```java
public class j1_Normal {
private static AtomicInteger integer = new AtomicInteger(0);
static class IncreaseTask implements Runnable {
@Override
public void run() {
try {
// 假设这是一个耗时的任务
Thread.sleep(3000);
integer.incrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
IncreaseTask task = new IncreaseTask();
ExecutorService executorService = Executors.newFixedThreadPool(100);
for (int i = 0; i < 100; i++) {
executorService.submit(task);
}
System.out.println("integer" + integer);
executorService.shutdown();
}
}
// 不使用CountDownLatch 时,主线程不会子线程等待计算完成,此时输出通常为: 0
public class J2_CountDown {
private static int number = 100;
// 指定计数器的初始值
private static CountDownLatch latch = new CountDownLatch(number);
private static AtomicInteger integer = new AtomicInteger(0);
static class IncreaseTask implements Runnable {
@Override
public void run() {
try {
// 假设这是一个耗时的任务
Thread.sleep(3000);
integer.incrementAndGet();
// 计数器减1
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
IncreaseTask task = new IncreaseTask();
ExecutorService executorService = Executors.newFixedThreadPool(100);
for (int i = 0; i < number; i++) {
executorService.submit(task);
}
// 等待计数器为0时唤醒所有等待的线程
latch.await();
System.out.println("integer" + integer);
executorService.shutdown();
}
}
// 使用CountDownLatch 时主线程需要等待所有的子线程计算完成后再输出计算结果为100
```
### 7.5 CyclicBarrier
CyclicBarrier 和 CountDownLatch 类似,都是用于等待一个或者多个线程完成特定的任务后再执行某项操作,但不同的是它可以循环使用,示例如下:
```java
/**
* 每五个人完成任务后,则算一个小组已完成
*/
public class J1_CyclicBarrier {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("五人小组任务执行完成"));
static class Task implements Runnable {
@Override
public void run() {
try {
long l = new Double(Math.random() * 5000).longValue();
Thread.sleep(l);
System.out.println("任务" + Thread.currentThread().getId() + "执行完成");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int j = 0; j < 10; j++) {
executorService.submit(new Task());
}
executorService.shutdown();
}
}
// 输出如下:
任务21执行完成
任务20执行完成
任务15执行完成
任务14执行完成
任务22执行完成
五人小组任务执行完成
任务17执行完成
任务13执行完成
任务19执行完成
任务18执行完成
任务16执行完成
五人小组任务执行完成
```
基于 CyclicBarrier 的特性,通常可以用于在测试环境来模仿高并发,如每次等待一万个线程启动后再让其并发执行某项压力测试。
### 7.6 Semaphore
信号量Semaphore可以看做是锁的扩展由于锁的排它性所以一次只允许一个线程来访问某个特定的资源 而 Semaphore 则允许多个线程并发的访问某个特定的资源,并且可以通过配额来限制并发访问的线程的数量,因此其可以用于流量控制等场景中:
```java
public class J1_Semaphore {
// 限制并发访问的线程的数量为5
private static Semaphore semaphore = new Semaphore(5);
static class IncreaseTask implements Runnable {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getId() + "获得锁!");
Thread.sleep(5000);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
IncreaseTask task = new IncreaseTask();
for (int i = 0; i < 20; i++) {
new Thread(task).start();
}
}
}
// 输出如下,至多只能有五个线程并发获得锁
13获得锁!
15获得锁!
16获得锁!
18获得锁!
17获得锁!
....
19获得锁!
20获得锁!
21获得锁!
22获得锁!
23获得锁!
....
```
## 八、线程池
### 8.1 线程池分类
为了避免重复创建和销毁线程而导致额外的性能开销JDK 提供了线程池功能来实现线程的复用,具体分为以下几类:
+ **newFixedThreadPool()**:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新任务提交时,如果线程池中存在空闲的线程,则立即执行;如果没有,则新任务会被暂时存在一个任务队列中,待有线程空闲时再进行处理。
+ **newSingleThreadExecutor()** 该方法返回一个只有一个线程的线程池。若多个任务被提交到该线程池,则多余的任务会被保存在一个任务队列中,待线程空闲,按照先入先出的顺序被执行。
+ **newCachedThreadPool()**:根据实际情况动态调整线程数量。当新任务提交时,会优先复用空闲的线程;如果所有线程均处于工作状态,则会创建新的线程来进行处理。
+ **newSingleThreadScheduledExecutor()**:该方法返回一个 ScheduledExecutorService 对象,线程池大小为 1 。SeheduledExectorService 在继承 ExecutorService 的基础上还额外支持定时任务的执行。
+ **newScheduledThreadPool()**:与 newSingleThreadScheduledExecutor 方法类似,但可以指定线程池中线程的数量。
线程池的基本使用如下:
```java
public class J1_ThreadPool {
static class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + "正在执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
// 提交任务到线程池
executorService.submit(new Task());
}
// 关闭线程池此时不再接受新任务但仍会等待原有的任务执行完成如果想要立即关闭则可以使用shutdownNow()
executorService.shutdown();
}
}
```
### 8.2 定时任务
上面线程池分类中的 `newSingleThreadScheduledExecutor()``newScheduledThreadPool()` 都可以用于创建支持定时任务的线程池,它们返回的都是 ScheduledExecutorService 接口的实例。ScheduledExecutorService 接口中定义了如下三类定时方法:
```java
/*在给定的时间,对任务进行一次性调度*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
/**
* 以上一个任务开始执行时间为起点,等待period时间后开始调度下一次任务
* 如果任务耗时大于period则上一次任务结束后立即执行下一次任务
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,
TimeUnit unit);
/**
* 以上一个任务开始执行时间为起点再经过delay时间后开始调度下一次任务
* 不论任务耗时如何上一次任务结束后都需要等待delay时间之后才可以执行下一次任务
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,
TimeUnit unit);
```
使用示例如下:
```java
public class J2_ScheduledTask {
private static long cacheTime = System.currentTimeMillis();
static class Task implements Runnable {
private String type;
Task(String type) {
this.type = type;
}
@Override
public void run() {
try {
Thread.sleep(5000);
long nowTime = System.currentTimeMillis();
System.out.println(type + Thread.currentThread().getId() + "执行耗时" + (nowTime - cacheTime) + "毫秒");
cacheTime = nowTime;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 为避免相互间的影响,以下各种场景最好分别测试:
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
// 只执行一次
pool.schedule(new Task("schedule"), 2, TimeUnit.SECONDS);
// 指定2秒为固定周期执行因为项目执行耗时5秒此时项目结束会立马执行下一次任务所以输出的时间间隔为5秒
pool.scheduleAtFixedRate(new Task("FixedRate"), 0, 2, TimeUnit.SECONDS);
// 总是在上一次项目结束后间隔指定周期执行因为项目耗时5秒还需要间隔2秒执行所以输出的时间间隔为7秒
pool.scheduleWithFixedDelay(new Task("WithFixedDelay"), 0, 2, TimeUnit.SECONDS);
// pool.shutdown();
}
}
```
### 8.3
## 九、并发容器
### 7.4 CyclicBarrier
### 7.5 Semaphore

View File

@ -60,8 +60,7 @@
+ `Thread.interrupt() `是一个实例方法,它通知目标线程被中断,也就是设置中断标志位;
+ `Thread.isInterrupted()`也是实例方法,判断当前线程是否有被中断(通过检查中断标志位);
- `Thread.interrupted()`是静态方法:用来判断当前线程的中断状态,但同时会清除当前线程的中断标志位状态。
+ `Thread.interrupted()`是静态方法:用来判断当前线程的中断状态,同时会清除当前线程中断标志位的状态。
```java
// Thread 类
@ -1180,7 +1179,6 @@ public class Test {
### 3.2 线程池
<div align="center"> <img src="https://github.com/heibaiying/LearningNotes/blob/master/pictures/Executors.png"/> </div></br>
#### 3.2.1 JDK对线程池的支持
**newFixedThreadPool()方法**:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲的线程,则立即执行。若没有,则新的任务会被暂时存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
@ -1189,7 +1187,7 @@ public class Test {
**newCachedThreadPool()方法**:根据实际情况动态调整线程数量。
**newSingleThreadScheduledExecutor()方法**该方法返回一个ScheduledExecutorService对象线程池大小为1。SeheduledExectorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能如在某个固定的延时之后执行或者周期性执行某个任务。
**newSingleThreadScheduledExecutor()方法**该方法返回一个ScheduledExecutorService对象线程池大小为 1。SeheduledExectorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能如在某个固定的延时之后执行或者周期性执行某个任务。
**newScheduledThreadPool()方法**该方法也返回一个ScheduledExecutorService对象但该线程池可以指定线程数量。
@ -1401,9 +1399,7 @@ public class Test {
##### 3.合理优化线程池的数量
<div align="center"> <img src="https://github.com/heibaiying/LearningNotes/blob/master/pictures/合理线程池数量.png"/> </div></br>
<div align="center"> <img src="../pictures/合理线程池数量.png"/> </div></br>
#### 3.2.3 Fork/Join 框架
@ -1631,7 +1627,6 @@ CAS的全称是Compare And Swap 即比较交换,其算法核心思想如下
如果V值等于E值则将V的值设为N。若V值和E值不同则说明已经有其他线程做了更新则当前线程什么都不做。通俗的理解就是CAS操作需要我们提供一个期望值当期望值与当前线程的变量值相同时说明还没线程修改该值当前线程可以进行修改也就是执行CAS操作但如果期望值与当前线程不符则说明该值已被其他线程修改此时不执行更新操作但可以选择重新读取该变量再尝试再次修改该变量也可以放弃操作。
<div align="center"> <img src="https://github.com/heibaiying/LearningNotes/blob/master/pictures/原子包.png"/> </div></br>
```java
// 原子类
public class Test {