Spring 的 ThreadPoolTaskExecutor 对 JDK 的 ThreadPoolExecutor 做了很多封装,使用较为简单。
一个显著区别是,Spring 的 ThreadPoolTaskExecutor 不能 shutdown,也不能 awaitTermination. 这是因为 ThreadPoolTaskExecutor 是公用的。
Spring 中配置线程池 Bean 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@Configuration
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor kaproIOIntensiveExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(16);
executor.setMaxPoolSize(32);
executor.setQueueCapacity(100000);
executor.setThreadNamePrefix("myexecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
|
也可以暴露 JDK 的 ExecutorService:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
/**
* For an alternative, you may set up a ThreadPoolExecutor instance directly
* using constructor injection, or use a factory method definition that points to the
* java.util.concurrent.Executors class.
* This is strongly recommended in particular for common @Bean methods in
* configuration classes, where this FactoryBean variant would force you to
* return the FactoryBean type instead of the actual Executor type.
*/
@Bean
public FactoryBean<ExecutorService> nativeExecutorService() {
ThreadPoolExecutorFactoryBean nativeExecutor = new ThreadPoolExecutorFactoryBean();
// ...
return nativeExecutor;
}
|
Spring 的 AsyncResult 是可以转换为 CompletableFuture 的:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Async("kaproIOIntensiveExecutor")
public CompletableFuture<SquareResult> squareAsync(int i) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// 不成功的任务使用日志记录,或使用 Optional
return null;
}
SquareResult result = new SquareResult(i, i * i);
System.out.println(Thread.currentThread().getName() + ", ---" + result);
return new AsyncResult(result).completable(); // 转换为 CompletableFuture
}
|
一个将 List 转换为 CompletableFuture 的方法:
1
2
3
4
5
6
7
|
static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
return CompletableFuture
.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.<T>toList()));
}
|