반응형
해당 글에서는 비동기 처리를 수행하기 위한 Executor에 대해 이해를 돕기 위해 작성한 글입니다.
1) Spring Boot Async
💡 Spring Boot Async
- Spring Boot 환경에서 비동기 프로그래밍을 지원하는 기능을 의미합니다. 이를 통해서 메서드 호출이 즉시 반환되고 실제 작업은 별도의 스레드에서 비동기적으로 실행이 됩니다.
- 비동기 메서드를 사용하면 서버의 성능을 향상하고 응답 시간을 줄일 수 있습니다.
- 주로 I/O 작업이 긴 시간 소요 되는 작업에 대해 별도의 스레드를 수행하여 메인 스레드가 블로킹되지 않도록 합니다.
2) Executor
💡 Executor
- Spring Boot 환경에서 비동기(Async)를 처리하기 위해 @Async 어노테이션을 사용하여 처리할 수 있습니다. 이때의 각각의 Executor를 통해서 비동기 작업에 대해 설정하고 관리하는 역할을 수행합니다.
- Executor를 사용하면 직접 스레드를 관리하지 않고도 비동기 작업을 실행할 수 있어 코드의 가독성과 유지보수성을 높일 수 있습니다.
- Spring Boot에서는 기본적으로 SimpleAsyncTaskExecutor를 사용하여 비동기 작업을 처리합니다. 하지만 성능 최적화나 특정 요구 사항에 따라 커스텀 Executor를 정의할 수 있습니다.
💡 Executor 인터페이스
- Executor 인터페이스를 구현하기 위해서 여러 구현체로 사용이 됩니다.
- SimpleAsyncTaskExecutor는 Executor를 구현하기 위한 구현체로 Spring Framework에 내장되어 있습니다.
- ThreadPoolTaskExecutor, ScheduledThreadPoolExecutor, ForkJoinPool는 Executor를 구현하기 위한 구현체로 java.util.concurrent 패키지 내에 포함이 되어 있습니다.
Executor 구현체 종류 | 설명 | 특징 |
SimpleAsyncTaskExecutor | 각 작업을 새로운 스레드에서 실행하며, 스레드 풀을 사용하지 않음 | 설정이 간단하고, 많은 작업을 짧은 시간 내에 실행하면 자원 부족 가능성이 있음. |
ThreadPoolTaskExecutor | 스레드 풀을 사용하여 비동기 작업을 효율적으로 처리 | 스레드 풀의 크기, 큐 용량, 스레드 이름 접두사 등 다양한 설정이 가능하며 과도한 스레드 생성 방지 설정이 가능합니다. |
ScheduledThreadPoolExecutor | 주기적 또는 일정 시간 후에 작업을 실행 | 주기적 작업이나 지연된 작업을 예약하며 주기적인 작업 스케줄링에 사용합니다. |
ForkJoinPool | 병렬 처리를 위해 설계된 Executor | 작업을 작은 단위로 분할하여 병렬로 처리합니다. |
💡 Executor 인터페이스의 java.util.concurrent 패키지 클래스 구조
3) Executor : SimpleAsyncTaskExecutor
💡 Executor : SimpleAsyncTaskExecutor
- Spring Framework에서 제공하는 Executor 인터페이스의 간단한 구현체로, 비동기 작업을 처리하기 위해 사용됩니다.
- 이 클래스는 각 작업을 새로운 스레드에서 실행하며, 스레드 풀을 사용하지 않습니다. 따라서 다수의 작업을 동시에 실행할 수 있지만, 스레드 수가 많아질 경우 시스템의 리소스를 과도하게 사용할 수 있습니다.
1. 주요 특징
특징 | 설명 |
새로운 스레드 생성 | 각 작업을 실행할 때마다 새로운 스레드를 생성합니다. |
스레드 풀 없음 | 스레드 재사용을 위한 스레드 풀을 사용하지 않습니다. |
간단한 사용법 | 설정이 간단하고, 특별한 설정 없이도 사용할 수 있습니다. |
Concurrency Limit | setConcurrencyLimit 메서드를 통해 동시에 실행될 수 있는 최대 스레드 수를 제한할 수 있습니다. |
💡 [참고] 주의 사항
1. 자원관리
- 매번 새로운 스레드를 생성하므로 자원 관리에 주의해야 합니다. 많은 작업을 짧은 시간 내에 실행하면 시스템 자원이 부족해질 수 있습니다.
2. 스레드 풀 사용 권장
- 많은 수의 비동기 작업이 필요한 경우, ThreadPoolTaskExecutor와 같은 스레드 풀을 사용하는 것이 더 효율적입니다.
2. 사용 예제
2.1. 사용 예제 : AsyncConfig
💡 사용 예제 : AsyncConfig
- EnableAsync를 통해서 비동기 처리를 수행하는 부분에 대한 설정 부분입니다.
- 여기서는 SimpleAsyncTaskExecutor를 이용하여서, @Bean을 통해 simpleAsyncTaskExecutor의 이름을 지정하여 Executor를 구성하였습니다.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Bean(name = "simpleAsyncTaskExecutor")
public Executor simpleAsyncTaskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
2.2. 사용 예제 : AsyncExecutorServiceImpl
💡 사용 예제 : AsyncExecutorServiceImpl
- AsyncConfig 내에서 지정한 simpleAsyncTaskExecutor를 이용하여서 비동기를 수행합니다.
/**
* Spring Boot Async Executor 별 사용 예시 : 구현체
*/
@Service
public class AsyncExecutorServiceImpl implements AsyncExecutorService {
/**
* Config 내에서 지정한 simpleAsyncTaskExecutor를 이용한 비동기 통신
*/
@Async("simpleAsyncTaskExecutor")
@Override
public void simpleAsyncTaskExecutor() {
System.out.println("Execute method asynchronously - " + Thread.currentThread().getName());
}
}
3. 사용 예제 결과
💡 사용 예제 결과
- 위와 같이 Thread 이름을 출력하였을 때, SimpleAsyncTaskExecutor-1으로 출력이 됨을 확인하였습니다.
- 해당 경우는 스레드 풀을 사용하지 않고, 비동기 작업이 발생하는 경우 새로운 스레드를 생성하는 경우입니다.
4) Executor :ThreadPoolTaskExecutor
💡 Executor :ThreadPoolTaskExecutor
- java.util.concurrent 패키지에 포함된 클래스 중 하나로, 스레드 풀을 사용하여 비동기 작업을 효율적으로 처리하는 Executor를 의미합니다. 이는 다수의 작업을 동시에 실행하면서도 시스템 리소스를 최적화할 수 있는 방법을 제공합니다.
1. 스레드 풀 생성 및 처리 과정
💡 스레드 풀 생성 과정
- 애플리케이션이 시작될 때, 사전에 미리 지정한 스레드의 개수에 따라서 스레드 풀 내에 스레드가 생성이 됩니다.
💡 스레드 풀 처리 과정
- 생성된 스레드를 기반으로 이를 이용한 처리과정을 이해합니다
1. 애플리케이션(Application)
- 비동기 처리의 하나의 새로운 작업(New Task)을 발생시켰고, 이를 작업 큐로 제출(Submit)을 하게 됩니다.
2. 작업 큐(Task Queue)
- 전달받은 작업(Task)은 큐에서 스레드의 사용이 가능할 때까지 보관을 합니다.
- 또한 큐에서는 FIFO 형태로 먼저 들어온 데이터가 먼저 처리하는 구조를 가지며 순차적으로 데이터가 쌓이고 들어온 순서대로 처리가 수행됩니다.
3. 스레드(Thread)
- 스레드 풀에 있는 스레드 중 하나가 사용이 가능해지면, 작업 큐에서 작업을 가져와서 실행을 하게 됩니다.
- 작업이 완료되면, 스레드는 유휴 상태가 되며, 작업이 발생하면 다시 큐에서 다음 작업을 가져와서 처리를 합니다.
2. 주요 특징
특징 | 설명 |
스레드 풀 | 스레드 풀을 사용하여 스레드를 재사용하고, 새로운 작업이 들어올 때마다 기존 스레드를 활용합니다. (* 스레드 생성의 오버헤드를 줄이고, 자원을 효율적으로 사용할 수 있습니다) |
설정 가능 | 스레드 풀의 크기, 큐 용량, 스레드 이름 접두사 등 다양한 설정이 가능합니다. |
비동기 작업 | 비동기 작업을 효율적으로 처리하여 응답성을 높이고, 시스템 리소스를 최적화할 수 있습니다. |
성능 최적화 | 스레드 풀을 사용하여 과도한 스레드 생성으로 인한 성능 저하를 방지합니다. |
스레드 풀 관리자 | 스레드 풀의 상태를 모니터링하고 관리할 수 있습니다. getActiveCount(), getPoolSize()를 통해 활성 스레드나 현재 풀의 스레드 수를 확인할 수 있으며 JMX(자바 관리 확장)을 통해 스레드 풀의 상태를 모니터링하고 필요에 따라 설정을 변경할 수 있습니다. |
💡 [참고] 주의 사항
1. 적절한 설정
- 스레드 풀의 크기와 큐 용량을 적절히 설정해야 합니다. 너무 많은 스레드를 생성하거나 큐 용량을 초과하면 성능이 저하될 수 있습니다.
2. 자원 모니터링
-스레드 풀의 상태를 주기적으로 모니터링하여 자원이 적절히 사용되고 있는지 확인해야 합니다.
3. 예외 처리
- 비동기 작업에서 발생하는 예외를 적절히 처리해야 합니다. 그렇지 않으면 스레드 풀이 중단될 수 있습니다.
3. ThreadPoolTaskExecutor 클래스의 주요 메서드
메서드 | 리턴 타입 | 설명 |
cancelRemainingTask(Runnable task) | protected void | ExecutorService.shutdownNow()에서 반환된, 실행이 시작되지 않은 남은 작업을 취소합니다. |
createQueue(int queueCapacity) | protected BlockingQueue<Runnable> | ThreadPoolExecutor에 사용할 BlockingQueue를 생성합니다. |
execute(Runnable task) | void | 주어진 작업을 실행합니다. |
getActiveCount() | int | 현재 활성 스레드의 수를 반환합니다. |
getCorePoolSize() | int | ThreadPoolExecutor의 기본 유지 스레드 수를 반환합니다. |
getKeepAliveSeconds() | int | ThreadPoolExecutor의 유지 시간(초)을 반환합니다. |
getMaxPoolSize() | int | ThreadPoolExecutor의 최대 스레드 수를 반환합니다. |
getPoolSize() | int | 현재 풀의 스레드 수를 반환합니다. |
getQueueCapacity() | int | ThreadPoolExecutor의 BlockingQueue 용량을 반환합니다. |
getQueueSize() | int | 현재 큐의 크기를 반환합니다. |
getThreadPoolExecutor() | ThreadPoolExecutor | 네이티브 접근을 위한 ThreadPoolExecutor를 반환합니다. |
initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) | protected ExecutorService | 이 메서드는 ExecutorService를 기본 클래스에 노출하지만 실제 ThreadPoolExecutor 핸들을 내부적으로 저장합니다. |
initiateEarlyShutdown() | protected void | 초기 종료 신호: 추가 작업을 트리거하지 않고 기존 작업을 완료하게 합니다. |
setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) | void | 기본 스레드의 타임아웃 허용 여부를 지정합니다. |
setCorePoolSize(int corePoolSize) | void | ThreadPoolExecutor의 기본 유지 스레드 수를 설정합니다. |
setKeepAliveSeconds(int keepAliveSeconds) | void | ThreadPoolExecutor의 유지 시간(초)을 설정합니다. |
setMaxPoolSize(int maxPoolSize) | void | ThreadPoolExecutor의 최대 스레드 수를 설정합니다. |
setPrestartAllCoreThreads(boolean prestartAllCoreThreads) | void | 모든 기본 스레드를 미리 시작할지 여부를 지정합니다. |
setQueueCapacity(int queueCapacity) | void | ThreadPoolExecutor의 BlockingQueue 용량을 설정합니다. |
setStrictEarlyShutdown(boolean defaultEarlyShutdown) | void | 컨텍스트 종료 시 초기 종료 신호를 지정하여 모든 유휴 스레드를 해제하고 추가 작업 제출을 거부할지 여부를 지정합니다. |
setTaskDecorator(TaskDecorator taskDecorator) | void | 실행될 Runnable에 적용할 사용자 정의 TaskDecorator를 지정합니다. |
submit(Runnable task) | Future<?> | 실행을 위해 Runnable 작업을 제출하고 해당 작업을 나타내는 Future를 받습니다. |
submit(Callable<T> task) | <T> Future<T> | 실행을 위해 Callable 작업을 제출하고 해당 작업을 나타내는 Future를 받습니다. |
submitListenable(Runnable task) | ListenableFuture<?> | 실행을 위해 Runnable 작업을 제출하고 해당 작업을 나타내는 ListenableFuture를 받습니다. |
submitListenable(Callable<T> task) | <T> ListenableFuture<T> | 실행을 위해 Callable 작업을 제출하고 해당 작업을 나타내는 ListenableFuture를 받습니다. |
4. 사용 예제
4.1. 사용 예제 : AsyncConfig
💡 사용 예제 : AsyncConfig
- EnableAsync를 통해서 비동기 처리를 수행하는 부분에 대한 설정 부분입니다. 여기서는 threadPoolTaskExecutor를 이용하여 threadPoolTaskExecutor의 이름을 지정하여 Executor를 지정하였습니다.
1. setCorePoolSize() 메서드를 통해 최초 생성할 스레드 풀을 지정합니다.
2. setMaxPoolSize() 메서드를 통해 동시에 사용할 수 있는 최대 스레드 풀을 지정합니다.
3. setQueueCapacity() 메서드를 통해 큐에 저장이 될 용량을 지정합니다.
4. setThreadNamePrefix() 메서드를 통해 스레드의 이름의 prefix를 지정합니다.
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2); // 스레드 풀이 최소한으로 유지할 스레드 수를 의미합니다.
executor.setMaxPoolSize(2); // 스레드 풀이 동시에 사용할 수 있는 최대 스레드 수를 의미합니다.
executor.setQueueCapacity(5); // 작업 큐의 용량을 500으로 설정합니다. 큐가 가득 차면 새로운 작업은 거부됩니다.
executor.setThreadNamePrefix("Async-"); // 생성되는 스레드의 이름 접두사를 "Async-"로 설정합니다. 이는 디버깅 및 모니터링 시 유용할 수 있습니다.
executor.initialize(); // 설정된 값을 바탕으로 executor를 초기화합니다.
return executor;
}
}
4.2. 사용 예제 : AsyncExecutorServiceImpl
💡 사용 예제 : AsyncExecutorServiceImpl
- AsyncConfig 내에서 지정한 threadPoolTaskExecutor를 이용하여서 비동기를 수행합니다.
- 스레드 풀을 테스트해보기 위해서 파라미터로 index를 받아서 출력을 합니다. 또한 2초간의 Thread를 중지하는 작업을 수행해 두었습니다.
@Async("threadPoolTaskExecutor")
@Override
public void threadPoolTaskExecutor(int index) {
System.out.println("Executing task " + index + " - " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 2초 동안 작업을 중지
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
4.3. 사용 예제 : 호출 부
💡 사용 예제 : 호출 부
- for문을 수행하면서 총 10번의 ThreadPoolExecutor를 호출하고 인덱스 값을 전달합니다.
@Test
void threadPoolTaskExecutor() {
for (int i = 0; i < 10; i++) {
asyncExecutorService.threadPoolTaskExecutor(i);
}
}
5. 사용 예제 결과
💡 사용 예제 결과
- 최대 스레드의 수를 2로 지정하고, 총 10개의 작업을 실행하도록 구성하였습니다.
- 큐의 용량이 5이므로 2개의 스레드를 사용중일 때 추가작업들은 큐에 저장이 되고, 큐가 가득 차면 새로운 작업은 거부가 됩니다.
- 이러한 설정을 통해 스레드 풀의 모든 스레드가 사용됩니다.
[ 더 알아보기 ]
💡 RejectedExecutionException 예외 발생
- 위의 문제에서는 RejectedExecutionException 예외가 발생하였고, 이는 스레드 풀의 용량이 초과되었음을 나타냅니다.
- 해당 ThreadPoolTaskExecutor는 최대 2개의 스레드를 실행할 수 있도록 설정되었고, 용량은 5개로 설정되어 있습니다.
- 하지만 10개의 작업이 동시에 실행되려고 시도되면서, 2개의 스레드가 활성화되고 5개의 작업이 큐에 대기 중일 때 추가 작업이 제출되면 큐 용량이 초과되어 거부됩니다. 그렇기에 적절한 스레드와 큐 용량을 지정해야 합니다.
5) Executor : ScheduledThreadPoolExecutor
💡 Executor : ScheduledThreadPoolExecutor
- java.util.concurrent 패키지에 포함된 클래스 중 하나로, 주기적으로 또는 일정한 지연 후에 작업을 실행할 수 있도록 스레드 풀을 관리하는 기능을 제공하는 Executor를 의미합니다.
- 이는 스레드 풀을 사용하여 작업을 처리하며, 작업이 일정 시간 간격으로 실행되거나 일정 시간 이후에 실행되도록 스케줄링을 구성할 수 있습니다.
- 주로 타이머 작업, 주기적인 데이터 백업, 정기적인 상태 체크 등 일정한 주기나 시간에 작업이 필요할 때 사용됩니다.
1. 주요 특징
항목 | 설명 |
스케줄링 | 작업이 일정 시간 후에 실행되거나 주기적으로 반복 실행되도록 스케줄링할 수 있습니다. |
스레드 풀 사용 | 스레드 풀을 사용하여 작업을 실행하며, 스레드 풀의 크기를 설정할 수 있습니다. |
다양한 메서드 | schedule, scheduleAtFixedRate, scheduleWithFixedDelay 등의 메서드를 제공하여 다양한 스케줄링 옵션을 지원합니다. |
예외 처리 | 작업 실행 중 예외가 발생하면, 해당 예외를 처리하고 다음 작업이 정상적으로 실행되도록 합니다. |
💡 [참고] 주의사항
1. 자원 관리
- 스레드 풀의 크기를 적절히 설정하여 자원을 효율적으로 사용해야 합니다. 너무 많은 스레드를 생성하면 시스템 자원이 부족해질 수 있습니다.
2. 예외 처리
- 스케줄링된 작업에서 발생하는 예외를 적절히 처리하여 스레드 풀이 중단되지 않도록 해야 합니다.
3. 스레드 종료
- 애플리케이션 종료 시 shutdown() 또는 shutdownNow() 메서드를 호출하여 스레드 풀을 적절히 종료해야 합니다.
2. 주요 메서드
메서드 | 리턴타입 | 설명 |
decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) | protected <V> RunnableScheduledFuture<V> | 실행할 Runnable을 수정하거나 대체합니다. |
decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) | protected <V> RunnableScheduledFuture<V> | 실행할 Callable을 수정하거나 대체합니다. |
execute(Runnable command) | void | 지연 없이 command를 실행합니다. |
getContinueExistingPeriodicTasksAfterShutdownPolicy() | boolean | 이 실행자가 shutdown된 후에도 기존 주기 작업을 계속 실행할지 여부에 대한 정책을 반환합니다. |
getExecuteExistingDelayedTasksAfterShutdownPolicy() | boolean | 이 실행자가 shutdown된 후에도 기존 지연 작업을 실행할지 여부에 대한 정책을 반환합니다. |
getQueue() | BlockingQueue<Runnable> | 이 실행자가 사용하는 작업 큐를 반환합니다. |
getRemoveOnCancelPolicy() | boolean | 취소된 작업을 취소 시점에 작업 큐에서 즉시 제거할지 여부에 대한 정책을 반환합니다. |
schedule(Runnable command, long delay, TimeUnit unit) | ScheduledFuture<?> | 주어진 지연 후에 활성화되는 일회성 작업을 제출합니다. |
schedule(Callable<V> callable, long delay, TimeUnit unit) | <V> ScheduledFuture<V> | 주어진 지연 후에 활성화되는 값 반환 일회성 작업을 제출합니다. |
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) | ScheduledFuture<?> | 주어진 초기 지연 후 처음 활성화되고 이후에 주어진 기간마다 활성화되는 주기적 작업을 제출합니다. |
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) | ScheduledFuture<?> | 주어진 초기 지연 후 처음 활성화되고 이후에 하나의 실행 종료와 다음 실행 시작 사이의 주어진 지연 후에 활성화되는 주기적 작업을 제출합니다. |
setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) | void | 이 실행자가 shutdown된 후에도 기존 주기 작업을 계속 실행할지 여부에 대한 정책을 설정합니다. |
setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) | void | 이 실행자가 shutdown된 후에도 기존 지연 작업을 실행할지 여부에 대한 정책을 설정합니다. |
setRemoveOnCancelPolicy(boolean value) | void | 취소된 작업을 취소 시점에 작업 큐에서 즉시 제거할지 여부에 대한 정책을 설정합니다. |
shutdown() | void | 이전에 제출된 작업은 실행되지만 새로운 작업은 수락되지 않는 정리된 종료를 시작합니다. |
shutdownNow() | List<Runnable> | 모든 활성 실행 작업을 중지하고, 대기 중인 작업의 처리를 중단하며, 실행 대기 중이던 작업 목록을 반환합니다. |
submit(Runnable task) | Future<?> | 실행을 위해 Runnable 작업을 제출하고 해당 작업을 나타내는 Future를 반환합니다. |
submit(Runnable task, T result) | <T> Future<T> | 실행을 위해 Runnable 작업을 제출하고 해당 작업을 나타내는 Future를 반환합니다. |
submit(Callable<T> task) | <T> Future<T> | 실행을 위해 값 반환 작업을 제출하고 해당 작업의 대기 중인 결과를 나타내는 Future를 반환합니다. |
3. 사용 예제
3.1. 사용 예제 : AsyncConfig
💡 사용 예제 : AsyncConfig
- EnableAsync를 통해서 비동기 처리를 수행하는 부분에 대한 설정 부분입니다.
- 여기서는 ScheduledThreadPoolExecutor를 이용하여 스레드 풀을 구성하였습니다.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
public class AsyncConfig {
/**
* Executor를 ScheduledThreadPoolExecutor로 지정합니다.
*
* @return Executor
*/
@Bean(name = "scheduledThreadPoolExecutor")
public Executor scheduledThreadPoolExecutor() {
// ScheduledThreadPoolExecutor를 생성하고 스레드 풀 크기를 5로 설정합니다.
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
// shutdown 후에도 기존 주기적 작업을 계속 실행할지 여부를 false로 설정합니다.
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
// shutdown 후에도 기존 지연 작업을 실행할지 여부를 false로 설정합니다.
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
return executor;
}
}
3.2. 사용 예제 : AsyncScheduledServiceImpl
💡 사용 예제 : AsyncScheduledServiceImpl
- AsyncConfig 내에서 지정한 scheduledThreadPoolExecutor를 이용하여 스케줄링 작업을 수행합니다.
1. scheduleFixedRateTask(): 5초마다 실행됩니다.
2. scheduleFixedDelayTask(): 이전 작업이 완료된 후 3초 뒤에 실행됩니다.
3. scheduleTaskWithInitialDelay(): 1초 지연 후 시작되고, 이후 2초마다 실행됩니다.
package com.adjh.springbootasync.config;
import org.quartz.Scheduler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* Please explain the class!!
*
* @author : jonghoon
* @fileName : SchedulerConfig
* @since : 8/3/24
*/
@Configuration
@EnableScheduling
public class SchedulerConfig {
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
@Async("scheduledThreadPoolExecutor")
@Scheduled(fixedRate = 5000)
public void scheduleFixedRateTask() {
System.out.println("Thread Name :: " + Thread.currentThread().getName());
System.out.println("Fixed rate task - " + LocalDateTime.now().format(formatter));
}
@Async("scheduledThreadPoolExecutor")
@Scheduled(fixedDelay = 3000)
public void scheduleFixedDelayTask() {
System.out.println("Thread Name :: " + Thread.currentThread().getName());
System.out.println("Fixed delay task - " + LocalDateTime.now().format(formatter));
}
@Async("scheduledThreadPoolExecutor")
@Scheduled(initialDelay = 1000, fixedRate = 2000)
public void scheduleTaskWithInitialDelay() {
System.out.println("Thread Name :: " + Thread.currentThread().getName());
System.out.println("Task with initial delay - " + LocalDateTime.now().format(formatter));
}
}
💡 [참고] 스케줄링으로 사용되는 Spring Boot Quartz에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
4. 사용 예제 결과
4.1. 사용 예제 결과 : scheduleFixedRateTask
💡 사용 예제 결과 -1 : scheduleFixedRateTask
- 지정한 매 초마다 수행되는 스케줄링을 구성하였습니다.
- 해당 메서드 내에서는 5초마다 비동기 적으로 처리되는 형태를 확인할 수 있습니다.
4.2. 사용 예제 결과 : scheduledThreadPoolExecutor
💡 사용 예제 결과 : scheduledThreadPoolExecutor
- 이전 작업이 완료된 후 지정한 시간에 맞춘 시간 뒤에 실행됩니다.
- 해당 메서드 내에서는 이전 작업이 완료된 이후 3초 뒤에 비동기적으로 처리되는 형태를 확인할 수 있습니다.
4.3. 사용 예제 결과 : scheduleTaskWithInitialDelay
💡 사용 예제 결과 : scheduleTaskWithInitialDelay
- 1초 지연 후 시작되고, 이후 2초마다 실행됩니다.
- 해당 메서드 내에서는 1초 지연 후 시작되고, 이후 2초마다 비동기적으로 처리되는 형태를 확인할 수 있습니다.
6) Executor : ForkJoinPool
💡 ForkJoinPool
- java.util.concurrent 패키지에 포함된 클래스 중 하나로, Java 7에서 도입된 고성능 병렬 처리를 위한 Executor를 의미합니다. 이는 큰 작업을 작은 단위로 분할(fork)하고 이를 병렬로 처리한 후 다시 결합(join)하는데 최적화되어 있습니다. 특히 재귀적인 작업 분할에 적합합니다.
- 워크-스털링(work-stealing) 알고리즘을 사용하여, 각 스레드가 자신의 작업을 완료한 후 다른 스레드의 작업을 훔쳐서 처리하는 방식입니다.
[ 더 알아보기 ]
💡 워크-스털링(Work-Stealing) 알고리즘
- 병렬 컴퓨팅에서 작업 부하를 동적으로 균등하게 분산시키기 위해 사용되는 기법입니다. 이 알고리즘은 특히 ForkJoinPool과 같은 병렬처리 프레임워크에서 효율적으로 활용됩니다.
1. 작업 분할
- 메인 스레드가 작업을 ForkJoinPool에 제출하면, 이 작업은 여러 개의 작은 작업으로 분할됩니다.
2. 작업 할당
- 분할된 작은 작업들은 각 워커 스레드의 작업 큐에 할당됩니다. 각 워커 스레드는 자신의 큐에 있는 작업을 처리합니다.
3. 작업 훔치기
- 어떤 워커 스레드가 자신의 작업 큐에 더 이상 처리할 작업이 없으면, 다른 워커 스레드의 큐에서 작업을 훔쳐와서 처리합니다.
- 유휴 상태의 워커 스레드는 다른 워커 스레드의 큐에서 작업을 훔칠 대상을 무작위로 선택합니다.
- 선택된 워커 스레드의 큐에서 가장 오래된 작업(큐의 맨 앞쪽에 있는 작업)을 훔쳐옵니다.
- 훔쳐온 작업을 자신의 큐에 추가하고, 이를 처리합니다.
4. 작업 실행
- 모든 워커 스레드가 자신의 큐에 있는 작업을 처리하거나 다른 스레드의 큐에서 작업을 훔쳐와서 처리합니다.
5. 작업 완료 및 결합
- 모든 작은 작업이 완료되면, 결과를 결합하여 최종 결과를 생성합니다.
1. ForkJoinPool 처리 과정
💡 ForkJoinPool 처리 과정
1. 작업 제출
- 메인 스레드가 ForkJoinPool에 작업을 제출합니다. 이 작업은 RecursiveTask 또는 RecursiveAction의 인스턴스여야 합니다.
2. 작업 분할 (Fork)
- 제출된 작업은 특정 조건을 기준으로 더 작은 작업(SubTask)으로 분할됩니다. 이 과정은 compute() 메서드 내에서 이루어지며, fork() 메서드를 호출하여 새로운 서브 태스크를 생성하고 병렬로 실행합니다.
3. 작업 실행
- 분할된 작업들은 ForkJoinPool의 워커 스레드에 의해 병렬로 실행됩니다. 각 워커 스레드는 독립적으로 작업을 처리하며, 필요한 경우 다른 워커 스레드의 작업을 훔쳐서 처리할 수도 있습니다 (work stealing).
4. 결과 결합 (Join)
- 모든 서브 태스크가 완료되면, join() 메서드를 통해 각 서브 태스크의 결과를 결합합니다. 이 단계는 재귀적으로 상위 작업까지 올라가며, 최종 결과를 생성합니다.
5. 결과 반환
- 최종 결과가 생성되면, ForkJoinPool은 결과를 호출한 메인 스레드로 반환합니다. 메인 스레드는 이 결과를 이용하여 후속 작업을 수행할 수 있습니다.
용어 | 설명 | 메서드 |
Task (태스크) | 처리해야 할 작업의 단위입니다. | - |
Fork (포크) | 큰 작업을 작은 작업으로 나누는 과정입니다. | ForkJoinTask 클래스의 fork() 메소드를 사용하여 새로운 서브태스크를 생성하고 실행할 수 있습니다. |
SubTask (서브태스크) | 큰 작업(Task)을 나누어 생성된 작은 작업 단위입니다. | - |
Join (조인) | 나뉜 작은 작업의 결과를 결합하는 과정입니다. | join() 메소드를 사용하여 분할된 작업의 결과를 모을 수 있습니다. |
Process Result (처리 결과) | 작업이 완료된 후 얻어진 결과입니다. | - |
2. 주요 특징
특징 | 설명 |
작업 분할 | 큰 작업을 작은 작업으로 재귀적으로 분할하여 병렬로 처리할 수 있습니다. |
작업 도둑 스케줄링 | 각 스레드가 자신의 작업 큐를 가지고 있으며, 다른 스레드의 큐에서 작업을 훔쳐와서 균형있게 작업을 분배합니다. |
성능 최적화 | CPU 코어를 최대한 활용하여 성능을 극대화합니다. |
ForkJoinTask 클래스 | ForkJoinPool에서 실행될 작업은 ForkJoinTask 클래스를 확장하여 구현합니다. |
💡 [참고] 주의 사항
1. 적절한 작업 분할
- 작업을 너무 많이 분할하면 오버헤드가 증가할 수 있습니다. 적절한 크기로 작업을 분할하는 것이 중요합니다.
2. 워크 스틸링의 한계
- 워크 스틸링은 각 스레드가 자신만의 작업 큐를 가지고 다른 스레드의 작업을 훔쳐 처리하는 방식입니다. 그러나 모든 작업이 고르게 분산되지 않으면 병목 현상이 발생할 수 있습니다.
3. 자원 관리
- ForkJoinPool의 스레드 풀 크기를 적절히 설정하여 시스템 자원을 효율적으로 사용해야 합니다.
3. 주요 메서드
메서드 | 리턴 타입 | 설명 |
awaitQuiescence(long timeout, TimeUnit unit) | boolean | 이 풀이 작동 중인 ForkJoinTask에 의해 호출된 경우, https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ForkJoinTask.html#helpQuiesce()와 동일한 효과를 가집니다. |
awaitTermination(long timeout, TimeUnit unit) | boolean | 종료 요청 후 모든 작업이 완료될 때까지, 또는 타임아웃이 발생할 때까지, 또는 현재 스레드가 인터럽트될 때까지 블록됩니다. |
commonPool() | static ForkJoinPool | 공용 풀 인스턴스를 반환합니다. |
drainTasksTo(Collection<? super ForkJoinTask<?>> c) | protected int | 실행되지 않은 제출된 작업과 포크된 작업을 일정 큐에서 제거하고 주어진 컬렉션에 추가합니다. |
execute(Runnable task) | void | 주어진 작업을 미래의 어느 시점에 실행합니다. |
execute(ForkJoinTask<?> task) | void | 주어진 작업의 (비동기) 실행을 준비합니다. |
getActiveThreadCount() | int | 현재 작업을 훔치거나 실행 중인 스레드의 수를 추정하여 반환합니다. |
getAsyncMode() | boolean | 이 풀이 로컬 선입선출(FIFO) 스케줄링 모드를 사용하는 경우 true를 반환합니다. |
getCommonPoolParallelism() | static int | 공용 풀의 목표 병렬 처리 수준을 반환합니다. |
getFactory() | ForkJoinPool.ForkJoinWorkerThreadFactory | 새 작업자를 구성하기 위한 팩토리를 반환합니다. |
getParallelism() | int | 이 풀의 목표 병렬 처리 수준을 반환합니다. |
getPoolSize() | int | 시작되었지만 아직 종료되지 않은 작업자 스레드의 수를 반환합니다. |
getQueuedSubmissionCount() | int | 이 풀에 제출되었지만 아직 실행되지 않은 작업의 수를 추정하여 반환합니다. |
getQueuedTaskCount() | long | 작업자 스레드에 의해 큐에 현재 보유 중인 작업의 전체 수를 추정하여 반환합니다. |
getRunningThreadCount() | int | 작업 조인 대기 또는 기타 관리 동기화를 기다리지 않는 작업자 스레드의 수를 추정하여 반환합니다. |
getStealCount() | long | 제출한 사람 외 다른 스레드에 의해 실행된 완료된 작업의 총 수를 추정하여 반환합니다. |
getUncaughtExceptionHandler() | Thread.UncaughtExceptionHandler | 작업을 실행하는 동안 복구 불가능한 오류로 인해 종료된 내부 작업자 스레드에 대한 핸들러를 반환합니다. |
hasQueuedSubmissions() | boolean | 아직 실행되지 않은 작업이 있는 경우 true를 반환합니다. |
invoke(ForkJoinTask<T> task) | <T> T | 주어진 작업을 수행하고 완료 시 그 결과를 반환합니다. |
invokeAll(Collection<? extends Callable<T>> tasks) | <T> List<Future<T>> | 주어진 작업을 실행하고 완료 시 상태와 결과를 보유하는 Future 목록을 반환합니다. |
isQuiescent() | boolean | 모든 작업자 스레드가 현재 유휴 상태이면 true를 반환합니다. |
isShutdown() | boolean | 이 풀이 종료된 경우 true를 반환합니다. |
isTerminated() | boolean | 종료 후 모든 작업이 완료된 경우 true를 반환합니다. |
isTerminating() | boolean | 종료 프로세스가 시작되었지만 아직 완료되지 않은 경우 true를 반환합니다. |
managedBlock(ForkJoinPool.ManagedBlocker blocker) | static void | 주어진 가능성 있는 블로킹 작업을 실행합니다. |
pollSubmission() | protected ForkJoinTask<?> | 사용 가능한 경우 다음 실행되지 않은 제출 작업을 제거하고 반환합니다. |
shutdown() | void | 이전에 제출된 작업을 실행하지만 새 작업은 수락하지 않는 질서 있는 종료를 시작할 수 있습니다. |
shutdownNow() | List<Runnable> | 모든 작업을 취소하거나 중지하고, 이후 제출된 작업을 거부하려고 시도할 수 있습니다. |
submit(Runnable task) | ForkJoinTask<?> | Runnable 작업을 실행하도록 제출하고 해당 작업을 나타내는 Future를 반환합니다. |
submit(Runnable task, T result) | <T> ForkJoinTask<T> | Runnable 작업을 실행하도록 제출하고 해당 작업을 나타내는 Future를 반환합니다. |
submit(Callable<T> task) | <T> ForkJoinTask<T> | 값을 반환하는 작업을 실행하도록 제출하고 해당 작업의 결과를 나타내는 Future를 반환합니다. |
submit(ForkJoinTask<T> task) | <T> ForkJoinTask<T> | ForkJoinTask를 실행하도록 제출합니다. |
toString() | String | 이 풀을 식별하는 문자열을 반환하고, 실행 상태, 병렬 처리 수준, 작업자 및 작업 수를 포함합니다. |
3. 사용 예제 : Fibonacci
3.1. 사용 예제 : Fibonacci
💡 사용 예제 : Fibonacci
- ForkJoinPool에서 실행될 작업을 나타내는 추상 클래스입니다. RecursiveAction(반환 값이 없는 작업)과 RecursiveTask(반환 값이 있는 작업)를 사용하여 작업을 정의합니다.
- RecursiveTask(반환 값이 있는 작업)으로 상속을 받은 Fibonacci 클래스의 compute()를 구현합니다. 이는 ForkJoinTask의 추상 메서드로 태스크를 수행할 로직에 대해서 정의합니다.
💡 compute()의 수행할 로직 처리 과정
- 해당 메서드에서는 피보나치의 수열을 재귀적으로 계산을 하는 과정입니다.
- 피보나치의 값이 1인 경우 그대로 리턴을 수행하며, 값이 2 이상인 경우 수행을 합니다.
1. n-1과 n-2의 값을 계산하는 두 개의 작업을 생성하였습니다.
2. f1.fork()를 통해서 n-1의 작업을 비동기적으로 실행합니다.
3. f2.compute()는 현재 스레드에서 n-2 값을 계산합니다.
4. f1.join()은 f1 작업이 완료될 때까지 기다린 후 그 결과를 반환합니다.
5. 마지막으로 f2.compute()와 f1.join()의 결과를 더해 최종 값을 반환합니다.
import java.util.concurrent.RecursiveTask;
public class Fibonacci extends RecursiveTask<Integer> {
private final int n;
public Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
4.2. 사용 예제 : AsyncExecutorServiceImpl
💡 사용 예제 : AsyncExecutorServiceImpl
- ForkJoinPool을 사용하여 Fibonacci 작업을 병렬로 계산합니다.
1. new ForkJoinPool() : ForkJoinPool 인스턴스를 생성합니다.
2.Fibonacci(10): 이를 통해서 피보나치수열의 10번째 숫자를 계산합니다.
3.** forkJoinPool.invoke: forkJoinPool을 사용하여서 task를 실행하고 결과값을 반환받습니다. invoke 메서드를 통해 동기적으로 호출되어 작업이 완료될 땨까지 기다립니다.
@Override
public void forkJoinPoolExecutor() {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Fibonacci task = new Fibonacci(10);
Integer result = forkJoinPool.invoke(task);
System.out.println("Fibonacci result: " + result);
}
4. 사용결과
💡 사용 결과
- 위와 같은 실행 결과 ForkJoinPool Thread가 병렬적으로 처리되는 과정을 통해서 Fibonacci 결과 값이 반환됨을 확인하였습니다.
💡 [참고] 해당 구성 파일들을 아래에서 확인이 가능합니다.
오늘도 감사합니다. 😀
반응형