728x170
해당 글에서는 반응형 프로그래밍 중 이를 구현한 Spring WebFlux의 Publisher/Subscriber의 데이터 타입과 에러핸들러, 백프레셔 이용방법에 대해 알아봅니다
💡 [참고] Spring WebFlux 관련 글에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
분류 | 링크 |
Spring Boot Webflux 이해하기 -1 : 흐름 및 주요 특징 이해 | https://adjh54.tistory.com/232 |
Spring Boot Webflux 이해하기 -2 : 활용하기 | https://adjh54.tistory.com/233 |
Spring Boot WebFlux 이해하고 구현하기 -1 : 반응형 프로그래밍에서 WebFlux까지 흐름 | https://adjh54.tistory.com/627 |
Spring Boot WebFlux 활용하여 구현하기 -2: 계층구조 및 활용예시 | https://adjh54.tistory.com/628 |
Spring Boot WebFlux 활용하여 구현하기 -3: Publisher/Subscriber 데이터 처리 타입, 에러 핸들러, 백프레셔 | https://adjh54.tistory.com/629 |
Spring Boot WebFlux 활용 Github | https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-webflux |
1) Spring Boot WebFlux의 Publisher - Subscriber 패턴
💡 Spring Boot WebFlux의 Publisher - Subscriber 패턴
- 발행자(Publisher)는 데이터 스트림을 생성하고 방출(emit)하는 역할을 수행하며, Mono, Flux의 주요 타입을 제공합니다.
- 구독자(Subscriber)는 발행자(Publisher)가 방출한 데이터를 수신하고 처리하는 역할을 수행합니다.
- 해당 처리되는 과정을 통해 ‘데이터의 흐름이 제어’되고, 백프레셔(backpressure)를 통해 Subscriber가 처리할 수 있는 만큼의 데이터만 받을 수 있도록 보장됩니다.
1. Subscriber → Publisher : subscribe
- 데이터를 받고자 하는 Subscriber는 Publisher에게 구독(subscribe)을 신청하는 단계입니다.
2. Subscription → Subscriber : onSubscribe
- Publisher가 구독(subscribe) 요청을 수락하고, Subscription 객체를 통해 Subscriber에게 구독이 성공했음을 알립니다.
3. Subscriber → Subscription : request(n)/cancel
- Subscriber가 처리할 수 있는 데이터의 양(n)을 지정하여 요청하거나, 필요한 경우 구독을 취소할 수 있습니다.
4. Subscription → Subscriber : onNext(data)
- 요청받은 데이터를 Subscriber에게 하나씩 전달합니다
5. Subscription → Subscriber : onComplete/onError
- 모든 데이터 처리가 완료되었거나 오류가 발생했을 때 Subscriber에게 알립니다.
구성 요소 | 설명 |
Publisher (발행자) | 데이터 스트림을 생성하고 발행하는 주체 |
Subscriber (구독자) | Publisher가 발행한 데이터를 받아 처리하는 주체 |
Subscription | Publisher와 Subscriber 간의 통신을 조정하는 중개자 |
2) Publisher 반응형 데이터 타입 : Mono
💡 Publisher 반응형 데이터 타입 : Mono
- Reactor 라이브러리에서 제공하는 Reactive Streams의 Publisher 중 하나로 오직 ‘0개 또는 하나의 데이터 항목 생성’하고 이 결과가 생성되고 나면 스트림이 종료되면 결과 생성을 종료합니다.
- Mono를 사용하여 비동기적으로 결과를 반환하면 해당 결과를 구독하는 클라이언트는 결과가 생성될 때까지 블로킹하지 않고 다른 작업을 수행할 수 있습니다.
public Mono<String> getData() {
// perform some database or API call to get data
return Mono.just("example data");
}
💡 [참고] Publisher-Subscriber 패턴에서 Publisher가 단일 데이터(data)를 전송하고자 할 때 Mono를 이용합니다.
1. Mono 처리과정(Timeline)
💡Mono 처리과정
- Mono는 최대 하나의 항목을 방출한 다음 신호(값의 유무에 관계없이 성공적인 Mono)로 종료되거나 혹은 단일 신호(실패한 Mono)만 방출합니다.
1. This is the timeline of the MonoTime flows from left to right
- 타임라인 흐름: Mono의 데이터는 왼쪽에서 오른쪽으로 시간순으로 처리됩니다.
2. These dotted lines and this boxindicate that a transformationis being applied to the Mono. The text inside the box showsthe nature of the transformation
- 변환 과정: 점선과 박스는 Mono에 적용되는 변환 작업을 나타내며, 박스 안의 텍스트는 변환의 종류를 설명합니다.
💡 상단 흐름 (Mono 클래스 수행 처리 성공) : 하나의 데이터를 반환하는 형태
1. This is the optional item emitted by the Mono
- 데이터 방출: Mono는 선택적으로 하나의 데이터 항목을 방출할 수 있습니다.
2. This vertical line indicates thatthe Mono has completed successfully
- 정상 완료: 세로 선은 Mono가 성공적으로 완료되었음을 나타냅니다.
💡 하단 흐름(Mono 클래스 수행 처리 실패)
1. This Mono is the result of the transformation
- 변환 결과: 변환 작업 후의 Mono는 새로운 결과값을 가집니다.
2. If for some reason the Mono terminatesabnormally, with an error, the verticalline is replaced by an X
- 비정상 종료: Mono가 에러로 인해 비정상 종료되면, 세로 선 대신 X 표시로 표시됩니다. ( * 해당 부분이 발생하면 데이터 처리는 중단이 됩니다.)
[ 더 알아보기 ]
💡 If for some reason the Mono terminatesabnormally, with an error, the verticalline is replaced by an X는 무슨 의미일까?
- 즉, Mono의 실행 과정에서 에러가 발생하면 타임라인 다이어그램에서 정상 완료를 나타내는 수직선 대신 실패를 나타내는 X 표시가 나타납니다.
- 예를 들어서 아래의 코드와 같습니다. 이는 map 연산 중 에러가 발생하면, Mono의 타임라인은 X로 종료되고 에러 핸들러가 호출됩니다.
Mono.just("data")
.map(str -> {
throw new RuntimeException("에러 발생!"); // 여기서 에러 발생
return str;
})
.subscribe(
data -> System.out.println("성공: " + data),
error -> System.out.println("실패: " + error.getMessage())
);
2. Mono 인스턴스 생성
💡 Mono 인스턴스 생성
- Mono 클래스를 객체로 구성하기 위해 이를 초기값을 지정하는 과정이 인스턴스 생성입니다.
- 이는 0개 또는 하나의 데이터 항목을 생성하는 Mono 클래스의 인스턴스 생성방법들에 대해 알아봅니다.
메서드 | 리턴 값 | 설명 |
empty() | static <T> Mono<T> | 빈 Mono를 생성합니다. 아무 값도 방출하지 않고 완료 신호만 보냅니다. |
just(T data) | static <T> Mono<T> | 주어진 데이터를 포함하는 Mono를 생성합니다. 하나의 값을 방출하고 완료됩니다. |
justOrEmpty(Optional<? extends T> data) | Mono<T> | Optional 값을 포함하는 Mono를 생성합니다. |
never() | Mono<T> | Optional 값을 포함하는 Mono를 생성합니다. |
error(Throwable error) | static <T> Mono<T> | 에러를 포함하는 Mono를 생성합니다. 지정된 에러로 즉시 종료됩니다. |
defer(Supplier<? extends Mono<? extends T>> supplier) | static <T> Mono<T> | 구독 시점에 Mono를 생성합니다. 각 구독마다 새로운 Mono 인스턴스가 생성됩니다. |
deferContextual(Function<ContextView, Mono<T>> factory) | Mono<T> | 컨텍스트 정보를 사용하여 지연 실행을 구현합니다. |
fromCallable(Callable<T> supplier) | static <T> Mono<T> | Callable을 Mono로 변환합니다. Callable의 실행 결과를 방출합니다. |
fromFuture(CompletableFuture<? extends T> future) | static <T> Mono<T> | Future를 Mono로 변환합니다. Future의 완료 결과를 방출합니다. |
fromRunnable(Runnable runnable) | static <T> Mono<T> | Runnable을 Mono로 변환합니다. Runnable 실행 완료 후 완료 신호를 보냅니다. |
package com.blog.springbootwebflux.component.mono;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
* Mono 인스턴스를 생성하는 다양한 예시
*
* @author : jonghoon
* @fileName : MonoInstanceExam
* @since : 24. 12. 28.
*/
@Slf4j
@Component
public class MonoInstanceExam {
/**
* Mono Instance 생성
*/
public void generateInstance() {
// 빈 Mono 생성
Mono<String> emptyMono = Mono.empty();
emptyMono.subscribe(value -> log.debug("empty 값: {}", value));
// 값으로 Mono 생성
Mono<String> mono = Mono.just("Hello");
mono.subscribe(value -> log.debug("just 값: {}", value));
// Optional 값으로 Mono 생성
Optional<String> optionalValue = Optional.of("Optional Value");
Mono<String> optionalMono = Mono.justOrEmpty(optionalValue);
optionalMono.subscribe(value -> log.debug("justOrEmpty 값: {}", value));
// 신호를 방출하지 않는 Mono 생성
Mono<String> neverMono = Mono.never();
neverMono.subscribe(value -> log.debug("never 값: {}", value));
// 에러로 Mono 생성
// Mono<String> errorMono = Mono.error(new RuntimeException("Error"));
// errorMono.subscribe(value -> log.debug("error 값: {}", value));
// defer를 사용한 지연 생성
Mono<String> deferredMono = Mono.defer(() -> Mono.just("Deferred Value"));
deferredMono.subscribe(value -> log.debug("defer 값: {}", value));
// Callable을 이용한 Mono 생성
Mono<String> callableMono = Mono.fromCallable(() -> {
// 시간이 걸리는 작업 수행
Thread.sleep(1000);
return "Callable 결과";
});
callableMono.subscribe(value -> log.debug("fromCallable 값: {}", value));
// Future를 이용한 Mono 생성
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Future 결과");
Mono<String> futureMono = Mono.fromFuture(future);
futureMono.subscribe(value -> log.debug("futureMono 값: {}", value));
// Runnable을 이용한 Mono 생성
Mono<Void> runnableMono = Mono.fromRunnable(() -> {
// 결과값 없이 실행만 하는 작업
System.out.println("작업 실행");
});
runnableMono.subscribe(value -> log.debug("fromRunnable 값: {}", value));
}
}
💡 출력 결과
- 아래와 같이 빈 Mono에 대해서는 출력이 되지 않고 있으며, 이외에는 출력이 잘되고 있음을 확인하였습니다
3. Mono 정적 메서드 활용
💡 Mono 정적 메서드 활용
- 클래스에 속하는 메서드로 클래스의 인스턴스 생성 없이 즉시 호출이 가능한 메서드입니다.
메서드 | 리턴타입 | 설명 |
delay(Duration duration) | Mono<Long> | 지정된 시간만큼 실행을 지연시킵니다. |
delay(Duration duration, Scheduler scheduler) | Mono<Long> | 지정된 스케줄러에서 지연 실행을 수행합니다. |
firstWithSignal(Iterable<Mono<T>> monos) | Mono<T> | 여러 Mono 중 첫 번째 신호를 방출하는 것을 선택합니다. |
firstWithValue(Iterable<Mono<T>> monos) | Mono<T> | 여러 Mono 중 첫 번째 값을 방출하는 것을 선택합니다. |
from(Publisher<T> source) | Mono<T> | Publisher를 Mono로 변환합니다. |
fromCallable(Callable<T> supplier) | Mono<T> | Callable을 Mono로 변환합니다. |
fromCompletionStage(CompletionStage<? extends T> completionStage) | Mono<T> | CompletionStage를 Mono로 변환합니다. |
fromCompletionStage(Supplier<CompletionStage<? extends T>> stageSupplier) | Mono<T> | CompletionStage Supplier를 Mono로 변환합니다. |
fromDirect(Publisher<? extends I> source) | Mono<I> | Publisher를 직접 Mono로 변환합니다. |
fromFuture(CompletableFuture<? extends T> future) | Mono<T> | CompletableFuture를 Mono로 변환합니다. |
fromRunnable(Runnable runnable) | Mono<T> | Runnable을 Mono로 변환합니다. |
fromSupplier(Supplier<? extends T> supplier) | Mono<T> | Supplier를 Mono로 변환합니다. |
ignoreElements(Publisher<T> source) | Mono<T> | Publisher의 모든 요소를 무시하고 완료 신호만 전달합니다. |
sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2) | Mono<Boolean> | 두 Publisher의 시퀀스가 동일한지 비교합니다. |
using(Callable, Function) | Mono<T> | 리소스를 관리하며 Mono를 생성합니다. |
using(Callable, Function, boolean) | Mono<T> | eager cleanup 옵션을 포함한 리소스 관리 Mono를 생성합니다. |
using(Callable, Function, Consumer) | Mono<T> | 커스텀 cleanup 로직을 포함한 리소스 관리 Mono를 생성합니다. |
usingWhen(Publisher, Function, Function) | Mono<T> | 비동기 리소스 관리를 위한 Mono를 생성합니다. |
when(Iterable<Publisher>) | Mono<Void> | 여러 Publisher가 완료될 때까지 기다립니다. |
when(Publisher...) | Mono<Void> | 지정된 Publisher들이 완료될 때까지 기다립니다. |
whenDelayError(Iterable<Publisher>) | Mono<Void> | 에러를 지연시키며 Publisher들의 완료를 기다립니다. |
zip(Function, Mono...) | Mono<R> | 여러 Mono의 결과를 결합합니다. |
package com.blog.springbootwebflux.component.mono;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
/**
* Mono 타입의 Static Method 사용 예시
*
* @author : jonghoon
* @fileName : MonoStaticMethodExam
* @since : 24. 12. 28.
*/
@Slf4j
@Component
public class MonoStaticMethodExam {
public void staticMethodExam() {
// 1. delay() - 지정된 시간만큼 실행을 지연
Mono<Long> delayedMono = Mono.delay(Duration.ofSeconds(2));
delayedMono.subscribe(value -> log.debug("delay 값: {}", value));
// 2. firstWithValue() - 여러 Mono 중 첫 번째 값을 선택
Mono<String> mono1 = Mono.just("First");
Mono<String> mono2 = Mono.just("Second");
Mono<String> firstMono = Mono.firstWithValue(Arrays.asList(mono1, mono2));
firstMono.subscribe(value -> log.debug("firstWithValue 값: {}", value));
// 3. fromCallable() - Callable을 Mono로 변환
Mono<String> callableMono = Mono.fromCallable(() -> {
// 시간이 걸리는 작업 수행
Thread.sleep(1000);
return "작업 완료";
});
callableMono.subscribe(value -> log.debug("fromCallable 값: {}", value));
// 4. zip() - 여러 Mono의 결과를 결합
Mono<String> monoA = Mono.just("Hello");
Mono<String> monoB = Mono.just("World");
Mono<String> zipped = Mono.zip(monoA, monoB,
(a, b) -> a + " " + b);
zipped.subscribe(value -> log.debug("zip 값: {}", value));
}
}
💡 출력 결과
- 아래와 같이 정상적으로 출력되었으며, delay() 메서드의 경우는 특정 시간을 늦게 출력을 지정하였기에 추후에 호출이 됨을 확인하였습니다.
4. Mono 인스턴스 메서드
💡 인스턴스 메서드
- 객체의 인스턴스에 속하는 메서드로 인스턴스 생성 후 호출이 가능합니다.
- 아래의 예시에서는 Mono.just()는 정적 메서드를 사용하여 Mono 인스턴스를 생성하고, 그 후에 인스턴스 메서드인 map()과 filter()를 체이닝 하여 데이터를 변환하고 필터링합니다.
// 인스턴스 생성
Mono<String> mono1 = Mono.just("Hello"); // Mono 인스턴스 생성
// 인스턴스 메서드 예시
Mono<String> result = mono1
.map(str -> str.toUpperCase()) // map()은 인스턴스 메서드
.filter(str -> str.length() > 3); // filter()는 인스턴스 메서드
💡 인스턴스 메서드의 종류
- 아래의 메서드외에 공식사이트를 확인하시면 다양한 메서드를 확인할 수 있습니다.
메서드 | 리턴 타입 | 설명 |
map(Function) | Mono | 데이터를 변환하여 새로운 타입의 Mono를 생성합니다. |
flatMap(Function) | Mono | 데이터를 다른 Mono로 변환하고 중첩을 평탄화합니다. |
filter(Predicate) | Mono | 조건에 맞는 데이터만 필터링합니다. |
defaultIfEmpty(T) | Mono | 비어있는 경우 기본값을 제공합니다. |
switchIfEmpty(Mono) | Mono | 비어있는 경우 다른 Mono로 전환합니다. |
cache() | Mono | 결과를 캐시하여 재사용합니다. |
timeout(Duration) | Mono | 지정된 시간 내에 데이터가 없으면 에러를 발생시킵니다. |
retry() | Mono | 오류 발생 시 재시도합니다. |
zipWith(Mono) | Mono | 다른 Mono와 결합하여 튜플을 생성합니다. |
then() | Mono | 완료 신호만 전달하고 데이터는 무시합니다. |
package com.blog.springbootwebflux.component.mono;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
* Mono Instance 메서드 사용예시
*
* @author : jonghoon
* @fileName : MonoInstanceMethod
* @since : 24. 12. 28.
*/
@Slf4j
@Component
public class MonoInstanceMethodExam {
public void staticMethodExam() {
// 기본적인 Mono 생성
Mono<String> mono = Mono.just("Hello");
// 1. map() - 데이터 변환
Mono<Integer> lengthMono = mono.map(str -> str.length());
lengthMono.subscribe(value -> log.debug("map 값: {}", value));
// 2. filter() - 조건에 맞는 데이터 필터링
Mono<String> filteredMono = mono.filter(str -> str.length() > 3);
filteredMono.subscribe(value -> log.debug("filter 값: {}", value));
// 3. defaultIfEmpty() - 비어있는 경우 기본값 제공
Mono<String> defaultMono = mono
.filter(str -> str.length() > 10)
.defaultIfEmpty("기본값");
defaultMono.subscribe(value -> log.debug("defaultIfEmpty 값: {}", value));
// 4. timeout() - 시간 제한 설정
Mono<String> timeoutMono = mono.timeout(Duration.ofSeconds(5));
timeoutMono.subscribe(value -> log.debug("timeout 값: {}", value));
// 5. zipWith() - 두 Mono 결합
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.just("World");
Mono<String> zippedMono = mono1.zipWith(mono2)
.map(tuple -> tuple.getT1() + " " + tuple.getT2());
zippedMono.subscribe(value -> log.debug("zipWith 값: {}", value));
// 6. flatMap() - 중첩된 Mono를 평탄화
Mono<String> flatMappedMono = mono.flatMap(str ->
Mono.just(str + " World")
.delayElement(Duration.ofSeconds(1))
);
flatMappedMono.subscribe(value -> log.debug("flatMap 값: {}", value));
}
}
💡 출력 결과
- 위에서 출력한 결과에 따라 아래와 같이 콘솔에서 출력이 됨을 확인하였습니다.
3) Publisher 반응형 데이터 타입 : Flux
💡 Publisher 반응형 데이터 타입 : Flux
- Reactor 라이브러리에서 제공하는 Reactive Streams의 Publisher 중 하나로 Mono와 달리 ‘여러 개의 데이터 항목’를 생성하고 스트림이 종료되면 결과 생성을 종료합니다.
- 비동기 작업을 수행하면 작업이 완료될 때까지 블로킹하지 않고 다른 작업을 수행할 수 있습니다.
- Spring WebFlux에서 Flux를 사용하여 HTTP 요청을 처리하는 경우, 요청을 수신한 즉시 해당 요청을 처리하고 결과를 생성하는 대신 결과 생성이 완료될 때까지 다른 요청을 처리할 수 있습니다.
public Flux<String> getStreamedData() {
// perform a continuous stream of data
return Flux.just("streamed data 1", "streamed data 2", "streamed data 3");
}
💡 [참고] Publisher-Subscriber 패턴에서 Publisher가 다중 데이터(data)를 전송하고자 할 때 Flux를 이용합니다.
1. Flux 처리과정(Timeline)
💡 Flux 처리과정(Timeline)
💡 상단 흐름 (성공적인 Flux)
- Flux는 0~N개의 요소를 방출한 후 완료(성공적으로 또는 오류 발생)되는 반응형 스트림입니다.
1. This is the timeline of the Flux Time flows from left to right
- 타임라인 흐름: Flux의 데이터는 왼쪽에서 오른쪽으로 시간순으로 처리됩니다.
2. These dotted lines and this box indicate that a transformation is being applied to the Flux. The text inside the box shows the nature of the transformation
- 변환 과정: 점선과 박스는 Flux에 적용되는 변환 작업을 나타내며, 박스 안의 텍스트는 변환의 종류를 설명합니다.
💡 상단 흐름 (성공적인 Flux) : 여러개의 데이터를 반환하는 형태
1. These are the items emitted by the Flux
- Flux는 여러 개의 데이터 항목을 연속적으로 방출합니다.
2. This vertical line indicates that the Flux has completed successfully
- 정상 완료: 세로 선은 Flux가 성공적으로 완료되었음을 나타냅니다.
💡 하단 흐름(Flux 클래스 수행 처리 실패)
1. This Flux is the result of the transformation
- 변환 결과: 변환 작업 후의 Flux는 새로운 결과값들을 가집니다.
2. If for some reason the Flux terminates abnormally, with an error, the vertical line is replaced by an X
- 비정상 종료: Flux가 에러로 인해 비정상 종료되면, 세로 선 대신 X 표시로 표시됩니다. ( * 해당 부분이 발생하면 데이터 처리는 중단이 됩니다.)
[ 더 알아보기 ]
💡 This Flux is the result of the transformation의 의미가 정확히 뭘까?
- 변환 작업(transformation) 이후에 생성된 새로운 Flux 스트림을 의미합니다.
- 아래와 같은 예를 통해서, 원본 Flux의 각 요소를 2배로 변환한 후, 그 결과로 새로운 Flux가 생성됩니다. 이처럼 변환 작업 후에 생성되는 새로운 Flux 스트림을 "transformation의 결과"라고 표현합니다.
- 이는 리액티브 프로그래밍의 불변성(immutability) 원칙을 반영하는 것으로, 원본 Flux를 직접 수정하는 대신 새로운 Flux를 생성하는 방식으로 동작합니다.
Flux.just(1, 2, 3)
.map(n -> n * 2) // 변환 작업 수행
// 이 시점에서 새로운 Flux(2, 4, 6)가 생성됨
.subscribe(System.out::println);
2. Flux 인스턴스 생성
💡 Flux 인스턴스 생성
- Flux 클래스를 객체로 구성하기 위해 이를 초기값을 지정하는 과정이 인스턴스 생성입니다.
- ‘여러 개의 데이터 항목’을 처리할 수 있는 Flux 클래스의 인스턴스 생성 방법들에 대해 알아봅니다.
메서드 | 설명 | 예시 |
just() | 하나 이상의 항목으로 Flux 생성 | Flux.just("A", "B", "C") |
fromArray() | 배열로부터 Flux 생성 | Flux.fromArray(new String[] {"A", "B"}) |
fromIterable() | Iterable로부터 Flux 생성 | Flux.fromIterable(Arrays.asList("A", "B")) |
range() | 연속된 정수 시퀀스 생성 | Flux.range(1, 5) |
interval() | 주기적으로 증가하는 Long 값 생성 | Flux.interval(Duration.ofSeconds(1)) |
empty() | 비어있는 Flux 생성 | Flux.empty() |
error() | 에러를 발생시키는 Flux 생성 | Flux.error(new RuntimeException()) |
fromStream() | Stream으로부터 Flux 생성 | Flux.fromStream(Stream.of("A", "B")) |
generate() | 프로그래밍 방식으로 Flux 생성 | Flux.generate(sink -> sink.next("데이터")) |
create() | 프로그래밍 방식으로 복잡한 Flux 생성 | Flux.create(sink -> { /* 복잡한 로직 */ }) |
/**
* Flux 인스턴스 생성
*/
private void fluxInstanceGenerate() {
// 1. just() - 하나 이상의 항목으로 Flux 생성
Flux<String> justFlux = Flux.just("A", "B", "C");
// 2. fromArray() - 배열로부터 Flux 생성
String[] array = {"A", "B"};
Flux<String> arrayFlux = Flux.fromArray(array);
// 3. fromIterable() - Iterable로부터 Flux 생성
List<String> list = Arrays.asList("A", "B");
Flux<String> iterableFlux = Flux.fromIterable(list);
// 4. range() - 연속된 정수 시퀀스 생성
Flux<Integer> rangeFlux = Flux.range(1, 5); // 1부터 5까지
// 5. interval() - 주기적으로 증가하는 Long 값 생성
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));
// 6. empty() - 비어있는 Flux 생성
Flux<String> emptyFlux = Flux.empty();
// 7. error() - 에러를 발생시키는 Flux 생성
Flux<String> errorFlux = Flux.error(new RuntimeException("에러 발생"));
// 8. fromStream() - Stream으로부터 Flux 생성
Stream<String> stream = Stream.of("A", "B");
Flux<String> streamFlux = Flux.fromStream(stream);
// 9. generate() - 프로그래밍 방식으로 Flux 생성
Flux<String> generateFlux = Flux.generate(sink -> {
sink.next("데이터");
sink.complete();
});
// 10. create() - 프로그래밍 방식으로 복잡한 Flux 생성
Flux<String> createFlux = Flux.create(sink -> {
sink.next("데이터1");
sink.next("데이터2");
sink.complete();
});
}
💡 출력 결과
- 이전 Mono와 다르가 다중 데이터에 대해서 순차적으로 전달이 됨을 확인하였습니다.
3. Flux 정적 메서드 활용
💡 Flux 정적 메서드 활용
- Flux 클래스에서 제공하는 정적 메서드로 인스턴스 생성 없이 직접 호출이 가능합니다.
메서드 | 반환 타입 | 설명 |
concat() | Flux | 여러 Publisher를 순차적으로 연결하여 새로운 Flux를 생성합니다. |
merge() | Flux | 여러 Publisher를 병렬로 병합하여 새로운 Flux를 생성합니다. |
zip() | Flux | 여러 Publisher의 요소들을 조합하여 새로운 Flux를 생성합니다. |
combineLatest() | Flux | 여러 Publisher의 최신 요소들을 조합하여 새로운 Flux를 생성합니다. |
firstWithSignal() | Flux | 가장 먼저 신호를 발생시키는 Publisher의 요소를 포함하는 Flux를 생성합니다. |
package com.blog.springbootwebflux.component.flux;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
/**
* Flux 타입의 Static Method 사용 예시
*
* @author : jonghoon
* @fileName : FluxStaticMethodExam
* @since : 24. 12. 28.
*/
@Slf4j
@Component
public class FluxStaticMethodExam {
public void staticMethod() {
// concat() 예시 - 순차적 연결
Flux<String> flux1 = Flux.just("A", "B");
Flux<String> flux2 = Flux.just("C", "D");
Flux<String> concatenated = Flux.concat(flux1, flux2); // 결과: A, B, C, D
concatenated.subscribe(value -> log.debug("concat 값: {}", value));
// merge() 예시 - 병렬 병합
Flux<String> merged = Flux.merge(flux1, flux2); // 순서가 보장되지 않음
merged.subscribe(value -> log.debug("merge 값: {}", value));
// zip() 예시 - 요소 조합
Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<String> letters = Flux.just("A", "B", "C");
Flux<String> zipped = Flux.zip(numbers, letters,
(n, l) -> n + l); // 결과: "1A", "2B", "3C"
zipped.subscribe(value -> log.debug("zip 값: {}", value));
}
}
💡 출력 결과
- 위에 예시에 따르는 결과값이 출력됨을 확인하였습니다.
4. Flux 인스턴스 메서드 활용
💡 Flux 인스턴스 메서드 활용
- 객체의 인스턴스에 속하는 메서드로 인스턴스 생성 후 호출이 가능한 인스턴스 메서드입니다.
- 아래의 예시에서는 Flux.just()는 정적 메서드를 사용하여 Just 인스턴스를 생성하고, 그 후에 인스턴스 메서드인 map(), filter(), flatMap() 등 체이닝 하여 데이터를 변환하고 필터링합니다.
메서드 | 반환 타입 | 설명 |
map() | Flux | 각 요소를 변환하여 새로운 Flux를 생성합니다. |
filter() | Flux | 조건에 맞는 요소만 필터링합니다. |
flatMap() | Flux | 각 요소를 새로운 Flux로 변환하고 하나의 Flux로 병합합니다. |
distinct() | Flux | 중복된 요소를 제거합니다. |
take(n) | Flux | 처음 n개의 요소만 추출합니다. |
skip(n) | Flux | 처음 n개의 요소를 건너뜁니다. |
merge() | Flux | 여러 Flux를 하나로 병합합니다. |
concat() | Flux | 여러 Flux를 순차적으로 연결합니다. |
zip() | Flux | 여러 Flux의 요소들을 조합합니다. |
reduce() | Mono | 모든 요소를 결합하여 단일 결과를 생성합니다. |
collectList() | Mono | 모든 요소를 List로 수집합니다. |
retry() | Flux | 오류 발생 시 재시도합니다. |
timeout() | Flux | 지정된 시간 내에 요소가 생성되지 않으면 오류를 발생시킵니다. |
subscribe() | void | Flux의 데이터를 구독하고 처리합니다. |
package com.blog.springbootwebflux.component.flux;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
/**
* Flux Instance 메서드 사용예시
*
* @author : jonghoon
* @fileName : FluxInstanceMethodExam
* @since : 24. 12. 28.
*/
@Slf4j
@Component
public class FluxInstanceMethodExam {
public void instanceMethod() {
Flux<Integer> numbers = Flux.just(1, 2, 2, 3, 4, 5, 5);
numbers
// map(): 각 요소를 2배로 변환
.map(n -> n * 2)
// filter(): 짝수만 필터링
.filter(n -> n % 2 == 0)
// distinct(): 중복 제거
.distinct()
// take(): 처음 3개 요소만 선택
.take(3)
// flatMap(): 각 숫자를 문자열로 변환하고 분리
.flatMap(n -> Flux.fromArray(String.valueOf(n).split("")))
// collectList(): List로 변환
.collectList()
// subscribe(): 결과 구독 및 처리
.subscribe(
result -> System.out.println("결과: " + result),
error -> System.out.println("에러 발생: " + error),
() -> System.out.println("처리 완료")
);
numbers.subscribe(value -> log.debug("다양한 처리 결과값: {}", value));
// 여러 Flux 결합 예시
Flux<String> flux1 = Flux.just("A", "B");
Flux<String> flux2 = Flux.just("C", "D");
// merge(): 여러 Flux를 하나로 병합
Flux<String> fluxMerge = Flux.merge(flux1, flux2);
fluxMerge.subscribe(value -> log.debug("merge 값: {}", value));
// zip(): 여러 Flux 요소들을 조합
Flux<String> fluxZip = Flux.zip(flux1, flux2, (f1, f2) -> f1 + f2);
fluxZip.subscribe(value -> log.debug("zip 값: {}", value));
}
}
💡 출력 결과
- 순차적으로 데이터가 전달이 됨을 확인하였습니다.
4) Subscriber 데이터 처리 타입 : onSubscribe, onNext, onError, onComplete
💡 Subscriber 데이터 처리 타입 : onSubscribe, onNext, onError, onComplete
- 리액티브 스트림에서 데이터를 수신하고 처리하는 컴포넌트입니다. Subscriber는 다음 네 가지 주요 콜백 메서드를 통해 데이터 스트림을 처리합니다
- 이러한, 메서드들은 순차적으로 호출되며, onError나 onComplete는 스트림의 종료를 의미하므로 둘 중 하나만 호출될 수 있습니다.
- 데이터의 전달은 onSubscribe → onNext(data) → onComplete/onError 순으로 처리가 됩니다.
메서드 | 설명 |
onSubscribe(Subscription s) | 구독이 시작될 때 가장 먼저 호출되며, Subscription 객체를 통해 데이터 요청을 제어할 수 있습니다. |
onNext(T t) | Publisher로부터 새로운 데이터가 발행될 때마다 호출되어 실제 데이터 처리를 수행합니다. |
onError(Throwable t) | 스트림 처리 중 에러가 발생했을 때 호출되며, 에러 처리 로직을 구현합니다. |
onComplete() | 모든 데이터의 처리가 성공적으로 완료되었을 때 호출됩니다. |
💡 Subscriber 예시
- onSubscribe: 구독이 시작될 때 "Subscribed!" 메시지를 출력합니다.
- onNext: 1부터 5까지의 각 숫자가 도착할 때마다 "Received: [숫자]"를 출력합니다.
- onComplete: 모든 데이터 처리가 끝나면 "Completed!" 메시지를 출력합니다.
- onError: 에러가 발생했을 경우 에러 메시지를 출력합니다 (위 예시에서는 에러가 발생하지 않습니다).
package com.blog.springbootwebflux.component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
/**
* Please explain the class!!
*
* @author : jonghoon
* @fileName : SubscriberExam
* @since : 24. 12. 28.
*/
@Slf4j
@Component
public class SubscriberExam {
public void subsciberExam() {
// Publisher Flux 기반 데이터 생성
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
// Subscriber 데이터 구독
numbers.subscribe(
// onNext - 각 데이터 처리
data -> log.debug("Received: " + data),
// onError - 에러 처리
error -> log.debug("Error occurred: " + error),
// onComplete - 완료 처리
() -> log.info("Completed!"),
// onSubscribe - 구독 시작 시 처리
subscription -> {
log.debug("Subscribed!");
subscription.request(Long.MAX_VALUE);
}
);
}
}
💡사용 결과
- 아래와 같이 최초 onSubscribe → onNext - onComplete 순으로 수행이 됨을 확인하였습니다.
5) Webflux 에러 핸들러(Error Handler) : Mono, Flux
💡 Webflux 에러 핸들러(Error Handler) : Mono, Flux
- Spring WebFlux에서 처리과정 중 발생하는 에러 처리를 하는 방법입니다.
- Mono와 Flux에서 제공하는 다양한 에러 처리 메서드를 통해 예외 상황을 효과적으로 관리할 수 있습니다.
- 또한, 전역으로 에러 핸들러를 구성하여서 중앙에서 관리할 수 있습니다.
Handling Errors in Spring WebFlux | Baeldung
1. 기본적인 에러 처리 메서드
메서드 | 설명 |
onErrorReturn() | 에러 발생 시 대체값으로 기본값을 반환합니다. |
onErrorResume() | 에러 발생 시 대체 Publisher를 반환합니다. |
onErrorMap() | 발생한 에러를 다른 타입의 에러로 변환합니다. |
doOnError() | 에러 발생 시 부가적인 작업을 실행합니다. |
2. 에러 상세 메서드
메서드 | 리턴 타입 | 설명 |
onErrorReturn(Class<E> type, T fallbackValue) | Mono<T>/Flux<T> | 지정된 타입의 에러가 발생했을 때 대체값을 반환합니다. |
onErrorReturn(Predicate<Throwable> predicate, T fallbackValue) | Mono<T>/Flux<T> | 조건에 맞는 에러가 발생했을 때 대체값을 반환합니다. |
onErrorReturn(T fallbackValue) | Mono<T>/Flux<T> | 어떤 에러가 발생하든 대체값을 반환합니다. |
onErrorResume(Class<E> type, Function fallback) | Mono<T>/Flux<T> | 지정된 타입의 에러 발생 시 대체 Publisher를 구독합니다. |
onErrorResume(Function fallback) | Mono<T>/Flux<T> | 어떤 에러가 발생하든 대체 Publisher를 구독합니다. |
onErrorResume(Predicate predicate, Function fallback) | Mono<T>/Flux<T> | 조건에 맞는 에러 발생 시 대체 Publisher를 구독합니다. |
onErrorMap(Class<E> type, Function mapper) | Mono<T>/Flux<T> | 지정된 타입의 에러를 다른 에러로 변환합니다. |
onErrorMap(Function mapper) | Mono<T>/Flux<T> | 발생한 에러를 다른 에러로 변환합니다. |
onErrorMap(Predicate predicate, Function mapper) | Mono<T>/Flux<T> | 조건에 맞는 에러를 다른 에러로 변환합니다. |
doOnError(Class<E> type, Consumer onError) | Mono<T>/Flux<T> | 지정된 타입의 에러 발생 시 추가 동작을 실행합니다. |
doOnError(Consumer onError) | Mono<T>/Flux<T> | 에러 발생 시 추가 동작을 실행합니다. |
doOnError(Predicate predicate, Consumer onError) | Mono<T>/Flux<T> | 조건에 맞는 에러 발생 시 추가 동작을 실행합니다. |
💡 사용예시
1. onErrorReturn 예시 : 값 대체
- 예시에서는 Mono의 값이 빈값일 경우 IllegalArgumentException이 발생하며, 그렇지 않으면 “기본값”이라는 문자열을 반환(onErrorReturn)합니다.
2. onErrorReturn with Predicate 예시 : 특정 오류에 대한 값 대체
- 예시에서는 Mono의 값이 빈값일 경우 IllegalArgumentException이 발생하며, 특정 지정한 에러(IllegalArgumentException) 발생 시 “빈 문자열 에러 발생”라는 값으로 대체(onErrorReturn)합니다.
3. onErrorMap: 특정 오류에 대한 커스텀 에러로 대체
- 예시에서는 Mono의 값이 빈값일 경우 IllegalArgumentException이 발생하며, Custom으로 구성한 에러(CustomException)로 대체합니다.
4. onErrorResume : 특정 publisher 데이터 타입(Mono/Flux)로 대체
- 예시에서는 Mono의 값이 빈값일 경우 IllegalArgumentException이 발생하며, 에러 발생 시 “대체 데이터(onErrorResume)"라는 값으로 반환합니다.
5. doOnError: 에러 발생시 부가적인 작업(로깅 등..)을 수행함
- 예시에서는 Mono의 값이 빈값일 경우 IllegalArgumentException이 발생하며, 로깅과 같은 부가적인 작업을 수행(doOnError)하고 최종적으로 “기본값”이라는 문자열을 반환(onErrorReturn)합니다.
package com.blog.springbootwebflux.component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
/**
* Please explain the class!!
*
* @author : jonghoon
* @fileName : ErrorHandlerExam
* @since : 24. 12. 28.
*/
@Slf4j
@Component
public class ErrorHandlerExam {
public void errorHandler() {
// 1. onErrorReturn 예시
Mono<String> mono1 = Mono.just("data")
.map(str -> {
if (str.isEmpty()) {
throw new IllegalArgumentException("Empty string");
}
return str.toUpperCase();
})
.onErrorReturn("기본값");
mono1.subscribe(value -> log.debug("onErrorReturn 사용예시: {}", value));
// 2. onErrorReturn with Predicate 예시
Mono<String> mono2 = Mono.just("")
.map(str -> {
if (str.isEmpty()) {
throw new IllegalArgumentException("Empty string");
}
return str.toUpperCase();
})
.onErrorReturn(
e -> e instanceof IllegalArgumentException, // 조건 체크
"빈 문자열 에러 발생" // IllegalArgumentException 발생시에만 이 값 반환
);
mono2.subscribe(value -> log.debug("onErrorReturn 사용예시: {}", value));
// 3. onErrorMap 예시
Mono<Object> mono3 = Mono.just("data")
.map(str -> {
throw new RuntimeException("원본 에러");
})
.onErrorMap(e -> new CustomException("변환된 에러", e))
.onErrorReturn("CustomException 발생시 기본값"); // 에러 처리 추가
mono3.subscribe(value -> log.debug("onErrorReturn 사용예시: {}", value));
// 4. onErrorResume 예시
Mono<Object> mono4 = Mono.just("data")
.map(str -> {
throw new RuntimeException("에러 발생");
})
.onErrorResume(e -> {
// 에러 발생시 대체 Publisher 반환
return Mono.just("대체 데이터");
});
mono4.subscribe(value -> log.debug("onErrorResume 사용예시: {}", value));
// 5. doOnError 예시
Mono<Object> mono5 = Mono.just("data")
.map(str -> {
throw new RuntimeException("에러 발생");
})
.doOnError(e -> {
System.err.println("에러 발생: " + e.getMessage());
// 로깅이나 기타 부가 작업 수행
})
.onErrorReturn("기본값");
mono5.subscribe(value -> log.debug("doOnError 사용예시: {}", value));
}
}
💡 사용 결과
1. onErrorReturn
- 해당 값이 빈값이 아니기에 Upper Case로 변환된 “DATA” 값이 출력됨을 확인하였습니다.
2. onErrorReturn with Predicate 예시
- 해당 값이 빈값이기에 IllegalArgumentException이 발생하였고, onErrorReturn에서 IllegalArgumentException가 발생하면 대체되는 “빈 문자열 에러 발생”가 출력이 됨을 확인하였습니다.
3. onErrorMap 예시
- 해당 값이 빈값이면 커스텀 오류로 처리가 되나, 빈 문자열이 아니기에 대체가 되는 “CustomException 발생 시 기본값”으로 출력이 되었습니다.
4. onErrorResume 예시
- 이를 수행하면 RuntimeException 오류가 발생하며, 오류가 발생시 대체가 되는 Publisher 타입인 Mono 객체를 반환을 함을 확인하였습니다.
5. doOnError 예시
- 이를 수행하면 RuntimeException 오류가 발생하며, 오류 발생 이후 doOnError()가 수행되어서 “에러 발생:”이 출력이 되며, 또한 "기본값"으로 리턴이 되는 것을 확인하였습니다.
💡 [참고] Mono, Flux의 에러처리를 위한 메서드에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
3. 고급 에러 처리 기법
💡 고급 에러 처리 기법
- 기본적인 에러 처리 방법 외에도 더 복잡한 시나리오를 처리할 수 있는 고급 에러 처리 기법을 제공합니다.
- 이러한 기법들은 재시도 로직, 타임아웃 처리, 백프레셔 조절 등을 포함하며, 실제 프로덕션 환경에서 발생할 수 있는 다양한 에러 상황을 효과적으로 관리할 수 있게 해줍니다.
함수 | 설명 |
retry() | 오류 발생 시 지정된 횟수만큼 재시도하는 함수입니다. |
timeout() | 특정 시간 내 응답이 없을 경우 TimeoutException 발생하는 함수입니다. |
package com.blog.springbootwebflux.component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
* Please explain the class!!
*
* @author : jonghoon
* @fileName : ErrorHandlerExam
* @since : 24. 12. 28.
*/
@Slf4j
@Component
public class ErrorHandlerExam {
/**
* retry(), timeout()을 통해 고급 에러 처리 방법
*/
public void advancedErrorHandler() {
// retry() 사용예시
Flux<Integer> fluxRetry = Flux.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error");
return i;
})
.retry(3) // 최대 3번 재시도
.onErrorReturn(
e -> e instanceof RuntimeException, // 조건 체크
0
);
fluxRetry.subscribe(value -> log.debug("처리 결과 :: {}", value));
// timeout() 사용예시
Mono<String> monoStr = Mono.just("data")
.delayElement(Duration.ofSeconds(2))
.timeout(Duration.ofSeconds(1))
.onErrorReturn("Timeout occurred");
monoStr.subscribe(value -> log.debug("처리 결과 :: {}", value));
}
}
💡 처리 결과
1. retry() 사용예시
- 순차적으로 1, 2, 3으로 구성이 된 경우에서 총 3번을 수행하도록 retry(3)을 수행합니다.
- 그렇기에, 순차적인 첫 번째 시도로 1의 값을 출력하고, 3번 수행하여 2를 넘기지 못하고 1을 3번 출력하여 총 4번 출력을 하며, RuntimeException이 발생하였기에 0이라는 값을 출력합니다.
2. timeout() 사용예시
- delayElement()를 통해서 2초간 지연을 시킵니다.
- timeout(Duration.ofSeconds(1))으로 1초 이내에 응답이 오지 않으면 TimeoutException이 발생하도록 설정합니다.
- onErrorReturn("Timeout occurred")를 통해 TimeoutException이 발생하면 "Timeout occurred"라는 문자열을 반환합니다.
- 결과적으로 2초 지연이 1초 timeout 설정보다 길어서 TimeoutException이 발생하고, "Timeout occurred"가 출력됩니다.
4. 전역 에러 핸들러 설정
💡 전역 에러 핸들러 설정
- 애플리케이션 전체에서 발생하는 예외를 중앙에서 처리하는 메커니즘입니다. Spring WebFlux에서는 WebExceptionHandler를 구현하여 전역 에러 처리를 설정할 수 있습니다.
- 애플리케이션 전체에 적용되는 일관된 에러 처리 로직을 구현할 수 있습니다.
- 다양한 예외 타입에 대해 서로 다른 처리 방식을 정의할 수 있습니다.
- 사용자에게 일관된 에러 응답 형식을 제공할 수 있습니다. 로깅, 모니터링 등의 부가 기능을 통합할 수 있습니다.
4.1. GlobalErrorWebExceptionHandler
💡GlobalErrorWebExceptionHandler
- 클라이언트에서 API 요청을 수행하는 경우 라우팅 되는 과정에서 전역 오류를 관리하는 Handler입니다.
- 애플리케이션에서 에러가 발생하면, 이 핸들러가 @Order(-2)로 높은 우선순위를 가지고 에러를 캐치합니다.
- 주요 기능은 모든 요청에 대한 에러 처리를 담당하는 역할을 수행하며, 에러 발생 시 BAD_REQUEST(400) 상태코드 반환하며 JSON 형식으로 에러 정보를 응답합니다.
1. getRoutingFunction()
- 메서드에서 모든 요청(RequestPredicates.all())에 대해 renderErrorResponse 메서드를 라우팅 하도록 설정되어있습니다.
2. renderErrorResponse()
- 에러 속성을 가져와서 BAD_REQUEST 상태와 함께 JSON 형식으로 응답을 생성합니다.
package com.blog.springbootwebflux.config;
import org.springframework.boot.autoconfigure.web.WebProperties;
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.*;
import reactor.core.publisher.Mono;
import java.util.Map;
/**
* 전역 에러 핸들러
*
* @author : jonghoon
* @fileName : GlobalErrorWebExceptionHandler
* @since : 24. 12. 28.
*/
@Component
@Order(-2)
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
public GlobalErrorWebExceptionHandler(ErrorAttributes errorAttributes, WebProperties webProperties, ApplicationContext applicationContext, ServerCodecConfigurer serverCodecConfigurer) {
super(errorAttributes, webProperties.getResources(), applicationContext);
this.setMessageWriters(serverCodecConfigurer.getWriters());
}
/**
* 모든 요청에 대해 renderErrorResponse 메서드를 라우팅합니다.
*
* @param errorAttributes ErrorAttributes
* @return RouterFunction<ServerResponse>
*/
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
}
/**
* 에러 속성을 가져와서 BAD_REQUEST 상태와 함께 JSON 형식으로 응답을 생성합니다.
*
* @param request ServerRequest
* @return Mono<ServerResponse>
*/
private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
Map<String, Object> errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults());
return ServerResponse
.status(HttpStatus.BAD_REQUEST)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(errorPropertiesMap));
}
}
4.2. GlobalErrorAttributes
💡GlobalErrorAttributes
- Spring WebFlux에서 전역 에러 속성을 커스터마이징 하는 클래스입니다.
- 애플리케이션에서 발생하는 에러에 대해 일관된 형식의 응답을 제공하며, 클라이언트에게 더 명확한 에러 정보를 전달할 수 있게 해 줍니다.
1. getErrorAttributes()
- 사용자 정의의 에러 속성을 설정합니다. 에러 사항에 따라서 각각 다른 메시지를 출력하도록 구성하였습니다. 예를 들어서 400, 404, 500 에러가 발생하는 경우 각각에 따른 처리 과정을 보여주고 있습니다.
package com.blog.springbootwebflux.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.DefaultErrorAttributes;
import org.springframework.data.crossstore.ChangeSetPersister;
import org.springframework.http.HttpStatus;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.resource.NoResourceFoundException;
import org.springframework.web.server.ResponseStatusException;
import java.util.HashMap;
import java.util.Map;
/**
* 전역 에러 속성을 커스터마이징한 클래스입니다.
*
* @author : jonghoon
* @fileName : GlobalErrorAttributes
* @since : 24. 12. 28.
*/
@Slf4j
public class GlobalErrorAttributes extends DefaultErrorAttributes {
@Override
public Map<String, Object> getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) {
Throwable error = getError(request);
Map<String, Object> map = new HashMap<>();
if (error instanceof NoResourceFoundException) {
map.put("status", HttpStatus.NOT_FOUND);
map.put("errorCode", 404);
map.put("message", "요청한 리소스를 찾을 수 없습니다.");
} else if (error instanceof ResponseStatusException) {
map.put("status", HttpStatus.BAD_REQUEST);
map.put("errorCode", 400);
map.put("message", "잘못된 요청입니다.");
} else {
map.put("status", HttpStatus.INTERNAL_SERVER_ERROR);
map.put("errorCode", 500);
map.put("message", "서버 내부 오류가 발생했습니다.");
}
return map;
}
}
4.3. GlobalExceptionConfig
💡 GlobalExceptionConfig
1. errorAttributes()
- GlobalErrorAttributes을 등록합니다
2. resources()
- 에러 핸들러에 필요한 리소스를 제공합니다.
3. serverCodecConfigurer()
- GlobalErrorWebExceptionHandler를 위한 Bean 추가합니다.
package com.blog.springbootwebflux.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.web.WebProperties;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.ServerCodecConfigurer;
/**
* 전역으로 발생하는 Exception을 등록하는 환경설정 파일
*
* @author : jonghoon
* @fileName : GlobalExceptionConfig
* @since : 24. 12. 28.
*/
@Slf4j
@Configuration
public class GlobalExceptionConfig {
/**
* GlobalErrorAttributes 을 등록합니다.
*
* @return ErrorAttributes
*/
@Bean
public ErrorAttributes errorAttributes() {
return new GlobalErrorAttributes();
}
/**
* 에러 핸들러에 필요한 리소스를 제공합니다.
*
* @return WebProperties.Resources
*/
@Bean
public WebProperties.Resources resources() {
return new WebProperties.Resources();
}
/**
* GlobalErrorWebExceptionHandler를 위한 Bean 추가
*
* @return
*/
@Bean
public ServerCodecConfigurer serverCodecConfigurer() {
return ServerCodecConfigurer.create();
}
}
4.4. 사용결과
💡 사용 결과 : 404 에러
- 잘못된 리소스를 호출한 경우 Error인 경우 커스텀화 된 응답 코드를 반환하는 것을 확인하였습니다.
💡 사용 결과 : 400 에러
- 사용자 등록을 할 때 JSON 데이터를 포함하지 않은 경우, 아래와 같이 커스텀화된 응답 코드를 반환하는 것을 확인할 수 있습니다
6) 백프레셔(Backpressure) 제어 함수
💡 백프레셔(Backpressure) 제어 함수
- 데이터 스트림에서 Publisher와 Subscriber 간의 데이터 처리 속도 차이를 조절하는 중요한 메커니즘입니다.
- 데이터 흐름 제어: Subscriber가 처리할 수 있는 속도에 맞춰 데이터를 요청하고 수신합니다.
- 메모리 관리: 과도한 데이터 적재를 방지하여 시스템의 안정성을 유지합니다.
- 성능 최적화: 시스템 리소스를 효율적으로 사용하여 전체적인 성능을 향상합니다.
💡 백프레셔(Backpressure)
- 데이터 스트림에서 데이터 발행자(Publisher)와 구독자(Subscriber) 사이의 데이터 처리 속도 차이를 조절하기 위한 메커니즘입니다.
- 아래의 예시와 같이 데이터 발행자(Publisher)가 데이터를 데이터 구독자(Subscriber)에게 전달(emit)을 하는 경우 발행자의 데이터 속도가 구독자 속도보다 빠를 때 발생하는 문제에 대해서 백프레셔가 이를 속도를 조절하여 처리합니다.
- 이는 구독자가 처리할 수 있는 만큼의 데이터만 요청하여 시스템 과부하를 방지하며 메모리의 사용량을 효율적으로 관리하고 시스템의 안정성을 보장합니다.
- 아래의 예시에서는 데이터 구독자(Subsciber)는 데이터 발행자(Publisher)를 구독(Subscribe)하고 있습니다.
- 이때 데이터 전달(emit)을 통해서 데이터 1, 데이터 2, 데이터 3, 데이터 4, 데이터 N 형태로 데이터를 전달을 받는데, 데이터 구독자(Subsciber) 보다 많은 데이터가 전달이 되는 경우 시스템 과부하가 발생할 수 있습니다.
- 이에 따라서 백프레셔에서 이를 관리합니다.
1. 백프레셔 제어 함수
백프레셔(Backpressure) 제어 함수 | 설명 |
request(n) | Subscriber가 Publisher로부터 받을 데이터의 개수를 지정합니다. |
onBackpressureBuffer() | 처리되지 않은 데이터를 버퍼에 저장합니다. |
onBackpressureDrop() | 처리할 수 없는 데이터를 버립니다. |
onBackpressureLatest() | 가장 최근의 데이터만 유지하고 나머지는 버립니다. |
onBackpressureError() | 백프레셔가 발생했을 때 오류를 발생시켜 처리합니다. |
limitRate(n) | Publisher가 한 번에 전송할 수 있는 요소의 수를 제한합니다. |
sample(Duration) | 지정된 시간 간격으로 가장 최근 값만 전달받습니다. |
2. 백프레셔 사용예시
💡 사용 예시
1. request(n) 예시
- 구독자가 한 번에 처리할 데이터의 개수를 지정합니다. 예시에서는 100개의 데이터 중 10개만 요청하여 처리합니다.
2. onBackpressureBuffer() 예시
- 처리되지 않은 데이터를 버퍼에 임시 저장합니다. 예시에서는 최대 5개까지 버퍼링 하도록 설정되어 있습니다.
3. onBackpressureDrop() 예시
- 구독자가 처리할 수 없는 데이터는 폐기합니다. 처리되지 못한 데이터는 로그로 기록됩니다.
4. onBackpressureLatest() 예시
- 가장 최근의 데이터만 유지하고 나머지는 폐기합니다.
5. limitRate(n) 예시
- Publisher가 한 번에 전송할 수 있는 데이터 개수를 제한합니다. 예시에서는 5개씩 처리하도록 설정되어 있습니다.
6. sample(Duration) 예시
- 지정된 시간 간격으로 데이터를 샘플링합니다. 예시에서는 1초 간격으로 데이터를 샘플링합니다.
package com.blog.springbootwebflux.component;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import org.springframework.stereotype.Component;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import java.time.Duration;
/**
* 백프레셔를 관리하는 메서드를 사용한 예시
*
* @author : jonghoon
* @fileName : BackpressureExam
* @since : 24. 12. 28.
*/
@Slf4j
@Component
public class BackpressureExam {
public void backpressureMethod() {
// 백프레셔 제어 함수 예시 코드
// 1. request(n) 예시 - 10개의 데이터만 요청
Flux.range(1, 100)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10);
}
@Override
protected void hookOnNext(Integer value) {
log.debug("Received: " + value);
}
});
// 2. onBackpressureBuffer() 예시 - 최대 5개까지 버퍼링
Flux.interval(Duration.ofMillis(1))
.onBackpressureBuffer(5)
.subscribe(value -> {
log.debug("Buffered value: " + value);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 3. onBackpressureDrop() 예시 - 처리할 수 없는 데이터는 드랍
Flux.interval(Duration.ofMillis(1))
.onBackpressureDrop(dropped ->
log.debug("Dropped value: " + dropped))
.subscribe(value -> {
log.debug("Processing: " + value);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 4. onBackpressureLatest() 예시 - 최신 데이터만 유지
Flux.interval(Duration.ofMillis(1))
.onBackpressureLatest()
.subscribe(value -> {
log.debug("Latest value: " + value);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 5. limitRate(n) 예시 - 한 번에 5개씩만 처리
Flux.range(1, 20)
.limitRate(5)
.subscribe(value ->
log.debug("Limited rate value: " + value));
// 6. sample() 예시 - 1초 간격으로 샘플링
Flux.interval(Duration.ofMillis(100))
.sample(Duration.ofSeconds(1))
.subscribe(value ->
log.debug("Sampled value: " + value));
// 7. sample() 예시 - 2초 동안 첫 번째 데이터만 처리
Flux.interval(Duration.ofMillis(500))
.sample(Duration.ofSeconds(2)) // throttleFirst와 비슷한 효과를 내기 위해 sample 사용
.subscribe(value ->
System.out.println("First value in 2 seconds: " + value));
// 8. sample() 예시 - 2초 동안 마지막 데이터만 처리
Flux.interval(Duration.ofMillis(500))
.sample(Duration.ofSeconds(2)) // throttleLast와 동일한 효과
.subscribe(value ->
System.out.println("Last value in 2 seconds: " + value));
}
}
💡 1. request(n) 사용 예시
- 1 ~ 100까지의 스트림 중에서 총 10개만 출력하도록 지정하여 콘솔상에 출력이 됩니다.
💡 2. onBackpressureBuffer() 예시
- Duration.ofMillis(1)를 통해서 1밀리 초마다 데이터를 발행을 하며, onBackpressureBuffer(5)를 통해서 버퍼를 수행합니다.
- 즉, 1밀리초마다 데이터를 발행하지만 Subscriber가 처리를 제대로 하지 못하는 경우 데이터는 버퍼(임시보관소)에 최대 5개까지 보관이 됩니다.
💡 3. onBackpressureDrop() 예시
- 구독자가 처리할 수 없는 데이터는 폐기합니다. 처리되지 못한 데이터는 로그로 기록됩니다.
- Duration.ofMillis(1)를 통해서 1밀리초마다 데이터를 발행을 하며, 처리되는 데이터의 경우 Processing으로 출력이 되고, 처리되지 못한 경우 폐기하는 데이터의 경우는 Dropped value로 데이터가 출력이 됩니다.
💡 4. onBackpressureLatest() 예시
- 가장 최근의 데이터만 유지하고 나머지는 폐기합니다.
- Duration.ofMillis(1)를 통해서 1밀리초마다 데이터를 발행을 하며, 가장 최신데이터의 경우 Lastet Value 값으로 출력이 됩니다.
💡 5. limitRate(n) 예시
- Publisher가 한 번에 전송할 수 있는 데이터 개수를 제한합니다. 예시에서는 5개씩 처리하도록 설정되어 있습니다.
- 총 범위 1에서부터 20까지 5개씩 처리하도록 설정이 되어서 콘솔상에 출력이 됩니다.
💡 6. sample(Duration)
- 지정된 시간 간격으로 데이터를 샘플링합니다. 예시에서는 1초 간격으로 데이터를 샘플링합니다.
- 샘플링은 연속적으로 발생하는 데이터 스트림에서 특정 시간 간격으로 데이터를 추출하는 과정을 의미합니다.
- 예를 들어, 1초마다 발생하는 센서 데이터에서 5초 간격으로만 데이터를 수집하고 싶을 때 사용할 수 있습니다.
- 해당 예시에서는 100밀리 초마다 데이터가 생성이 되지만 sample(Duration.ofSeconds(1))을 통해 1초 간격으로만 데이터를 수집하도록 설정되어 있습니다.
💡 7. sample(Duration)
- 2초 동안 첫 번째 데이터만 처리하도록 수행이 됩니다.
💡 8. sample(Duration)
- 2초 동안 마지막 데이터만 처리하도록 수행이 됩니다.
오늘도 감사합니다. 😀
그리드형