workQueue, //任务队列
ThreadFactory threadFactory, //线程工厂
RejectedExecutionHandler handler) //拒绝策略
```
**1. 线程工厂**
ThreadFactory 用于指定线程的创建方式,示例如下:
```java
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 将所有线程都设置为守护线程
thread.setDaemon(true);
System.out.println("create" + thread.getName());
return thread;
}
}
```
**2. 拒绝策略**
当线程池中可用线程的数量为 0,并且等待队列已满的情况下,线程池需要按照 RejectedExecutionHandler 指定的拒绝策略来决定如何处理后续提交任务,JDK 中默认提供了以下四种拒绝策略:
+ **ThreadPoolExecutor.AbortPolicy**:直接拒绝新提交的任务,并抛出异常;
+ **ThreadPoolExecutor.DiscardPolicy**:静默拒绝新提交的任务,并不抛出异常;
+ **ThreadPoolExecutor.DiscardOldestPolicy**:丢弃等待时间最长的任务,然后再尝试执行新提交的任务;
+ **ThreadPoolExecutor.CallerRunsPolicy**:直接在调用 execute 方法的线程中运行新提交的任务。
### 9.4 线程池扩展
ThreadPoolExecutor 除了提供丰富的参数来满足多样化的需求外,还支持重载其生命周期方法来进行更加灵活的扩展:
```java
ExecutorService executorService = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("线程" + t.getName() + "准备执行");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("线程" + r + "执行结束");
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
```
### 9.5 线程池大小
线程池的大小可以通过以下公式进行估算:
Ncpu = CPU的数量
Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1
W/C = 等待时间与计算时间的比率
为保证处理器达到期望的使用率,最优的线程池的大小等于:
Nthreads = Ncpu x Ucpu x (1+W/C)
## 十、Future
### 10.1 Future
如果你在创建线程时,使用的是 Runnable 接口,那么此时你是无法获取线程执行结果的,如果想要获取线程的执行结果,需要实现 Callable 接口,示例如下:
```java
public class J0_Callable {
static class Task implements Callable {
@Override
public Integer call() throws InterruptedException {
Thread.sleep(3000);
return 100;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executors = Executors.newSingleThreadExecutor();
Future submit = executors.submit(new Task());
System.out.println("计算结果为:" + submit.get());
executors.shutdown();
}
}
```
此时通过 `ExecutorService.submit()` 进行提交,得到的是一个 Future 对象,它包含了线程的执行结果,当你调用其 `get()` 方法时,它会阻塞直至获取到线程的返回结果。
### 10.2 FutureTask
使用 Callable 接口的限制是:其只能使用线程池提交,而不能使用单独的线程进行提交。如果想要使用单独的线程提交,可以使用 FutureTask 对其进行包装,FutureTask 是 Runnable 接口的实现类,可以用于任何场景下的提交,示例如下:
```java
static class Task implements Callable {
@Override
public Integer call() throws InterruptedException {
Thread.sleep(3000);
return 100;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask01 = new FutureTask<>(new Task());
FutureTask futureTask02 = new FutureTask<>(new Task());
// 使用独立的线程执行
new Thread(futureTask01).start();
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 使用线程池提交
executorService.submit(futureTask02);
System.out.println("futureTask01 计算结果为:" + futureTask01.get());
System.out.println("futureTask02 计算结果为:" + futureTask01.get());
executorService.shutdown();
}
```
### 10.3 CompletableFuture
CompletableFuture 是 JDK 8 提供的增强后 Future ,它支持流式调用,等待唤醒等一系列新的功能:
**1. 等待唤醒**
```java
public class J2_CompletableFuture {
static class Compute implements Runnable {
private CompletableFuture future;
Compute(CompletableFuture future) {
this.future = future;
}
@Override
public void run() {
try {
System.out.println("子线程等待主线程运算完成····");
Integer integer = future.get();
System.out.println("子线程完成后续运算:" + integer * integer);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
int intermediateResult;
CompletableFuture future = new CompletableFuture<>();
// 启动子线程
new Thread(new Compute(future)).start();
System.out.println("启动主线程");
Thread.sleep(2000);
System.out.println("主线程计算完成");
// 假设主线程计算结果为 100
intermediateResult = 100;
// 传递主线程的计算结果给子线程
future.complete(intermediateResult);
}
}
// 输出
启动主线程
子线程等待主线程运算完成····
主线程计算完成
子线程完成后续运算:10000
```
**2. supplyAsync**
CompletableFuture 的 supplyAsync 可以将一个正常的方法以异步的方式来执行:
```java
public class J3_SupplyAsync {
private static Integer compute() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程计算完成");
return 100;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture supplyAsync = CompletableFuture.supplyAsync(J3_SupplyAsync::compute);
System.out.println("主线程等待子线程计算完成");
Integer integer = supplyAsync.get();
System.out.println("主线程计算完成:" + integer * integer);
}
}
```
**3. 流式调用**
CompletableFuture 支持大部分流式处理的特性,示例如下:
```java
public class J4_StreamingCall {
private static Integer compute() {
System.out.println("compute所在线程:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
}
private static Integer multi(Integer integer) {
try {
System.out.println("multi所在线程:" + Thread.currentThread().getId());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * integer;
}
private static void accept(Integer integer) {
System.out.println("accept所在线程:" + Thread.currentThread().getId());
System.out.println("accept方法消费掉计算结果:" + integer);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future = CompletableFuture.supplyAsync(J4_StreamingCall::compute)
.thenApply(J4_StreamingCall::multi)
.thenAccept(J4_StreamingCall::accept) //值在这一步被消费掉了
.thenAccept(x -> System.out.println("运算结果:" + x));
future.get(); //类似于流式计算的惰性求值,如果缺少这一步,不会有任何输出
}
}
```
**4. 组合多个 CompletableFuture**
除了使用单个的 CompletableFuture,还可以通过 thenCompose 或 thenCombineAsync 来组合多个 CompletableFuture:
```java
public class J6_Combination {
private static Integer compute() {
System.out.println("compute 所在线程:" + Thread.currentThread().getId());
return 100;
}
private static Integer multi(Integer integer) {
System.out.println("epr 所在线程:" + Thread.currentThread().getId());
return integer * integer;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 组合实现方式1 thenCompose 一个计算的输入依赖另外一个计算的结果
CompletableFuture future01 = CompletableFuture.supplyAsync(J6_Combination::compute)
.thenCompose(x -> CompletableFuture.supplyAsync(() -> multi(x)))
.thenAccept(x -> System.out.println("运算结果:" + x)); // 运算结果:10000
future01.get();
System.out.println();
// 组合实现方式2 thenCombineAsync 两个计算之间不依赖
CompletableFuture future02 = CompletableFuture.supplyAsync(J6_Combination::compute);
CompletableFuture future03 = CompletableFuture.supplyAsync(() -> J6_Combination.multi(100));
CompletableFuture futureAll = future02.thenCombineAsync(future03, (x, y) -> x + y);
System.out.println("运算结果:" + futureAll.get()); // 运算结果:10100
}
}
```
## 十一、ThreadLocal
ThreadLocal 是以增加资源的方式来避免竞态,它会为每一个线程创建一份私有的资源,从而避免对公共资源的竞争。实例如下:
```java
/**
* 线程不安全的SimpleDateFormat
*/
public class J1_ThreadUnsafe {
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static int sum = 1000;
private static CountDownLatch countDownLatch = new CountDownLatch(sum);
private static AtomicInteger atomicInteger = new AtomicInteger(0);
static class Task implements Runnable {
@Override
public void run() {
try {
Date parse = sdf.parse("2018-08-08 08:08:08");
System.out.println(parse);
atomicInteger.incrementAndGet();
} catch (ParseException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < sum; i++) {
executorService.execute(new Task());
}
countDownLatch.await();
System.out.println("格式化成功次数为:" + atomicInteger.get());
}
}
```
因为 SimpleDateFormat 是线程不安全的,因此其格式化成功的次数总是小于 100 次,此时可以使用 ThreadLocal 进行改写,让每个线程都持有自己独立的格式化器,具体如下:
```java
public class J2_ThreadSafe {
private static ThreadLocal threadLocal = new ThreadLocal<>();
private static int sum = 1000;
private static CountDownLatch countDownLatch = new CountDownLatch(sum);
private static AtomicInteger atomicInteger = new AtomicInteger(0);
static class Task implements Runnable {
@Override
public void run() {
try {
// 如果当前线程中不存在该值,则创建一个
if (threadLocal.get() == null) {
threadLocal.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
// 使用线程私有的SimpleDateFormat
Date parse = threadLocal.get().parse("2018-08-08 08:08:08");
System.out.println(parse);
atomicInteger.incrementAndGet();
} catch (ParseException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < sum; i++) {
executorService.execute(new Task());
}
countDownLatch.await();
System.out.println("格式化成功次数为:" + atomicInteger.get());
executorService.shutdown();
}
}
```
## 参考资料
1. 黄文海 . Java多线程编程实战指南(核心篇). 电子工业出版社 . 2017-04
2. 葛一鸣 . 实战Java高并发程序设计(第2版). 电子工业出版社 . 2018-10
3. 周志明 . 深入理解Java虚拟机(第2版). 机械工业出版社 . 2013-09-01
4. 汪文君 . Java高并发编程详解 . 机械工业出版社 . 2018-06