Spring中的异步任务-Async注解的深度解析

一、Spring异步任务基础
1.1 @Async注解简介
@Async
是Spring框架提供的一个注解,用于标记方法应该异步执行。当一个方法被标记为@Async
时,调用不会阻塞调用者线程,而是在另一个线程中异步执行。
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
String value() default "";
}
1.2 启用异步支持
要使用@Async
功能,需要在配置类上添加@EnableAsync
注解:
@Configuration
@EnableAsync
public class AppConfig {
// 配置
}
1.3、基本使用示例
1.3.1 无返回值异步方法
@Service
public class NotificationService {
private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);
@Async // 使用默认线程池
public void sendNotification(String message) {
logger.info("开始发送通知: " + message);
try {
// 模拟耗时操作
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
logger.info("通知发送完成: " + message);
}
@Async("taskExecutor") // 使用指定线程池
public void sendNotificationWithExecutor(String message) {
logger.info("使用指定线程池发送通知: " + message);
// 业务逻辑...
}
}
1.3.2 有返回值异步方法
@Service
public class CalculationService {
@Async
public Future<Integer> calculateSum(int a, int b) {
try {
// 模拟耗时计算
Thread.sleep(2000);
return new AsyncResult<>(a + b);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return new AsyncResult<>(0);
}
}
@Async
public CompletableFuture<String> fetchData() {
// 模拟从远程获取数据
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
return "Remote Data";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
1.4、异常处理示例
1.4.1 自定义异常处理器
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(CustomAsyncExceptionHandler.class);
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.error("异步方法执行出错 - 方法名: " + method.getName(), ex);
logger.error("参数: " + Arrays.toString(params));
// 这里可以添加自定义处理逻辑,如发送警报等
}
}
1.4.2 配置异常处理器
更新之前的AsyncConfig
:
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
}
1.4.3 抛出异常的异步方法
@Service
public class ProblematicService {
@Async
public void doSomethingProblematic() {
throw new RuntimeException("故意抛出的异常");
}
}
二、启动过程分析
2.1 @EnableAsync的作用
@EnableAsync
注解引入了异步支持的核心配置:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
// 配置选项
}
AsyncConfigurationSelector
根据代理模式选择不同的配置类:
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
@Override
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
2.2 代理配置 - ProxyAsyncConfiguration
ProxyAsyncConfiguration
负责注册必要的bean, 其中AsyncAnnotationBeanPostProcessor是整个流程的起点
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.state(this.enableAsync != null, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
2.3 AsyncAnnotationBeanPostProcessor
这是实现异步功能的核心后处理器:
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
三、异步切面 - AsyncAnnotationAdvisor
AsyncAnnotationAdvisor
负责创建拦截异步方法的切面, 他是Spring AOP 的 Advisor 实现,专门用于处理带有 @Async
注解的方法。它主要负责:
识别切入点:确定哪些方法需要被异步执行
提供拦截逻辑:定义如何拦截和异步执行目标方法
协调执行器:管理与异步执行相关的线程池资源
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
public AsyncAnnotationAdvisor(
@Nullable Supplier<? extends @Nullable Executor> executor, @Nullable Supplier<? extends @Nullable AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = CollectionUtils.newLinkedHashSet(2);
asyncAnnotationTypes.add(Async.class);
ClassLoader classLoader = AsyncAnnotationAdvisor.class.getClassLoader();
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("jakarta.ejb.Asynchronous", classLoader));
}
catch (ClassNotFoundException ex) {
// If EJB API not present, simply ignore.
}
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("jakarta.enterprise.concurrent.Asynchronous", classLoader));
}
catch (ClassNotFoundException ex) {
// If Jakarta Concurrent API not present, simply ignore.
}
// 构建切面通知
this.advice = buildAdvice(executor, exceptionHandler);
// 构建切入点
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
}
四、方法拦截 - AnnotationAsyncExecutionInterceptor
当调用@Async
方法时,实际执行的是拦截器的逻辑。它是异步任务执行的核心拦截器,其作用可简明概括为以下三点:
方法拦截与异步调度
拦截所有被
@Async
注解标记的方法调用将同步方法调用转为异步执行(通过线程池提交任务)
执行资源管理
解析方法指定的执行器(支持
@Async("executorName")
形式)自动选择默认执行器(当未指定时)
结果与异常处理
包装返回结果为
Future
/CompletableFuture
处理未捕获异常(通过配置的
AsyncUncaughtExceptionHandler
)
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 确定执行器
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// 创建可调用任务
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// 提交任务执行
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
}
五、任务执行流程
5.1 确定执行器
determineAsyncExecutor
方法确定使用哪个执行器:
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
5.2 提交任务
doSubmit
方法根据返回类型决定如何提交任务:
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
六、默认执行器配置
如果没有指定执行器,Spring会尝试查找默认的:
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
// 多个执行器时尝试获取名为"taskExecutor"的bean
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
// 没有找到指定名称的bean,继续
}
}
catch (NoSuchBeanDefinitionException ex) {
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
// 没有找到指定名称的bean,继续
}
}
}
return null;
}
如果默认连接池也为null,则会在AnnotationAsyncExecutionInterceptor的invoke
方法内抛出异常
AsyncTaskExecutor executor = determineAsyncExecutor(userMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
七、异常处理
异步方法的异常处理由AsyncUncaughtExceptionHandler
处理:
public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
if (logger.isErrorEnabled()) {
logger.error("Unexpected exception occurred invoking async method: " + method, ex);
}
}
}
八、完整调用流程总结
启动阶段:
@EnableAsync
导入配置ProxyAsyncConfiguration
注册AsyncAnnotationBeanPostProcessor
AsyncAnnotationBeanPostProcessor
创建AsyncAnnotationAdvisor
Bean初始化阶段:
对每个bean检查是否有
@Async
方法为有
@Async
方法的bean创建代理
方法调用阶段:
代理拦截
@Async
方法调用AnnotationAsyncExecutionInterceptor
处理调用确定执行器并提交任务
原始方法在另一个线程中执行
结果处理阶段:
根据返回类型包装结果
处理执行过程中的异常
九、最佳实践与注意事项
执行器配置:
@Configuration @EnableAsync public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(50); executor.setQueueCapacity(100); executor.setThreadNamePrefix("AsyncExecutor-"); executor.initialize(); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new CustomAsyncExceptionHandler(); } }
返回值使用:
返回
Future
或CompletableFuture
可以获取异步结果无返回值方法适合"fire and forget"场景
限制与注意事项:
@Async
方法必须定义在Spring管理的bean中自调用(同一个类中方法调用)不会触发异步
注意线程池配置,避免资源耗尽
结论
Spring框架通过@Async
注解提供了简洁而强大的异步执行能力。从启动时的自动配置,到运行时的动态代理和任务调度,Spring构建了一个完整的异步执行生态系统。理解这一完整流程有助于开发者更有效地使用异步功能,并在出现问题时能够快速定位和解决。