1. ThreadPoolTaskExecutor
- 스레드 풀을 사용하는 Executor
- java.util.concurrent.Executor를 Spring에서 구현한 것 이다.
- org.springframework.scheduling.concurrent 패키지에서 제공
- 주로 spring에서 비동기처리를 위해 사용
- 스레드풀을 사용하여 멀티스레드 구현을 손쉽게 해준다.
- Default 생성자 하나만 존재
2. Configuration
2 - 1) Pool size configuration
@Bean
public ThreadPoolTaskExecutor shineTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5); // 기본 스레드 수
taskExecutor.setMaxPoolSize(8); // 최대 스레드 수
return taskExecutor;
}
- core와 max 사이즈를 설정할 수 있다.
- 최초 core사이즈만큼 동작하다가 더 이상 처리할 수 없을 경우 max사이즈만큼 스레드가 증가할 것이라 일반적으로 생각하지만 사실 그렇지 않다.
- 내부적으로는 Integer.MAX_VALUE사이즈의 LinkedBlockingQueue를 생성해서 core사이즈만큼의 스레드에서 task를 처리할 수 없을 경우 queue에서 대기하게 됩니다. queue가 꽉 차게 되면 그때 max 사이즈만큼 스레드를 생성해서 처리하게 된다.
2 - 2) Capacity configuration
@Bean
public ThreadPoolTaskExecutor shineTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5); // 기본 스레드 수
taskExecutor.setMaxPoolSize(8); // 최대 스레드 수
taskExecutor.setQueueCapacity(100); // Queue 사이즈
return taskExecutor;
}
- core 사이즈 보다 많은 요청이 발생할 경우 Integer.MAX_VALUE(약 21억) 사이즈만큼의 queue의 용량이 너무 크다고 생각된다면 queueCapacity사이즈를 변경할 수 있다.
- 위의 예시 코드와 같이 설정한다면 최초 5개의 스레드에서 처리하다가 처리 속도가 밀릴 경우 100개 사이즈 queue에서 대기하고 그보다 많은 요청이 들어올 경우 최대 8개의 스레드까지 생성해서 처리하게 된다.
2 - 3) RejectedExecutionHandler configuration
- max 스레드까지 생성하고 queue까지 꽉 찬 상태에서 추가 요청이 오면 RejectedExecutionException 예외가 발생합니다. 더 이상 처리할 수 없다는 오류이다.
- 오류가 발생하고 있는데 그냥 지켜봐야만 하는게 아니라 몇 가지 예외 정책을 설정해줘야합니다.
- 기본적으로 RejectedExecutionHandler 인터페이스를 구현한 몇 가지 예외 처리 class가 존재한다.
- AbortPolicy
- 기본 설정(Default)
- RejectedExecutionException을 발생시킨다
- DiscardOldestPolicy
- 오래된 작업을 skip
- 모든 task가 무조건 처리되어야 할 필요가 없을 경우 사용
- DiscardPolicy
- 처리하려는 작업을 skip
- 역시 모든 task가 무조건 처리되어야 할 필요가 없을 경우 사용
- CallerRunsPolicy
- shutdown 상태가 아니라면 ThreadPoolTaskExecutor에 요청한 thread에서 직접 처리
- 예외와 누락 없이 최대한 처리하려면 CallerRunsPolicy로 설정하는 것이 좋음
- AbortPolicy
@Bean
public ThreadPoolTaskExecutor shineTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5); // 기본 스레드 수
taskExecutor.setMaxPoolSize(8); // 최대 스레드 수
taskExecutor.setQueueCapacity(100); // Queue 사이즈
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return taskExecutor;
}
2 - 4) Shutdown configuration
위와 같이 정의한 스레드 풀에서 열심히 작업이 이루어이고 있을 때 애플리케이션 종료를 요청하면 어떻게 될까?
Spring Boot Actuator를 이용해서 종료를 시켜보면 호출 즉시 애플리케이션이 바로 종료 되는 것을 확인할 수 있다.
POST http://localhost:8888/actuator/shutdown
이렇게 즉시 종료되면 아직 처리되지 못한 task는 유실된다.
유실 없이 마지막까지 다 처리하고 종료되길 원한다면 다음과 같이 설정을 추가해야 합니다.
waitForTasksToCompleteOnShutdown을 true로 하게 되면 queue에 남아 있는 모든 작업이 완료될 때 까지 기다리게 된다.
@Bean
public ThreadPoolTaskExecutor shineTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5); // 기본 스레드 수
taskExecutor.setMaxPoolSize(8); // 최대 스레드 수
taskExecutor.setQueueCapacity(100); // Queue 사이즈
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return taskExecutor;
}
2 - 5) Timeout configuration
만약 모든 작업이 처리되길 기다리기 힘든 경우라면 setAwaitTerminationSeconds메서드를 통해 최대 종료 대기 시간을 설정할 수 있다.
@Bean
public ThreadPoolTaskExecutor shineTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5); // 기본 스레드 수
taskExecutor.setMaxPoolSize(8); // 최대 스레드 수
taskExecutor.setQueueCapacity(100); // Queue 사이즈
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(30); // shutdown 최대 30초 대기
return taskExecutor;
}
2 - 6) 기타 Configuration
setAllowCoreThreadTimeout | 코어 스레드의 타임아웃을 허용할 것인지에 대한 설정 메서드. true로 설정할 경우 코어 스레드를 10으로 설정했어도 일정시간(keepAliveSeconds)이 지나면 코어 스레드 개수가 줄어듦 |
false |
setkeepAliveSeconds | 코어 스레드 타임아웃을 허용했을 경우 사용되는 설정값으로, 여기 설정된 식나이 지날 때까지 코어 스레드 풀의 스레드가 사용되지 않을 경우 해당 스레드는 terminate된다. | 60초 |
3. 예제 코드
3 - 1 : Autowired로 ThreadPoolTaskExecutor 사용하기
package com.example.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
@Service
public class TestService {
private ThreadPoolTaskExecutor executor;
public TestService(ThreadPoolTaskExecutor shineTaskExecutor) {
this.executor = shineTaskExecutor;
}
public void executeThreads() {
System.out.println("executing threads");
for(int i=0;i<10;i++) {
executor.execute(() -> {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName());
}
);
}
}
}
execute메소드에 람다 식으로 Runnable을 전달하였다.
간단하게 자신의 Thread이름을 출력하고 끝난다.
3 - 2 : @Async로 ThreadPoolTaskExecutor 사용하기
우선 @EnableAsync의 적용방법은 다음과 같다.
3-2-1) AsyncConfigurer 인터페이스 활용하기
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(5);
executor.setKeepAliveSeconds(20);
executor.setThreadNamePrefix("test-executor-");
executor.initialize();
return executor;
}
}
AsyncConfigurer 인터페이스를 구현하여 별도의 TaskExecutor를 설정해줄 수 있다.
3-2-1) @Bean 활용하기
1) ThreadPoolTaskExecutor bean 설정 클래스에 @EnableAsync도 추가해야 함
@Configuration
@EnableAsync
public class TaskExecutorTestConfig {
@Bean
public Executor asyncTestExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setThreadNamePrefix("test1-");
executor.initialize();
return executor;
}
}
2) 서비스의 메서드에 @Async(“스레드풀네임”)어노테이션 추가
package com.example.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
@Service
public class TestService2 {
@Async("shineTaskExecutor")
public void executeThreads() {
System.out.println("executing threads");
try {
Thread.sleep(3000);
System.out.println("[TestService2]" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
정리
1) @Configuration으로 등록한 클래스에 executor @Bean추가 (@Async를 이용할 경우 @EnableSync도 추가)
2) @Autowired, @Qualifier로 주입하여 사용하거나 또는 메서드 레벨에 @Async를 붙여 사용
3 - 3 : @Async 어노테이션 사용시 주의사항
3-3-1) 기본으로 사용되는 SimpleAsyncTaskExecutor
별도의 TaskExecutor를 직전의 코드처럼, 또는 다른 방식으로 설정을 해주지 않으면, SimpleAsyncTaskExecutor가 default로 적용된다.
SimpleAsyncTaskExecutor는 미리 스레드 풀을 생성하는 방식으로 사용하지 않고, 매 요청마다 새로운 스레드를 생성해 작업을 수행하는 방식이기 때문에, 테스트 환경용이거나 로컬에서 연습하는 거 아닌이상 현실적으로 사용하기는 어려워 보인다.
3-3-2) Async is not a silver bullet(@Async는 은환탄이 아니다)
- private 메서드에는 적용이 안된다. public만 된다.
- self-invocation(자가 호출)해서는 안된다. -> 같은 클래스 내부의 메서드를 호출하는 것은 안된다
다음 예시를 먼저 살펴보자!
예시
▶ TestController.java
@RestController
public class TestController {
@Autowired
private TestService testService;
@GetMapping("/test2")
public void test2() {
for(int i=0;i<10000;i++) {
testService.innerMethodCall(i);
}
}
}
▶ TestService.java
@Service
public class TestService {
private static final Logger logger = LoggerFactory.getLogger(TestService.class);
@Async
public void innerMethod(int i) {
logger.info("async i = " + i);
}
public void innerMethodCall(int i) {
innerMethod(i);
}
}
위 코드로 테스트 해보면 controller에서 testService.innerMethodCall()를 동기로 호출하지만 내부에서 하는 작업이 비동기 @Async가 걸린 innerMethod를 호출하니까 결국에는 비동기로 로그가 찍힐 것을 예상할 수 있다.
하지만 틀렸다. 아래 처럼 하나의 스레드로 동기 처리됨을 볼 수 있다.
3 - 4) 위와 같은 결과가 발생하는 이유
https://dzone.com/articles/effective-advice-on-spring-async-part-1
위의 출처에서 제대로 설명해준다.
이는 AOP가 적용되어 spring context에 등록되어 있는 빈 객체의 메서드가 호출되었을 때 스프링이 끼어들 수 있고
@Async가 적용되어 있따면 스프링이 메서드를 가로채서 다른 스레드(풀)에서 실행시켜주는 메커니즘이라는 것이다.
직전의 제약조건이 왜 필요한지 다시 생각해보자.
public이어야 가로챈 스프링의 다른 클래스에서 호출이 가능할 것이고,
self-invocation이 불가능 했던 이유도 spring context에 등록된 메소드 호출이어야 프록시를 적용받을 수 있기 때문이다.
내부 메서드 호출은 프록시 영향을 받지 않게된다.
3 - 5) 해결책
@Service
public class AsyncService {
@Async
public void run(Runnable runnable) {
runnable.run();
}
}
위의 코드와 같이 AsyncService를 하나 두고 해당 서비스는 유틸 클래스처럼 전역에서 사용하도록 두는 것이다.
@Async메서드 run을 통해 들어오는 Runnable을 그냥 실행만 해주는 메서드다.
@Service
public class TestService {
private static final Logger logger = LoggerFactory.getLogger(TestService.class);
@Autowired
private AsyncService asyncService;
public void innerMethod(int i) {
logger.info("async i = " + i);
}
public void innerMethodCall(int i) {
asyncService.run(()->innerMethod(i));
}
}
그 다음에 비동기 메서드 호출이 필요할 때 해당 서비스로 메서드를 호출해버리는 것이다.
저렇게 하니깐 결과도 비동기로 처리하는 모습을 확인할 수 있다.
위와 같은 해결책은 service의 메서드는 동기로 호출되길 원하지만 내부에서 하는 기능(동작)에서 일부만 비동기로 실행되기를 원할때 사용하면 좋다
참고한 블로그 작성자님의 생각으로는 차라리 CompletableFuture를 쓰되 해당 스레드 풀에서 실행되기를 바라면 아래와 같이 Executor를 주입받고 호출하는 것이 나을 것 같다고 한다.
@Service public class TestService {
private static final Logger logger = LoggerFactory.getLogger(TestService.class);
@Autowired
private Executor executor;
public void innerMethod(int i) {
logger.info("async i = " + i);
}
public void innerMethodCall(int i) {
CompletableFuture.runAsync(()->innerMethod(i),executor);
}
}
위 코드를 실행해도 executor로 등록한 스레드풀이 주입되어 해당 풀에서 작업들이 수행된다.
4. ThreadPoolTaskExecutor의 동작 방식
마지막으로, ThreadPoolTaskExecutor가 어떻게 동작하는지 흐름정도를 살펴보자.
- 스레드풀에 작업(task)를 등록하면, 스레드풀에 corePoolSize 만큼의 스레드가 존재하는지 확인한다.
- 스레드에 작업을 할당한다.
- 스레드풀의 스레드 개수가 corePoolSize보다 적으면, 스레드풀에 새로운 스레드를 생성하고 작업을 할당한다. (corePoolSize는 스레드풀에 항상 유지되어야하는 스레드의 최소 수로, 대기중인 기존 스레드가 존재해도 새롭게 생성한다.)
- 스레드풀의 스래드 개수가 corePoolSize보다 크면, 스레드풀의 대기 상태 스레드에게 작업을 할당한다.
- 스레드풀에 존재하는 모든 스레드가 작업중이면(대기중인 스레드가 없으면) BlockingQueue에 작업을 넣어 작업을 대기시킨다.
- BlockingQueue가 가득 찬 경우, 현재 스레드풀의 스레드 수가 maxPoolSize를 넘지 않으면 새로운 스레드를 생성하여 작업을 할당한다.
- 스레드 풀의 스레드 수가 maxPoolSize에 도달한 상태에서 새로운 요청이 들어오면, 더 이상 스레드를 생성할 수 없고 큐에도 대기시킬 수 없다. → TaskRejectedException이 발생한다.
- 작업중인 스레드가 작업을 마치면, BlockingQueue에 대기중인 작업이 있는지 확인한다.
- 대기중인 작업이 있으면, 해당 작업을 가져와 다시 작업을 수행한다.
- 대기중인 작업이 없으면, 해당 스레드는 대기 상태로 돌아간다. 만약, 스레드풀의 스레드 개수가 corePoolSize보다 크면 keepAliveTime이 지나고 해당 스레드는 스레드풀에서 제거된다.
5. 출처
https://kapentaz.github.io/spring/Spring-ThreadPoolTaskExecutor-%EC%84%A4%EC%A0%95/#
https://jeonyoungho.github.io/posts/ThreadPoolTaskExecutor/
https://jeong-pro.tistory.com/187
'BackEnd > Spring' 카테고리의 다른 글
[Spring] Spring Security에서 @WithMockUser를 커스텀하기 (0) | 2023.01.02 |
---|---|
[Spring] SpringSecurity에서 @AuthenticationPrincipal 대신 DTO로 받기 (0) | 2023.01.02 |
[Spring] Feign Client 적용기 (0) | 2022.12.20 |
[Spring] @Configuration 이란? (0) | 2022.09.12 |
[Spring] @Profile과 @ActiveProfiles 를 통한 활성 프로파일(Profile)의 관리 (0) | 2022.08.23 |
댓글