Spring中的异步任务-Async注解的深度解析

一、Spring异步任务基础

1.1 @Async注解简介

@Async是Spring框架提供的一个注解,用于标记方法应该异步执行。当一个方法被标记为@Async时,调用不会阻塞调用者线程,而是在另一个线程中异步执行。

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
    String value() default "";
}

1.2 启用异步支持

要使用@Async功能,需要在配置类上添加@EnableAsync注解:

@Configuration
@EnableAsync
public class AppConfig {
    // 配置
}

1.3、基本使用示例

1.3.1 无返回值异步方法

@Service
public class NotificationService {

    private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);

    @Async  // 使用默认线程池
    public void sendNotification(String message) {
        logger.info("开始发送通知: " + message);
        try {
            // 模拟耗时操作
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        logger.info("通知发送完成: " + message);
    }

    @Async("taskExecutor")  // 使用指定线程池
    public void sendNotificationWithExecutor(String message) {
        logger.info("使用指定线程池发送通知: " + message);
        // 业务逻辑...
    }
}

1.3.2 有返回值异步方法

@Service
public class CalculationService {

    @Async
    public Future<Integer> calculateSum(int a, int b) {
        try {
            // 模拟耗时计算
            Thread.sleep(2000);
            return new AsyncResult<>(a + b);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new AsyncResult<>(0);
        }
    }

    @Async
    public CompletableFuture<String> fetchData() {
        // 模拟从远程获取数据
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                return "Remote Data";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }
}

1.4、异常处理示例

1.4.1 自定义异常处理器

public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    private static final Logger logger = LoggerFactory.getLogger(CustomAsyncExceptionHandler.class);

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        logger.error("异步方法执行出错 - 方法名: " + method.getName(), ex);
        logger.error("参数: " + Arrays.toString(params));

        // 这里可以添加自定义处理逻辑,如发送警报等
    }
}

1.4.2 配置异常处理器

更新之前的AsyncConfig

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}

1.4.3 抛出异常的异步方法

@Service
public class ProblematicService {

    @Async
    public void doSomethingProblematic() {
        throw new RuntimeException("故意抛出的异常");
    }
}

二、启动过程分析

2.1 @EnableAsync的作用

@EnableAsync注解引入了异步支持的核心配置:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
    // 配置选项
}

AsyncConfigurationSelector根据代理模式选择不同的配置类:

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    @Override
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }
}

2.2 代理配置 - ProxyAsyncConfiguration

ProxyAsyncConfiguration负责注册必要的bean, 其中AsyncAnnotationBeanPostProcessor是整个流程的起点

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.state(this.enableAsync != null, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}

}

2.3 AsyncAnnotationBeanPostProcessor

这是实现异步功能的核心后处理器:

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		super.setBeanFactory(beanFactory);

		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
		if (this.asyncAnnotationType != null) {
			advisor.setAsyncAnnotationType(this.asyncAnnotationType);
		}
		advisor.setBeanFactory(beanFactory);
		this.advisor = advisor;
	}
}

三、异步切面 - AsyncAnnotationAdvisor

AsyncAnnotationAdvisor负责创建拦截异步方法的切面, 他是Spring AOP 的 Advisor 实现,专门用于处理带有 @Async 注解的方法。它主要负责:

  1. 识别切入点:确定哪些方法需要被异步执行

  2. 提供拦截逻辑:定义如何拦截和异步执行目标方法

  3. 协调执行器:管理与异步执行相关的线程池资源

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
	public AsyncAnnotationAdvisor(
			@Nullable Supplier<? extends @Nullable Executor> executor, @Nullable Supplier<? extends @Nullable AsyncUncaughtExceptionHandler> exceptionHandler) {

		Set<Class<? extends Annotation>> asyncAnnotationTypes = CollectionUtils.newLinkedHashSet(2);
		asyncAnnotationTypes.add(Async.class);

		ClassLoader classLoader = AsyncAnnotationAdvisor.class.getClassLoader();
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("jakarta.ejb.Asynchronous", classLoader));
		}
		catch (ClassNotFoundException ex) {
			// If EJB API not present, simply ignore.
		}
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("jakarta.enterprise.concurrent.Asynchronous", classLoader));
		}
		catch (ClassNotFoundException ex) {
			// If Jakarta Concurrent API not present, simply ignore.
		}

		// 构建切面通知
		this.advice = buildAdvice(executor, exceptionHandler);
		// 构建切入点
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}
}

四、方法拦截 - AnnotationAsyncExecutionInterceptor

当调用@Async方法时,实际执行的是拦截器的逻辑。它是异步任务执行的核心拦截器,其作用可简明概括为以下三点:

  1. 方法拦截与异步调度

    • 拦截所有被 @Async 注解标记的方法调用

    • 将同步方法调用转为异步执行(通过线程池提交任务)

  2. 执行资源管理

    • 解析方法指定的执行器(支持 @Async("executorName") 形式)

    • 自动选择默认执行器(当未指定时)

  3. 结果与异常处理

    • 包装返回结果为 Future/CompletableFuture

    • 处理未捕获异常(通过配置的 AsyncUncaughtExceptionHandler

public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        // 确定执行器
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        // 创建可调用任务
        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };

        // 提交任务执行
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }
}

五、任务执行流程

5.1 确定执行器

determineAsyncExecutor方法确定使用哪个执行器:

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}

5.2 提交任务

doSubmit方法根据返回类型决定如何提交任务:

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    else {
        executor.submit(task);
        return null;
    }
}

六、默认执行器配置

如果没有指定执行器,Spring会尝试查找默认的:

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            // 多个执行器时尝试获取名为"taskExecutor"的bean
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                // 没有找到指定名称的bean,继续
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
		        try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                // 没有找到指定名称的bean,继续
            }
        }
    }
    return null;
}

如果默认连接池也为null,则会在AnnotationAsyncExecutionInterceptor的invoke方法内抛出异常

		AsyncTaskExecutor executor = determineAsyncExecutor(userMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}

七、异常处理

异步方法的异常处理由AsyncUncaughtExceptionHandler处理:

public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        if (logger.isErrorEnabled()) {
            logger.error("Unexpected exception occurred invoking async method: " + method, ex);
        }
    }
}

八、完整调用流程总结

  1. 启动阶段

    • @EnableAsync导入配置

    • ProxyAsyncConfiguration注册AsyncAnnotationBeanPostProcessor

    • AsyncAnnotationBeanPostProcessor创建AsyncAnnotationAdvisor

  2. Bean初始化阶段

    • 对每个bean检查是否有@Async方法

    • 为有@Async方法的bean创建代理

  3. 方法调用阶段

    • 代理拦截@Async方法调用

    • AnnotationAsyncExecutionInterceptor处理调用

    • 确定执行器并提交任务

    • 原始方法在另一个线程中执行

  4. 结果处理阶段

    • 根据返回类型包装结果

    • 处理执行过程中的异常

九、最佳实践与注意事项

  1. 执行器配置

    @Configuration
    @EnableAsync
    public class AsyncConfig implements AsyncConfigurer {
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(50);
            executor.setQueueCapacity(100);
            executor.setThreadNamePrefix("AsyncExecutor-");
            executor.initialize();
            return executor;
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new CustomAsyncExceptionHandler();
        }
    }
    
    
  2. 返回值使用

    • 返回FutureCompletableFuture可以获取异步结果

    • 无返回值方法适合"fire and forget"场景

  3. 限制与注意事项

    • @Async方法必须定义在Spring管理的bean中

    • 自调用(同一个类中方法调用)不会触发异步

    • 注意线程池配置,避免资源耗尽

结论

Spring框架通过@Async注解提供了简洁而强大的异步执行能力。从启动时的自动配置,到运行时的动态代理和任务调度,Spring构建了一个完整的异步执行生态系统。理解这一完整流程有助于开发者更有效地使用异步功能,并在出现问题时能够快速定位和解决。