- Spring Boot 환경에서 비동기(Async)를 처리하기 위해 @Async 어노테이션을 사용하여 처리할 수 있습니다. 이때의 각각의 Executor를 통해서 비동기 작업에 대해 설정하고 관리하는 역할을 수행합니다.
- Executor를 사용하면 직접 스레드를 관리하지 않고도 비동기 작업을 실행할 수 있어 코드의 가독성과 유지보수성을 높일 수 있습니다. - Spring Boot에서는 기본적으로 SimpleAsyncTaskExecutor를 사용하여 비동기 작업을 처리합니다. 하지만 성능 최적화나 특정 요구 사항에 따라 커스텀 Executor를 정의할 수 있습니다.
- Executor 인터페이스를 구현하기 위해서 여러 구현체로 사용이 됩니다. - SimpleAsyncTaskExecutor는 Executor를 구현하기 위한 구현체로 Spring Framework에 내장되어 있습니다. - ThreadPoolTaskExecutor, ScheduledThreadPoolExecutor, ForkJoinPool는 Executor를 구현하기 위한 구현체로 java.util.concurrent 패키지 내에 포함이 되어 있습니다.
Executor 구현체 종류
설명
특징
SimpleAsyncTaskExecutor
각 작업을 새로운 스레드에서 실행하며, 스레드 풀을 사용하지 않음
설정이 간단하고, 많은 작업을 짧은 시간 내에 실행하면 자원 부족 가능성이 있음.
ThreadPoolTaskExecutor
스레드 풀을 사용하여 비동기 작업을 효율적으로 처리
스레드 풀의 크기, 큐 용량, 스레드 이름 접두사 등 다양한 설정이 가능하며 과도한 스레드 생성 방지 설정이 가능합니다.
- AsyncConfig 내에서 지정한 simpleAsyncTaskExecutor를 이용하여서 비동기를 수행합니다.
/**
* Spring Boot Async Executor 별 사용 예시 : 구현체
*/@ServicepublicclassAsyncExecutorServiceImplimplementsAsyncExecutorService {
/**
* Config 내에서 지정한 simpleAsyncTaskExecutor를 이용한 비동기 통신
*/@Async("simpleAsyncTaskExecutor")@OverridepublicvoidsimpleAsyncTaskExecutor() {
System.out.println("Execute method asynchronously - " + Thread.currentThread().getName());
}
}
💡 Executor :ThreadPoolTaskExecutor - java.util.concurrent 패키지에 포함된 클래스 중 하나로, 스레드 풀을 사용하여 비동기 작업을 효율적으로 처리하는 Executor를 의미합니다. 이는 다수의 작업을 동시에 실행하면서도 시스템 리소스를 최적화할 수 있는 방법을 제공합니다.
스레드 풀을 사용하여 스레드를 재사용하고, 새로운 작업이 들어올 때마다 기존 스레드를 활용합니다. (* 스레드 생성의 오버헤드를 줄이고, 자원을 효율적으로 사용할 수 있습니다)
설정 가능
스레드 풀의 크기, 큐 용량, 스레드 이름 접두사 등 다양한 설정이 가능합니다.
비동기 작업
비동기 작업을 효율적으로 처리하여 응답성을 높이고, 시스템 리소스를 최적화할 수 있습니다.
성능 최적화
스레드 풀을 사용하여 과도한 스레드 생성으로 인한 성능 저하를 방지합니다.
스레드 풀 관리자
스레드 풀의 상태를 모니터링하고 관리할 수 있습니다. getActiveCount(), getPoolSize()를 통해 활성 스레드나 현재 풀의 스레드 수를 확인할 수 있으며 JMX(자바 관리 확장)을 통해 스레드 풀의 상태를 모니터링하고 필요에 따라 설정을 변경할 수 있습니다.
💡 [참고] 주의 사항
1. 적절한 설정 - 스레드 풀의 크기와 큐 용량을 적절히 설정해야 합니다. 너무 많은 스레드를 생성하거나 큐 용량을 초과하면 성능이 저하될 수 있습니다.
2. 자원 모니터링 -스레드 풀의 상태를 주기적으로 모니터링하여 자원이 적절히 사용되고 있는지 확인해야 합니다.
3. 예외 처리 - 비동기 작업에서 발생하는 예외를 적절히 처리해야 합니다. 그렇지 않으면 스레드 풀이 중단될 수 있습니다.
- EnableAsync를 통해서 비동기 처리를 수행하는 부분에 대한 설정 부분입니다. 여기서는 threadPoolTaskExecutor를 이용하여 threadPoolTaskExecutor의 이름을 지정하여 Executor를 지정하였습니다.
1. setCorePoolSize() 메서드를 통해 최초 생성할 스레드 풀을 지정합니다. 2. setMaxPoolSize() 메서드를 통해 동시에 사용할 수 있는 최대 스레드 풀을 지정합니다. 3. setQueueCapacity() 메서드를 통해 큐에 저장이 될 용량을 지정합니다. 4. setThreadNamePrefix() 메서드를 통해 스레드의 이름의 prefix를 지정합니다.
@Configuration@EnableAsyncpublicclassAsyncConfigimplementsAsyncConfigurer {
@Bean(name = "threadPoolTaskExecutor")public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();
executor.setCorePoolSize(2); // 스레드 풀이 최소한으로 유지할 스레드 수를 의미합니다.
executor.setMaxPoolSize(2); // 스레드 풀이 동시에 사용할 수 있는 최대 스레드 수를 의미합니다.
executor.setQueueCapacity(5); // 작업 큐의 용량을 500으로 설정합니다. 큐가 가득 차면 새로운 작업은 거부됩니다.
executor.setThreadNamePrefix("Async-"); // 생성되는 스레드의 이름 접두사를 "Async-"로 설정합니다. 이는 디버깅 및 모니터링 시 유용할 수 있습니다.
executor.initialize(); // 설정된 값을 바탕으로 executor를 초기화합니다.return executor;
}
}
4.2. 사용 예제 : AsyncExecutorServiceImpl
💡 사용 예제 : AsyncExecutorServiceImpl
- AsyncConfig 내에서 지정한 threadPoolTaskExecutor를 이용하여서 비동기를 수행합니다. - 스레드 풀을 테스트해보기 위해서 파라미터로 index를 받아서 출력을 합니다. 또한 2초간의 Thread를 중지하는 작업을 수행해 두었습니다.
@Async("threadPoolTaskExecutor")@OverridepublicvoidthreadPoolTaskExecutor(int index) {
System.out.println("Executing task " + index + " - " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 2초 동안 작업을 중지
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
4.3. 사용 예제 : 호출 부
💡 사용 예제 : 호출 부
- for문을 수행하면서 총 10번의 ThreadPoolExecutor를 호출하고 인덱스 값을 전달합니다.
@TestvoidthreadPoolTaskExecutor() {
for (inti=0; i < 10; i++) {
asyncExecutorService.threadPoolTaskExecutor(i);
}
}
- 최대 스레드의 수를 2로 지정하고, 총 10개의 작업을 실행하도록 구성하였습니다. - 큐의 용량이 5이므로 2개의 스레드를 사용중일 때 추가작업들은 큐에 저장이 되고, 큐가 가득 차면 새로운 작업은 거부가 됩니다. - 이러한 설정을 통해 스레드 풀의 모든 스레드가 사용됩니다.
[ 더 알아보기 ]
💡 RejectedExecutionException 예외 발생
- 위의 문제에서는 RejectedExecutionException 예외가 발생하였고, 이는 스레드 풀의 용량이 초과되었음을 나타냅니다.
- 해당 ThreadPoolTaskExecutor는 최대 2개의 스레드를 실행할 수 있도록 설정되었고, 용량은 5개로 설정되어 있습니다. - 하지만 10개의 작업이 동시에 실행되려고 시도되면서, 2개의 스레드가 활성화되고 5개의 작업이 큐에 대기 중일 때 추가 작업이 제출되면 큐 용량이 초과되어 거부됩니다. 그렇기에 적절한 스레드와 큐 용량을 지정해야 합니다.
schedule, scheduleAtFixedRate, scheduleWithFixedDelay 등의 메서드를 제공하여 다양한 스케줄링 옵션을 지원합니다.
예외 처리
작업 실행 중 예외가 발생하면, 해당 예외를 처리하고 다음 작업이 정상적으로 실행되도록 합니다.
💡 [참고] 주의사항 1. 자원 관리 - 스레드 풀의 크기를 적절히 설정하여 자원을 효율적으로 사용해야 합니다. 너무 많은 스레드를 생성하면 시스템 자원이 부족해질 수 있습니다. 2. 예외 처리 - 스케줄링된 작업에서 발생하는 예외를 적절히 처리하여 스레드 풀이 중단되지 않도록 해야 합니다. 3. 스레드 종료 - 애플리케이션 종료 시 shutdown() 또는 shutdownNow() 메서드를 호출하여 스레드 풀을 적절히 종료해야 합니다.
- EnableAsync를 통해서 비동기 처리를 수행하는 부분에 대한 설정 부분입니다. - 여기서는 ScheduledThreadPoolExecutor를 이용하여 스레드 풀을 구성하였습니다.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Executor;
@Configuration@EnableAsyncpublicclassAsyncConfig {
/**
* Executor를 ScheduledThreadPoolExecutor로 지정합니다.
*
* @return Executor
*/@Bean(name = "scheduledThreadPoolExecutor")public Executor scheduledThreadPoolExecutor() {
// ScheduledThreadPoolExecutor를 생성하고 스레드 풀 크기를 5로 설정합니다.ScheduledThreadPoolExecutorexecutor=newScheduledThreadPoolExecutor(5);
// shutdown 후에도 기존 주기적 작업을 계속 실행할지 여부를 false로 설정합니다.
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
// shutdown 후에도 기존 지연 작업을 실행할지 여부를 false로 설정합니다.
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
return executor;
}
}
3.2. 사용 예제 : AsyncScheduledServiceImpl
💡 사용 예제 : AsyncScheduledServiceImpl - AsyncConfig 내에서 지정한 scheduledThreadPoolExecutor를 이용하여 스케줄링 작업을 수행합니다.
1. scheduleFixedRateTask(): 5초마다 실행됩니다. 2. scheduleFixedDelayTask(): 이전 작업이 완료된 후 3초 뒤에 실행됩니다. 3. scheduleTaskWithInitialDelay(): 1초 지연 후 시작되고, 이후 2초마다 실행됩니다.
- java.util.concurrent 패키지에 포함된 클래스 중 하나로, Java 7에서 도입된 고성능 병렬 처리를 위한 Executor를 의미합니다. 이는 큰 작업을 작은 단위로 분할(fork)하고 이를 병렬로 처리한 후 다시 결합(join)하는데 최적화되어 있습니다. 특히 재귀적인 작업 분할에 적합합니다.
- 워크-스털링(work-stealing) 알고리즘을 사용하여, 각 스레드가 자신의 작업을 완료한 후 다른 스레드의 작업을 훔쳐서 처리하는 방식입니다.
[ 더 알아보기 ] 💡 워크-스털링(Work-Stealing) 알고리즘 - 병렬 컴퓨팅에서 작업 부하를 동적으로 균등하게 분산시키기 위해 사용되는 기법입니다. 이 알고리즘은 특히 ForkJoinPool과 같은 병렬처리 프레임워크에서 효율적으로 활용됩니다.
1. 작업 분할 - 메인 스레드가 작업을 ForkJoinPool에 제출하면, 이 작업은 여러 개의 작은 작업으로 분할됩니다. 2. 작업 할당 - 분할된 작은 작업들은 각 워커 스레드의 작업 큐에 할당됩니다. 각 워커 스레드는 자신의 큐에 있는 작업을 처리합니다. 3. 작업 훔치기 - 어떤 워커 스레드가 자신의 작업 큐에 더 이상 처리할 작업이 없으면, 다른 워커 스레드의 큐에서 작업을 훔쳐와서 처리합니다. - 유휴 상태의 워커 스레드는 다른 워커 스레드의 큐에서 작업을 훔칠 대상을 무작위로 선택합니다. - 선택된 워커 스레드의 큐에서 가장 오래된 작업(큐의 맨 앞쪽에 있는 작업)을 훔쳐옵니다. - 훔쳐온 작업을 자신의 큐에 추가하고, 이를 처리합니다. 4. 작업 실행 - 모든 워커 스레드가 자신의 큐에 있는 작업을 처리하거나 다른 스레드의 큐에서 작업을 훔쳐와서 처리합니다. 5. 작업 완료 및 결합 - 모든 작은 작업이 완료되면, 결과를 결합하여 최종 결과를 생성합니다.
💡 ForkJoinPool 처리 과정 1. 작업 제출 - 메인 스레드가 ForkJoinPool에 작업을 제출합니다. 이 작업은 RecursiveTask 또는 RecursiveAction의 인스턴스여야 합니다.
2. 작업 분할 (Fork) - 제출된 작업은 특정 조건을 기준으로 더 작은 작업(SubTask)으로 분할됩니다. 이 과정은 compute() 메서드 내에서 이루어지며, fork() 메서드를 호출하여 새로운 서브 태스크를 생성하고 병렬로 실행합니다.
3. 작업 실행 - 분할된 작업들은 ForkJoinPool의 워커 스레드에 의해 병렬로 실행됩니다. 각 워커 스레드는 독립적으로 작업을 처리하며, 필요한 경우 다른 워커 스레드의 작업을 훔쳐서 처리할 수도 있습니다 (work stealing).
4. 결과 결합 (Join) - 모든 서브 태스크가 완료되면, join() 메서드를 통해 각 서브 태스크의 결과를 결합합니다. 이 단계는 재귀적으로 상위 작업까지 올라가며, 최종 결과를 생성합니다. 5. 결과 반환 - 최종 결과가 생성되면, ForkJoinPool은 결과를 호출한 메인 스레드로 반환합니다. 메인 스레드는 이 결과를 이용하여 후속 작업을 수행할 수 있습니다.
- ForkJoinPool에서 실행될 작업을 나타내는 추상 클래스입니다. RecursiveAction(반환 값이 없는 작업)과 RecursiveTask(반환 값이 있는 작업)를 사용하여 작업을 정의합니다. - RecursiveTask(반환 값이 있는 작업)으로 상속을 받은 Fibonacci 클래스의 compute()를 구현합니다. 이는 ForkJoinTask의 추상 메서드로 태스크를 수행할 로직에 대해서 정의합니다.
💡 compute()의 수행할 로직 처리 과정 - 해당 메서드에서는 피보나치의 수열을 재귀적으로 계산을 하는 과정입니다. - 피보나치의 값이 1인 경우 그대로 리턴을 수행하며, 값이 2 이상인 경우 수행을 합니다.
1. n-1과 n-2의 값을 계산하는 두 개의 작업을 생성하였습니다.
2. f1.fork()를 통해서 n-1의 작업을 비동기적으로 실행합니다.
3. f2.compute()는 현재 스레드에서 n-2 값을 계산합니다.
4. f1.join()은 f1 작업이 완료될 때까지 기다린 후 그 결과를 반환합니다.
5. 마지막으로 f2.compute()와 f1.join()의 결과를 더해 최종 값을 반환합니다.