SpringBoot2 | 第二十四篇(二):@Async与异步线程池、异步异常处理

本片文章对异步线程池、异步异常处理做了个Demo

[TOC]

@Async

​ 在Spring中,基于@Async 标注的方法,称之为 异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。在Spring Boot中,我们只需要通过使用@Async注解就能简单的将原来的同步函数变为异步函数。

异步方法分为两种:

  1. 返回值为void
  2. 返回值为Future

对应的异常处理操作:

  1. void -> 异步异常处理类 AsyncUncaughtExceptionHandler 来捕获并处理
  2. Future -> 在异步方法中 try...catch...(处理) ,然后将异常信息封装到 AsyncResult 对象中返回

环境/版本一览:

  • 开发工具:Intellij IDEA 2018.2.2
  • springboot: 2.0.6.RELEASE
  • jdk:1.8.0_171
  • maven:3.3.9

1、pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
</dependency>
</dependencies>

2、config

​ 异步配置类,实现接口 AsyncConfigurer 。配置类上加@EnableAsync 开启异步功能。配置类中配置两部分内容:异步线程池(组件)和异步异常处理类(组件)。

​ 当我们使用@Async时,SpringBoot如果判断出容器中有AsyncConfigurer类型的配置组件,它就会在该组件中是否有Executor类型的线程池组件,如果没有,则会报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.fatal.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 异步配置类
* 配置异常线程池,异步异常处理类
* 当我们使用@Async时,SpringBoot如果判断出容器中有`AsyncConfigurer`类型的配置组件,
* 它就会判断,在该组件中是否有`Executor`类型的线程池组件,如果没有,则会报错。
* @author: Fatal
* @date: 2018/10/31 0031 11:22
*/
@Slf4j
@EnableAsync // 开启异步功能
@Configuration
public class MyAsyncConfigurer implements AsyncConfigurer {

@Bean /** 创建一个该类型的组件放到Spring容器中 */
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/** 核心线程数10:线程池创建时候初始化的线程数 */
executor.setCorePoolSize(10);
/** 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程 */
executor.setMaxPoolSize(20);
/** 用来缓冲执行任务的队列 */
executor.setQueueCapacity(200);
/** 允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁 */
executor.setKeepAliveSeconds(60);
/** 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池 */
executor.setThreadNamePrefix("taskExecutor-");
/**
* 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,
* 该策略会直接在 execute 方法调用的线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncExceptionHandler();
}

/**
* 自定义异步方法返回类型为void的异常处理类
* @Desc: 返回值为void的异步方法出现异常,就会被这个处理类捕获
*/
class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
log.info("Exception message - " + throwable.getMessage());
log.info("Method name - " + method.getName());
for (Object param : objects) {
log.info("Parameter value - " + param);
}
}

}
}

3、component

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.fatal.component;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.Random;
import java.util.concurrent.Future;

/**
* 测试异步注解@Async的组件
* @author: Fatal
* @date: 2018/10/31 0031 9:47
*/
@Slf4j
@Component
public class Task {

@Async
public Future<String> doTaskOne() throws Exception {
log.info("[----------开始执行任务]");
long start = System.currentTimeMillis();
Thread.sleep(new Random().nextInt(10000));
long end = System.currentTimeMillis();
log.info("[----------执行任务完成] 耗时:" + (end-start) + "毫秒");
return new AsyncResult<>("----------完成任务");
}

@Async
public Future<String> doTaskTwo() throws Exception {
log.info("[++++++++++开始执行任务]");
long start = System.currentTimeMillis();
Thread.sleep(new Random().nextInt(10000));
long end = System.currentTimeMillis();
log.info("[++++++++++执行任务完成] 耗时:" + (end-start) + "毫秒");
return new AsyncResult<>("++++++++++完成任务");
}

@Async
public Future<String> doTaskThree() throws Exception {
log.info("[**********开始执行任务]");
long start = System.currentTimeMillis();
Thread.sleep(new Random().nextInt(10000));
long end = System.currentTimeMillis();
log.info("[**********执行任务完成] 耗时:" + (end-start) + "毫秒");
return new AsyncResult<>("**********完成任务");
}

/**
* 测试异常处理类捕获异步方法抛出的异常
*/
@Async
public void asyncReturnVoidWithException(String param) {
log.info("[测试异步方法(void)中处理异常,parameter={}]", param);
throw new RuntimeException(param);
}


@Async
public Future<String> asyncReturnFutureWithException(String param) {
log.info("[测试异步方法(Future<String>)中处理异常,parameter={}]", param);
Future<String> future;
try {
// Thread.sleep(10000); // 测试Future的get(x,y)方法
throw new RuntimeException(param);
} catch (Exception e) {
log.info("success to catch: [{}]", param);
future = new AsyncResult<String>(param);
}
return future;
}
}

4、Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.fatal;

import com.fatal.component.Task;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class Chapter242ApplicationTests {

@Autowired
private Task task;

/**
* 测试使用自定义异步线程池
*/
@Test
public void testPool() throws Exception {

Future<String> taskOne = task.doTaskOne();
Future<String> taskTwo = task.doTaskTwo();
Future<String> taskThree = task.doTaskThree();

long start = System.currentTimeMillis();
while (true) {
// 判断异步方法是否执行完成
if (taskOne.isDone() && taskTwo.isDone() && taskThree.isDone()) {
break;
}
}

long end = System.currentTimeMillis();
log.info("任务全部完成,总耗时:{} 毫秒", (end - start));
}


/**
* 测试异常处理类
*/
@Test
public void testAsyncExceptionReturnVoid() {
task.asyncReturnVoidWithException("testAsyncExceptionReturnVoid");
}

/**
* 测试Future<String>处理异常
*/
@Test
public void testAsyncExceptionReturnFuture() throws InterruptedException,
ExecutionException, TimeoutException {
Future<String> future = task.asyncReturnFutureWithException("testAsyncExceptionReturnFuture");
/**
* V get(long timeout, TimeUnit unit)
* timeout: 值
* unit: 单位
* 表示超过此时间会抛出超时异常,该线程就被释放会线程池,从而防止线程阻塞
*/
log.info("[在返回类型为Future<String>异步方法里捕获的异常] {}", future.get(5, TimeUnit.SECONDS));
}

}

5、显示

  1. 访问 Chapter242ApplicationTests.testPool() 显示

1540985504829

  1. 访问 Chapter242ApplicationTests.testAsyncExceptionReturnVoid() 显示

1540985626587

  1. 访问 Chapter242ApplicationTests.testAsyncExceptionReturnFuture() 显示

1540985751141

打开 Task.asyncReturnFutureWithExceptionThread.sleep(10000);注释

1540988965666

笔记

异步线程池的配置

上面我们通过使用ThreadPoolTaskExecutor创建了一个线程池,同时设置了以下这些参数:

  • corePoolSize 核心线程数10:线程池创建时候初始化的线程数
  • maxPoolSize 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
  • queueCapacity 缓冲队列200:用来缓冲执行任务的队列
  • keepAliveSeconds 允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
  • threadNamePrefix 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
  • rejectedExecutionHandler 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务

Future

什么是Future类型?

Future 于具体的Runnable或者Callable任务的 执行结果进行取消是否被取消查询是否完成获取结果的接口。必要时可以通过 get() 方法获取执行结果,该方法会 阻塞 直到任务返回结果。

它的接口定义如下:

1
2
3
4
5
6
7
8
9
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

方法解析:

  • cancel 方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。

    参数 mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务。

    (true 表允许,false 表不允许)

    如果 任务已经完成,则无论 mayInterruptIfRunning 为 true 还是 false,此方法肯定返回 false;即如果取消已经完成的任务一定取消失败;

    如果 任务正在执行,若mayInterruptIfRunning 设置为 true,则返回 true(取消成功),若 mayInterruptIfRunning 设置为 false,则返回false(取消失败);

    如果 任务还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,肯定返回 true;即如果取消已经完成的任务一定取消成功。

  • isCancelled 方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

  • isDone 方法表示任务是否已经完成,若任务完成,则返回 true;

  • get() 方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

  • get(long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接抛出 TimeoutException ,或者其他异常(具体看源码)。

也就是说Future提供了三种功能:

  1. 判断任务是否完成;
  2. 能够中断任务;
  3. 能够获取任务执行结果。

参考资料

Spring Boot系列二 Spring @Async异步线程池用法总结

Spring Boot使用@Async实现异步调用:自定义线程池

总结

步骤:

  1. 配置类(加@EnableAsync)配置 异步线程池异步异常处理类
  2. 在指定方法上加 @Async 即可

SpringBoot的知识已经有前辈在我们之前探索了。比较喜欢的博主有:唐亚峰 | Battcn方志朋的专栏程序猿DD纯洁的微笑。对这门技术感兴趣的可以去他们的博客逛逛。谢谢他们的分享~~

以上文章是我用来学习的Demo,都是基于 SpringBoot2.x 版本。

源码地址: https://github.com/ynfatal/springboot2-learning/tree/master/chapter/24_2

学习 翟永超hry2015 前辈的经验