learn-tech/专栏/Java并发:JUC入门与进阶/09结果如何?线程的秘密告白.md
2024-10-16 00:20:59 +08:00

353 lines
15 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

因收到Google相关通知网站将会择期关闭。相关通知内容
09 结果如何?线程的秘密告白
我们在前几节中学习了线程如何组织以及如何保证线程安全,但是我们介绍的线程的使用方式几乎全部是 Runnable 接口,虽然我们也稍微讲了一下 Callable 的使用方式。
那么,本章节将重点讲解 Callable 接口的详细用法。
一、Callable 接口
存在即合理,我们之前所学的 Runnable 接口存在以下两个缺陷:
Runnable 接口不能返回返回值;
Runnable 接口不允许抛出一个异常。
以上的两个缺陷导致于 Runnable 接口在一些特定的开发场景中,实现某一些特定功能很麻烦。比如,我现在有 100w 的数据,需要你采用线程池将 100w 的数据拆分为 10 个线程执行,当其中一个线程出问题后,需要将错误信息,以及出错的区间返回!
如果使用 Runnable 接口来实现就会比较麻烦,需要借助我们之前讲的 CountDownLatch 等类似的工具来进行计数实现,而且 Runnable 还无法返回出错的信息和区间。但如果采用本节课即将讲到的 Callable 接口来实现这个功能,整体就会简单很多。
我们先学习一下它的基础使用方式:
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> stringFutureTask = new FutureTask<>(new Task());
new Thread(stringFutureTask).start();
//获取线程的返回结果
System.out.println(stringFutureTask.get());
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
return "我是执行结果";
}
}
}
从上述的代码可以看到,我们将 Callable 包装成了一个 FutureTask后续对于 Callable 的操作主要集中在 FutureTask 上。
Callable 获取结果的时候会抛出两个异常ExecutionException、InterruptedException。其中 InterruptedException 是当线程被中断或者任务被取消的时候抛出的异常,当任务抛出异常的时候会触发 ExecutionException当任务被取消的时候会出现一个 TimeoutException。
接下来,我们将针对 FutureTask 的主要常用的 API 做一个详细的介绍。
boolean cancel(boolean mayInterruptIfRunning)
作用是用于取消与 Future 关联的计算任务。
参数 mayInterruptIfRunning 用于确定是否中断正在执行任务的线程。
如果任务已经完成或已经被取消,或者由于其他原因不能被取消,则此方法返回 false ;否则,任务将被取消,并返回 true 。
boolean isCancelled()
作用是用于检查与此 Future 关联的计算任务是否已被取消。
如果任务已经被取消,则返回 true ;否则,返回 false 。
boolean isDone()
作用是用于检查与此 Future 关联的计算任务是否已经完成。
如果任务已经完成(包括正常完成、取消或由于异常而完成),则返回 true。
V get()
作用是用于获取与此 Future 关联的计算结果。
如果计算尚未完成,则此方法将阻塞当前线程,直到计算完成为止。
如果计算已经完成,它会立即返回结果。
如果计算抛出异常,此方法也会抛出相应的异常。
V get(long timeout, TimeUnit unit)
作用是用于获取与此 Future 关联的计算结果,但是在指定的时间内如果计算尚未完成,则抛出 TimeoutException 异常。
参数 timeout 表示超时时间, unit 表示时间单位。
注意,这里的 Future 任务虽然提供了取消任务的能力,但是当任务没有处于阻塞状态的时候,实际上任务并不会停止,它只能取消能够响应中断任务的任务。加入类似的任务是一个死循环,此时程序无法被停止。
下面我们学习正确停止死循环的两个方式(重点请关注一个注意点,响应中断任务)。
使用判断线程存活的方式来验证是否需要继续执行:
public class StopTest {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> stringFutureTask = new FutureTask<String>(new Task());
new Thread(stringFutureTask).start();
//获取线程的返回结果
Thread.sleep(1000);
System.out.println(stringFutureTask.cancel(true));
System.out.println("任务被停止");
System.out.println(stringFutureTask.get());
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("线程正在运行");
}
return "运行完成";
}
}
}
采用睡眠中断的形式来响应取消的指令:
public class StopTest2 {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> stringFutureTask = new FutureTask<String>(new Task());
new Thread(stringFutureTask).start();
//获取线程的返回结果
Thread.sleep(1000);
System.out.println(stringFutureTask.cancel(true));
System.out.println("任务被停止");
System.out.println(stringFutureTask.get());
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
while (true) {
System.out.println("线程正在运行");
Thread.sleep(500);
}
}
}
}
与第一种案例不同的是,这里使用了 sleep 来阻塞程序当发起取消任务申请的时候Task 会抛出中断异常,从而会从 call 方法的循环中跳出,并结束程序。当任务被取消成功后,调用 get 方法获取结果会抛出异常 CancellationException
二、线程池使用 Callable
后面我们学习如何配合线程池来使用 Callable 接口,线程池使用 Callable 与直接使用类似,基础使用如下:
public class ThreadPoolCallable {
private final static AtomicInteger IDX = new AtomicInteger(0);
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 3, 5, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "test-" + IDX.getAndIncrement());
}
}, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) throws Exception {
Future<String> submit = THREAD_POOL_EXECUTOR.submit(new Task());
System.out.println(submit.get());
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
try {
Thread.sleep(2000);
}catch (Exception e) {
e.printStackTrace();
}
return "我是返回结果";
}
}
}
与我们之前使用线程池提交任务不同的是,这里使用的是 submit 来提交任务,提交任务完成后返回一个 Future ,内部的 API 与上文同理,这里不做太多的解释。
接下来,我们将针对 Future 来设计几个使用案例来加深你的印象。
三、案例
1. 超时案例
我们有一个系统,需要调用第三方的接口获取数据,但是因为我们系统的用户体验要求,如果 3 秒内接口没有返回,就返回一个第三方接口网络异常;如果 3 秒内返回了,就返回第三方数据访问成功。
public class ThreadPoolCallableCase1 {
private final static AtomicInteger IDX = new AtomicInteger(0);
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), r -> new Thread(r, "open-api-" + IDX.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
ThreadPoolCallableCase1 threadPoolCallableCase1 = new ThreadPoolCallableCase1();
System.out.println(threadPoolCallableCase1.getData());
}
public String getData(){
Future<String> submit = THREAD_POOL_EXECUTOR.submit(new Task());
try {
return submit.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return "手动中断任务";
} catch (ExecutionException e) {
return "第三方异常";
} catch (TimeoutException e) {
//超时了就取消任务
System.out.println(submit.cancel(true));
return "第三方接口网络超时";
}
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
try {
Thread.sleep((long) (Math.random() * 7000));
}catch (Exception e) {
System.out.println("任务被主动中断");
}
return "第三方数据返回成功";
}
}
}
这里可以看到,我们使用了带有等待时间的 get 方法来获取数据,当在规定时间内还没有返回数据的时候,此时就会抛出第三方接口网络超时的异常信息。
2. 并行计算下的结果获取
假设我们存在 10w 的数据,现在要将其分为 10 个线程处理,每一个线程处理 1w 的数据写入数据库,当数据全部写入成功后,返回写入成功;当数据某一批写入失败,需要返回哪一个区间写入失败。
public class ThreadDbTest {
private final static AtomicInteger IDX = new AtomicInteger(0);
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), r -> new Thread(r, "open-api-" + IDX.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
List<Future<String>> futures = new ArrayList<>();
Future<String> submit1 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(1, 10000, true));
Future<String> submit2 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(10001, 20000, true));
Future<String> submit3 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(20001, 30000, true));
Future<String> submit4 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(30001, 40000, true));
Future<String> submit5 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(40001, 50000, false));
Future<String> submit6 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(50001, 60000, true));
Future<String> submit7 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(70001, 80000, true));
Future<String> submit8 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(80001, 90000, true));
Future<String> submit9 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(90001, 100000, true));
futures.add(submit1);
futures.add(submit2);
futures.add(submit3);
futures.add(submit4);
futures.add(submit5);
futures.add(submit6);
futures.add(submit7);
futures.add(submit8);
futures.add(submit9);
for (Future<String> future : futures) {
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public static class BatchWriteDbTask implements Callable<String> {
private final Integer minIndex;
private final Integer maxIndex;
/**
* 模拟使用, 当为true的时候就写入成功 当为false就写入失败
*/
private final boolean isSuccess;
public BatchWriteDbTask(Integer minIndex, Integer maxIndex, boolean isSuccess) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
this.isSuccess = isSuccess;
}
@Override
public String call() throws Exception {
System.out.println("开始批量写入数据 " + minIndex + "至" + maxIndex);
if(!isSuccess) {
throw new Exception("数据 " + minIndex + "至" + maxIndex + "写入失败,请手动处理。");
}
Thread.sleep(5000);
return "数据" + minIndex + "至" + maxIndex + "写入成功";
}
}
}
我们查看最终的运行结果:
开始批量写入数据 1至10000
开始批量写入数据 20001至30000
开始批量写入数据 10001至20000
开始批量写入数据 30001至40000
开始批量写入数据 40001至50000
开始批量写入数据 50001至60000
开始批量写入数据 70001至80000
开始批量写入数据 80001至90000
开始批量写入数据 90001至100000
数据1至10000写入成功
数据10001至20000写入成功
数据20001至30000写入成功
数据30001至40000写入成功
数据50001至60000写入成功
数据70001至80000写入成功
数据80001至90000写入成功
数据90001至100000写入成功
java.util.concurrent.ExecutionException: java.lang.Exception: 数据 40001至50000写入失败请手动处理。
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.eight.ThreadDbTest.main(ThreadDbTest.java:44)
Caused by: java.lang.Exception: 数据 40001至50000写入失败请手动处理。
at com.eight.ThreadDbTest$BatchWriteDbTask.call(ThreadDbTest.java:76)
at com.eight.ThreadDbTest$BatchWriteDbTask.call(ThreadDbTest.java:54)
从最终的运行结果中可以看到,我们程序中预设的 “40001 至 50000 写入失败”是被成功捕获异常并返回的。
因为是异步执行,所以程序返回 Future 的时候可能程序并未开始执行或者正在执行中,为了获取最终的计算结果,程序的整体我们使用了一个集合来存储 Future 结果集,然后任务全部提交后遍历这个集合,使用 get 方法来获取真正的执行结果。当任务执行完毕后get 方法会停止阻塞返回运行结果;当程序运行出错的时候,此时 get 方法会抛出最终的异常信息以供检测使用。
四、总结
本节课我们充分介绍了如何使用 Future 来进行获取线程的数据结果,包括对于 API 的介绍以及使用,我们还使用了几个例子使你加深印象,在异步计算中 Future 能够大大加快计算速度。Dubbo 就是使用 Future 来异步获取 API 的结果以及控制超时等能力的。