353 lines
15 KiB
Markdown
353 lines
15 KiB
Markdown
|
||
|
||
因收到Google相关通知,网站将会择期关闭。相关通知内容
|
||
|
||
|
||
09 结果如何?线程的秘密告白
|
||
我们在前几节中学习了线程如何组织以及如何保证线程安全,但是我们介绍的线程的使用方式几乎全部是 Runnable 接口,虽然我们也稍微讲了一下 Callable 的使用方式。
|
||
|
||
那么,本章节将重点讲解 Callable 接口的详细用法。
|
||
|
||
一、Callable 接口
|
||
|
||
存在即合理,我们之前所学的 Runnable 接口存在以下两个缺陷:
|
||
|
||
|
||
Runnable 接口不能返回返回值;
|
||
Runnable 接口不允许抛出一个异常。
|
||
|
||
|
||
以上的两个缺陷导致于 Runnable 接口在一些特定的开发场景中,实现某一些特定功能很麻烦。比如,我现在有 100w 的数据,需要你采用线程池将 100w 的数据拆分为 10 个线程执行,当其中一个线程出问题后,需要将错误信息,以及出错的区间返回!
|
||
|
||
如果使用 Runnable 接口来实现就会比较麻烦,需要借助我们之前讲的 CountDownLatch 等类似的工具来进行计数实现,而且 Runnable 还无法返回出错的信息和区间。但如果采用本节课即将讲到的 Callable 接口来实现这个功能,整体就会简单很多。
|
||
|
||
我们先学习一下它的基础使用方式:
|
||
|
||
public class CallableTest {
|
||
public static void main(String[] args) throws ExecutionException, InterruptedException {
|
||
FutureTask<String> stringFutureTask = new FutureTask<>(new Task());
|
||
new Thread(stringFutureTask).start();
|
||
//获取线程的返回结果
|
||
System.out.println(stringFutureTask.get());
|
||
}
|
||
|
||
private static class Task implements Callable<String> {
|
||
|
||
@Override
|
||
public String call() throws Exception {
|
||
return "我是执行结果";
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
从上述的代码可以看到,我们将 Callable 包装成了一个 FutureTask,后续对于 Callable 的操作主要集中在 FutureTask 上。
|
||
|
||
Callable 获取结果的时候会抛出两个异常,ExecutionException、InterruptedException。其中 InterruptedException 是当线程被中断或者任务被取消的时候抛出的异常,当任务抛出异常的时候会触发 ExecutionException,当任务被取消的时候,会出现一个 TimeoutException。
|
||
|
||
接下来,我们将针对 FutureTask 的主要常用的 API 做一个详细的介绍。
|
||
|
||
|
||
boolean cancel(boolean mayInterruptIfRunning):
|
||
|
||
|
||
作用是用于取消与 Future 关联的计算任务。
|
||
参数 mayInterruptIfRunning 用于确定是否中断正在执行任务的线程。
|
||
如果任务已经完成或已经被取消,或者由于其他原因不能被取消,则此方法返回 false ;否则,任务将被取消,并返回 true 。
|
||
|
||
boolean isCancelled():
|
||
|
||
|
||
作用是用于检查与此 Future 关联的计算任务是否已被取消。
|
||
如果任务已经被取消,则返回 true ;否则,返回 false 。
|
||
|
||
boolean isDone():
|
||
|
||
|
||
作用是用于检查与此 Future 关联的计算任务是否已经完成。
|
||
如果任务已经完成(包括正常完成、取消或由于异常而完成),则返回 true。
|
||
|
||
V get():
|
||
|
||
|
||
作用是用于获取与此 Future 关联的计算结果。
|
||
如果计算尚未完成,则此方法将阻塞当前线程,直到计算完成为止。
|
||
如果计算已经完成,它会立即返回结果。
|
||
如果计算抛出异常,此方法也会抛出相应的异常。
|
||
|
||
V get(long timeout, TimeUnit unit):
|
||
|
||
|
||
作用是用于获取与此 Future 关联的计算结果,但是在指定的时间内如果计算尚未完成,则抛出 TimeoutException 异常。
|
||
参数 timeout 表示超时时间, unit 表示时间单位。
|
||
|
||
|
||
|
||
注意,这里的 Future 任务虽然提供了取消任务的能力,但是当任务没有处于阻塞状态的时候,实际上任务并不会停止,它只能取消能够响应中断任务的任务。加入类似的任务是一个死循环,此时程序无法被停止。
|
||
|
||
下面我们学习正确停止死循环的两个方式(重点请关注一个注意点,响应中断任务)。
|
||
|
||
|
||
使用判断线程存活的方式来验证是否需要继续执行:
|
||
|
||
|
||
public class StopTest {
|
||
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
|
||
FutureTask<String> stringFutureTask = new FutureTask<String>(new Task());
|
||
new Thread(stringFutureTask).start();
|
||
//获取线程的返回结果
|
||
Thread.sleep(1000);
|
||
System.out.println(stringFutureTask.cancel(true));
|
||
System.out.println("任务被停止");
|
||
System.out.println(stringFutureTask.get());
|
||
}
|
||
|
||
private static class Task implements Callable<String> {
|
||
|
||
@Override
|
||
public String call() throws Exception {
|
||
while (!Thread.currentThread().isInterrupted()) {
|
||
System.out.println("线程正在运行");
|
||
}
|
||
return "运行完成";
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
|
||
采用睡眠中断的形式来响应取消的指令:
|
||
|
||
|
||
public class StopTest2 {
|
||
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
|
||
FutureTask<String> stringFutureTask = new FutureTask<String>(new Task());
|
||
new Thread(stringFutureTask).start();
|
||
//获取线程的返回结果
|
||
Thread.sleep(1000);
|
||
System.out.println(stringFutureTask.cancel(true));
|
||
System.out.println("任务被停止");
|
||
System.out.println(stringFutureTask.get());
|
||
}
|
||
|
||
private static class Task implements Callable<String> {
|
||
|
||
@Override
|
||
public String call() throws Exception {
|
||
while (true) {
|
||
System.out.println("线程正在运行");
|
||
Thread.sleep(500);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
与第一种案例不同的是,这里使用了 sleep 来阻塞程序,当发起取消任务申请的时候,Task 会抛出中断异常,从而会从 call 方法的循环中跳出,并结束程序。当任务被取消成功后,调用 get 方法获取结果会抛出异常 CancellationException!
|
||
|
||
二、线程池使用 Callable
|
||
|
||
后面我们学习如何配合线程池来使用 Callable 接口,线程池使用 Callable 与直接使用类似,基础使用如下:
|
||
|
||
public class ThreadPoolCallable {
|
||
private final static AtomicInteger IDX = new AtomicInteger(0);
|
||
|
||
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 3, 5, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
|
||
@Override
|
||
public Thread newThread(Runnable r) {
|
||
return new Thread(r, "test-" + IDX.getAndIncrement());
|
||
}
|
||
}, new ThreadPoolExecutor.AbortPolicy());
|
||
|
||
|
||
public static void main(String[] args) throws Exception {
|
||
Future<String> submit = THREAD_POOL_EXECUTOR.submit(new Task());
|
||
System.out.println(submit.get());
|
||
|
||
}
|
||
|
||
private static class Task implements Callable<String> {
|
||
|
||
@Override
|
||
public String call() throws Exception {
|
||
try {
|
||
Thread.sleep(2000);
|
||
}catch (Exception e) {
|
||
e.printStackTrace();
|
||
}
|
||
return "我是返回结果";
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
与我们之前使用线程池提交任务不同的是,这里使用的是 submit 来提交任务,提交任务完成后返回一个 Future ,内部的 API 与上文同理,这里不做太多的解释。
|
||
|
||
接下来,我们将针对 Future 来设计几个使用案例来加深你的印象。
|
||
|
||
三、案例
|
||
|
||
1. 超时案例
|
||
|
||
我们有一个系统,需要调用第三方的接口获取数据,但是因为我们系统的用户体验要求,如果 3 秒内接口没有返回,就返回一个第三方接口网络异常;如果 3 秒内返回了,就返回第三方数据访问成功。
|
||
|
||
public class ThreadPoolCallableCase1 {
|
||
private final static AtomicInteger IDX = new AtomicInteger(0);
|
||
|
||
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), r -> new Thread(r, "open-api-" + IDX.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy());
|
||
|
||
|
||
public static void main(String[] args) {
|
||
|
||
ThreadPoolCallableCase1 threadPoolCallableCase1 = new ThreadPoolCallableCase1();
|
||
System.out.println(threadPoolCallableCase1.getData());
|
||
}
|
||
|
||
public String getData(){
|
||
Future<String> submit = THREAD_POOL_EXECUTOR.submit(new Task());
|
||
try {
|
||
return submit.get(3, TimeUnit.SECONDS);
|
||
} catch (InterruptedException e) {
|
||
return "手动中断任务";
|
||
} catch (ExecutionException e) {
|
||
return "第三方异常";
|
||
} catch (TimeoutException e) {
|
||
//超时了就取消任务
|
||
System.out.println(submit.cancel(true));
|
||
return "第三方接口网络超时";
|
||
}
|
||
}
|
||
|
||
private static class Task implements Callable<String> {
|
||
|
||
@Override
|
||
public String call() throws Exception {
|
||
try {
|
||
Thread.sleep((long) (Math.random() * 7000));
|
||
}catch (Exception e) {
|
||
System.out.println("任务被主动中断");
|
||
}
|
||
return "第三方数据返回成功";
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
这里可以看到,我们使用了带有等待时间的 get 方法来获取数据,当在规定时间内还没有返回数据的时候,此时就会抛出第三方接口网络超时的异常信息。
|
||
|
||
2. 并行计算下的结果获取
|
||
|
||
假设我们存在 10w 的数据,现在要将其分为 10 个线程处理,每一个线程处理 1w 的数据写入数据库,当数据全部写入成功后,返回写入成功;当数据某一批写入失败,需要返回哪一个区间写入失败。
|
||
|
||
public class ThreadDbTest {
|
||
private final static AtomicInteger IDX = new AtomicInteger(0);
|
||
|
||
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), r -> new Thread(r, "open-api-" + IDX.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy());
|
||
|
||
|
||
public static void main(String[] args) {
|
||
|
||
List<Future<String>> futures = new ArrayList<>();
|
||
|
||
Future<String> submit1 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(1, 10000, true));
|
||
Future<String> submit2 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(10001, 20000, true));
|
||
Future<String> submit3 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(20001, 30000, true));
|
||
Future<String> submit4 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(30001, 40000, true));
|
||
Future<String> submit5 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(40001, 50000, false));
|
||
Future<String> submit6 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(50001, 60000, true));
|
||
Future<String> submit7 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(70001, 80000, true));
|
||
Future<String> submit8 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(80001, 90000, true));
|
||
Future<String> submit9 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(90001, 100000, true));
|
||
|
||
futures.add(submit1);
|
||
futures.add(submit2);
|
||
futures.add(submit3);
|
||
futures.add(submit4);
|
||
futures.add(submit5);
|
||
futures.add(submit6);
|
||
futures.add(submit7);
|
||
futures.add(submit8);
|
||
futures.add(submit9);
|
||
|
||
for (Future<String> future : futures) {
|
||
try {
|
||
System.out.println(future.get());
|
||
} catch (InterruptedException e) {
|
||
e.printStackTrace();
|
||
} catch (ExecutionException e) {
|
||
e.printStackTrace();
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
public static class BatchWriteDbTask implements Callable<String> {
|
||
|
||
|
||
private final Integer minIndex;
|
||
private final Integer maxIndex;
|
||
|
||
/**
|
||
* 模拟使用, 当为true的时候就写入成功 当为false就写入失败
|
||
*/
|
||
private final boolean isSuccess;
|
||
|
||
public BatchWriteDbTask(Integer minIndex, Integer maxIndex, boolean isSuccess) {
|
||
this.minIndex = minIndex;
|
||
this.maxIndex = maxIndex;
|
||
this.isSuccess = isSuccess;
|
||
}
|
||
|
||
|
||
@Override
|
||
public String call() throws Exception {
|
||
System.out.println("开始批量写入数据 " + minIndex + "至" + maxIndex);
|
||
if(!isSuccess) {
|
||
throw new Exception("数据 " + minIndex + "至" + maxIndex + "写入失败,请手动处理。");
|
||
}
|
||
Thread.sleep(5000);
|
||
return "数据" + minIndex + "至" + maxIndex + "写入成功";
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
我们查看最终的运行结果:
|
||
|
||
开始批量写入数据 1至10000
|
||
开始批量写入数据 20001至30000
|
||
开始批量写入数据 10001至20000
|
||
开始批量写入数据 30001至40000
|
||
开始批量写入数据 40001至50000
|
||
开始批量写入数据 50001至60000
|
||
开始批量写入数据 70001至80000
|
||
开始批量写入数据 80001至90000
|
||
开始批量写入数据 90001至100000
|
||
数据1至10000写入成功
|
||
数据10001至20000写入成功
|
||
数据20001至30000写入成功
|
||
数据30001至40000写入成功
|
||
数据50001至60000写入成功
|
||
数据70001至80000写入成功
|
||
数据80001至90000写入成功
|
||
数据90001至100000写入成功
|
||
java.util.concurrent.ExecutionException: java.lang.Exception: 数据 40001至50000写入失败,请手动处理。
|
||
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
|
||
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
|
||
at com.eight.ThreadDbTest.main(ThreadDbTest.java:44)
|
||
Caused by: java.lang.Exception: 数据 40001至50000写入失败,请手动处理。
|
||
at com.eight.ThreadDbTest$BatchWriteDbTask.call(ThreadDbTest.java:76)
|
||
at com.eight.ThreadDbTest$BatchWriteDbTask.call(ThreadDbTest.java:54)
|
||
|
||
|
||
从最终的运行结果中可以看到,我们程序中预设的 “40001 至 50000 写入失败”是被成功捕获异常并返回的。
|
||
|
||
因为是异步执行,所以程序返回 Future 的时候可能程序并未开始执行或者正在执行中,为了获取最终的计算结果,程序的整体我们使用了一个集合来存储 Future 结果集,然后任务全部提交后遍历这个集合,使用 get 方法来获取真正的执行结果。当任务执行完毕后,get 方法会停止阻塞返回运行结果;当程序运行出错的时候,此时 get 方法会抛出最终的异常信息以供检测使用。
|
||
|
||
四、总结
|
||
|
||
本节课我们充分介绍了如何使用 Future 来进行获取线程的数据结果,包括对于 API 的介绍以及使用,我们还使用了几个例子使你加深印象,在异步计算中 Future 能够大大加快计算速度。Dubbo 就是使用 Future 来异步获取 API 的结果以及控制超时等能力的。
|
||
|
||
|
||
|
||
|