Java/WebFlux

[Java] Spring Boot WebFlux 활용하여 구현하기 -2: 계층구조 및 활용예시

adjh54 2024. 12. 28. 12:00
728x170
해당 글에서는 반응형 프로그래밍과 이를 구현하기 위한 Spring WebFlux의 계층 구조에 대해 이해하고 구현하는 예시에 대한 이해를 돕기 위해 작성한 글입니다.

 



 

 

💡 [참고] Spring Boot WebFlux에 대해 추가적으로 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
분류 링크
Spring Boot Webflux 이해하기 -1 : 흐름 및 주요 특징 이해 https://adjh54.tistory.com/232
Spring Boot Webflux 이해하기 -2 : 활용하기 https://adjh54.tistory.com/233
Spring Boot WebFlux 이해하고 구현하기 -1 : 반응형 프로그래밍에서 WebFlux까지 흐름 https://adjh54.tistory.com/627
Spring Boot WebFlux 활용하여 구현하기 -2: 계층구조 및 활용예시 https://adjh54.tistory.com/628
Spring Boot WebFlux 활용하여 구현하기 -3: Publisher/Subscriber 데이터 처리 타입, 에러 핸들러, 백프레셔 https://adjh54.tistory.com/629
Spring Boot WebFlux 활용 Github https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-webflux

 

 

1) Spring Webflux 계층 구조


💡 Spring Webflux 계층구조

- 반응형 프로그래밍을 지원하기 위해 설계된 웹 프레임워크로, 효율적인 비동기 처리를 위해 계층화된 구조를 가지고 있습니다.
- 각 계층은 특정 책임을 가지고 있으며, 이들이 유기적으로 연결되어 전체 애플리케이션을 구성합니다.

 

 

1. 계층 구조 특징


💡 계층 구조 특징

- Spring Webflux에서 구성된 각각 계층에 대한 주요한 특징과 처리되는 과정에 대해 알아봅니다.
계층(Layer) 주요 특징
표현 계층(Presentation Layer) - 클라이언트와 직접적인 통신 처리를 수행하며 HTTP 요청을 받아 적절한 비즈니스 로직으로 전달하는 역할을 수행합니다.
- 함수형 방식의 경우는 Router Functions와 Handler Functions이 위치하며 주석 기반 방식(annotation-based)에서는 Controller가 해당 계층에 위치합니다.
비즈니스 계층(Service Layer) - 표현 계층으로부터 호출 되어서 실제 비즈니스 로직을 처리하는 역할을 수행합니다.
- 해당 부분에서 트랜잭션이 관리되며, Reactive Stream을 활용하여 비동기 처리가 됩니다.
영속성 계층(Persistence Layer) - 관계형 데이터베이스(R2DBC)나 Reactive Mongo등을 통해서 데이터베이스에 접근하여 데이터를 조작합니다.
도메인 계층(Domain Layer) - 데이터베이스의 테이블과 매핑되는 객체를 정의합니다.
- Reactive 스트림과 호환되는 도메인 모델 구현

 

 

 

2. 계층 구조 처리흐름


💡 계층 구조 처리흐름

- 해당 계층 구조에서 클라이언트의 요청에 따라서 반응형 API 내에서 처리되는 순차적인 계층별 확인을 합니다.

1. Client → 표현 계층(Presentation Layer)

- 클라이언트가 데이터 처리를 위해서 표현 계층 엔드포인트로 데이터를 포함하여 요청을 수행합니다.
- 표현 계층 내에서는 요청받은 값(requestBody, PathVariable, requestParam, requestHeader..)에 따라서 데이터를 받아서 비즈니스 로직 처리 이후 값을 반환합니다.


2. 표현 계층(Presentation Layer) → 비즈니스 계층(Service Layer)

- 표현 계층 내에서 전달받은 데이터를 비즈니스 계층으로 전달하여 비즈니스 로직 처리를 수행합니다.
- 비즈니스 로직 처리 중 데이터 처리에 대한 부분은 영속성 계층을 호출하여 처리합니다.


3. 비즈니스 계층(Service Layer) → 영속성 계층(Persistence Layer)

- 비즈니스 계층에서 호출된 영속성 계층의 경우는 관계형 데이터베이스에 접근하는 R2DBC 드라이버를 사용하거나 Reactive Mongo를 통해서 관계형 데이터베이스에 접근하거나 MongoDB에 접근하여 비동기 처리를 수행합니다.


4. 영속성 계층(Persistence Layer) → 도메인 계층(Domain Layer)

- 영속성 계층에서 데이터 처리를 할 때, 테이블과 매핑된 도메인 계층의 Entity를 통해서 데이터를 관리합니다.
- 이러한 과정을 수행하고 최종 처리된 결과를 클라이언트에게 반환하는 구조입니다.

 

 

2) 표현 계층(Presentation Layer) : 어노테이션 기반 방식(annotation-based)


💡 표현 계층(Presentation Layer) : 어노테이션 기반 방식(annotation-based)

- Spring WebFlux 계층 중 표현 계층(Presentation Layer)을 구현하는 방법 중 어노테이션 기반 방식(annotation-based)에 대해 알아봅니다.

 

 

1. 어노테이션 기반 방식(annotation-based)


💡 어노테이션 기반 방식(annotation-based)

- 전통적인 Spring MVC와 유사한 어노테이션 기반 방식입니다. @RestController, @GetMapping 등의 어노테이션을 사용하여 직관적으로 API를 정의합니다.
- @RestController임을 클래스에 선언하여 웹 서비스의 Restful 형태의 API 임을 정의하고 @RequestMapping을 통해서 컨트롤러의 공통 어노테이션을 정의합니다.
- 또한, HTTP Method인 GET, POST, PATCH, PUT, DELETE와 매핑된 @GetMapping, @PostMapping을 통해서, HTTP Method와 메서드별 상세 엔드포인트를 지정합니다.

// RESTful API 방식
@RestController
public class UserController {
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userService.getUser(id);
    }
}

 

 

2. 어노테이션 기반 방식(annotation-based)의 주요 어노테이션


💡 어노테이션 기반 방식(annotation-based)의 주요 어노테이션

- 주요 어노테이션으로는 HTTP 요청 매핑 관련 어노테이션, 요청 데이터 바인딩 관련 어노테이션, 컨트롤러 정의 관련 어노테이션 등을 이용하여서 표현 계층을 구현합니다.
어노테이션 분류 설명 예시
@RestController 컨트롤러 정의 RESTful 웹 서비스의 컨트롤러를 정의하며, 응답을 JSON/XML 형태로 반환 @RestController
public class UserController {}
@Controller 컨트롤러 정의 전통적인 Spring MVC 컨트롤러를 정의하며, 주로 뷰를 반환 @Controller
public class ViewController {}
       
@RequestMapping HTTP 요청 매핑 모든 HTTP 메소드에 대한 일반적인 요청 매핑 @RequestMapping("/api")
public class ApiController {}
@GetMapping HTTP 요청 매핑 HTTP GET 요청을 처리하는 메서드 매핑 @GetMapping("/users")
public Mono<User> getUser() {}
@PostMapping HTTP 요청 매핑 HTTP POST 요청을 처리하는 메서드 매핑 @PostMapping("/users")
public Mono<User> createUser() {}
@PutMapping HTTP 요청 매핑 HTTP PUT 요청을 처리하는 메서드 매핑 @PutMapping("/users/{id}")
public Mono<User> updateUser() {}
@PatchMapping HTTP 요청 매핑 HTTP PATCH 요청을 처리하는 메서드 매핑 @PatchMapping("/users/{id}")
public Mono<User> updateUser() {}
@DeleteMapping HTTP 요청 매핑 HTTP DELETE 요청을 처리하는 메서드 매핑 @DeleteMapping("/users/{id}")
public Mono<Void> deleteUser() {}
       
@RequestBody 요청 데이터 바인딩 HTTP 요청 본문을 자바 객체로 변환 createUser(@RequestBody User user)
@PathVariable 요청 데이터 바인딩 URL 경로 변수를 메서드 파라미터로 바인딩 getUser(@PathVariable String id)
@RequestParam 요청 데이터 바인딩 요청 파라미터를 메서드 파라미터로 바인딩 getUser(@RequestParam String name)

 

 

Mapping Requests :: Spring Framework

You can map requests by using glob patterns and wildcards: Pattern Description Example ? Matches one character "/pages/t?st.html" matches "/pages/test.html" and "/pages/t3st.html" * Matches zero or more characters within a path segment "/resources/*.png" m

docs.spring.io

 

💡 [참고] Spring Web Annotation에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
 

[Java] Spring Web Annotation 이해하고 사용하기 -2 : 요청 및 응답

해당 글에서는 Spring Web에서 사용되는 주요 어노테이션 중 요청/응답과 관련된 어노테이션의 종류에 대해 상세히 알아봅니다. 💡 [참고] 이전에 작성한 Spring Web Annotation '환경구성' 글에서 이어

adjh54.tistory.com

 

💡 [참고] RESTful (@RestController)에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
 

[Java] RESTful API 설계 방법 -1 : 이해하기

해당 글에서는 Restful API에 대해서 이해하며 이를 통해 설계를 하는 방법에 대해서 이해하기 위해 작성한 글입니다. 1) REST / RESTful API 💡 REST(Representational State Transfer)란? - 웹 애플리케이션을 개발

adjh54.tistory.com

 

 

 

3) 표현 계층(Presentation Layer) : 함수형 방식(functional)


💡 표현 계층(Presentation Layer) : 함수형 방식(functional)

- Spring WebFlux 계층(Layer) 중 표현 계층(Presentation Layer)을 구현하는 방법 중 함수형 방식(functional)에 대해 알아봅니다.
구분 Router Function Handler Function
정의 HTTP 요청을 적절한 HandlerFunction에 매핑하는 라우팅 함수 ServerRequest를 받아 Mono<ServerResponse>를 반환하는 함수형 인터페이스
역할 URL 패턴과 HTTP 메소드를 기반으로 요청을 적절한 핸들러에 연결 실제 요청을 처리하고 응답을 생성
사용 위치 주로 @Configuration 클래스 내에서 @Bean으로 정의 @Component로 정의된 별도 클래스에서 구현
처리 흐름 Client Request → Router → Handler 순서로 요청을 전달 비즈니스 로직을 호출하고 응답을 생성하여 반환

 

[ 더 알아보기 ]

💡 Handler Function에서는 왜 Mono<ServerResponse> 값만 반환하는가? Flux<ServiceResponse>는 처리가 안되는가?


- Handler Function은 HTTP 요청을 처리하고 단일 HTTP 응답을 생성하는 것이 주요 목적입니다. 따라서 항상 하나의 ServerResponse를 반환해야 하며, 이것이 Mono로 래핑 되는 이유입니다.
- 즉, Flux는 여러 개의 요소를 비동기적으로 처리하는 데 사용되지만, HTTP 응답은 본질적으로 단일 응답이어야 하므로 Mono를 사용합니다. 만약 여러 아이템을 반환해야 한다면, 그 아이템들을 컬렉션이나 배열로 포함하는 단일 ServerResponse를 생성하면 됩니다.

 

 

1. 함수형 방식(functional)


💡 함수형 방식(functional)

- 함수형 프로그래밍 스타일을 따르는 방식으로, RouterFunction과 HandlerFunction을 사용하여 라우팅을 정의합니다
-
RouterFunction의 경우는 기존 Restful(@RestController)에서와 같이 엔드포인트를 관리하고, 처리를 Handler로 전달합니다.
- HandlerFunction의 경우는 엔드포인트로 전달받은 데이터(ServerRequest)를 기반으로 요청 데이터 처리를 하고 응답을 생성합니다.

 

Functional Endpoints :: Spring Framework

Spring WebFlux includes WebFlux.fn, a lightweight functional programming model in which functions are used to route and handle requests and contracts are designed for immutability. It is an alternative to the annotation-based programming model but otherwis

docs.spring.io

 

 

💡 [참고] Router / Handler Function 사용예시

- Router Function에서는 클라이언트의 요청에 대한 엔드포인트에 대해서만 정의하고 처리는 Handler Function에게 이관합니다.
- Handler Function에서는 클라이언트의 요청 데이터나 반환 데이터를 서비스로 이관하여 비즈니스 로직을 처리하고 반환 값을 구성하는 역할을 수행합니다.
// Router Function 방식
@Configuration
public class RouterConfig {
    @Bean
    public RouterFunction<ServerResponse> route(UserHandler userHandler) {
        return RouterFunctions
            .route(GET("/users/{id}"), userHandler::getUser)
            .andRoute(POST("/users"), userHandler::createUser);
    }
}

// Handler 구현
@Component
public class UserHandler {
    private final UserService userService;

    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    public Mono<ServerResponse> getUser(ServerRequest request) {
        String userId = request.pathVariable("id");
        return userService.findById(userId)
            .flatMap(user -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(User.class)
            .flatMap(user -> userService.createUser(user))
            .flatMap(savedUser -> ServerResponse.created(URI.create("/users/" + savedUser.getId()))
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(savedUser));
    }
}

 

 

 

[더 알아보기]

💡 반환 값을 Mono, Flux를 받는다면 비동기 처리가 되는 걸까?


- 아닙니다. WebFlux를 제대로 활용하기 위해서는 모든 컴포넌트가 리액티브여야 최대의 성능을 발휘할 수 있습니다.
- 이를 최대한 활용하기 위해서는 비동기-논블로킹 방식으로 동작해야 하며, JDBC와 같은 블로킹 작업은 피해야 하고 전체 리액티브 스트림에 대한 이해가 필요합니다.
- 결론적으로, 단순히 컨트롤러의 반환 타입만 Mono나 Flux로 바꾸는 것은 진정한 리액티브 프로그래밍이 아닙니다. 데이터베이스 접근, 외부 서비스 호출 등 모든 계층이 리액티브 하게 구현되어야 합니다.

 

 

2. 함수형 방식(functional) : Router Function


💡 함수형 방식(functional) : Router Function

- 클라이언트의 요청을 처리하고 응답을 반환하는 첫 번째 계층입니다.
- Router Function은 함수형 엔드포인트를 정의하는 방식으로, HTTP 요청을 적절한 핸들러로 라우팅 하는 역할을 합니다.
- Router를 구성할 때, 단일 Router Function으로 구성을 하거나 중첩 Router Function으로 구성할 수 있습니다.
 

Functional Endpoints :: Spring Framework

Spring WebFlux includes WebFlux.fn, a lightweight functional programming model in which functions are used to route and handle requests and contracts are designed for immutability. It is an alternative to the annotation-based programming model but otherwis

docs.spring.io

 

3.1. 일반 Router


💡 일반 Router

- 단순히 각각의 엔드포인트를 순차적으로 정의하는 방식입니다.
- 아래의 예시코드처럼 [GET] /users, [GET] /users/{id}, [POST] /users 형태로 엔드포인트가 구성이 되어 있습니다.
@Configuration
public class RouterConfig {
    @Bean
    public RouterFunction<ServerResponse> route(UserHandler userHandler) {
        return RouterFunctions
            .route(GET("/users"), userHandler::getAllUsers)
            .andRoute(GET("/users/{id}"), userHandler::getUser)
            .andRoute(POST("/users"), userHandler::createUser);
    }
}

 

 

3.2. Nested Router


💡 Nested Router

- 연관된 엔드포인트들을 그룹화하여 계층적으로 구성하는 방식입니다. 코드가 더 구조화되고 관리하기 쉬워집니다.
- 아래 예시에서는 path()를 사용하여 그룹화합니다.
- 그룹화 과정을 통해 엔드포인트들을 논리적으로 그룹화하고 코드의 가독성과 유지보수를 향상합니다. 또한 공통 경로나 미들웨어를 쉽게 적용할 수 있습니다
- 아래의 예시코드처럼 [GET] /api/v1/user/user/{userId}, [GET] /api/v1/user/user/, [GET] /api/v1/code/code/{codeId} 형태로 엔드포인트가 구성이 되어 있습니다.
@Configuration
public class RouterConfig {
    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler userHandler, CodeHandler codeHandler) {
        return RouterFunctions
                .route()
                .path("/api/v1", builder -> builder
                        .path("/user", userBuilder -> userBuilder
                                .GET("/user/{userId}", userHandler::findTbUserByUserId)
                                .GET("/users", userHandler::findTbUserByUserNm)
                        )
                        .path("/code", codeBuilder -> codeBuilder
                                .GET("code/{codeId}", codeHandler::findAllByCd)
                        )
                )
                .build();
    }
}

 

 

3. 함수형 방식(functional) : Handler Function


💡 함수형 방식(functional) : Handler Function

- HandlerFunction은 HTTP 요청을 처리하고 응답을 생성하는 함수형 인터페이스입니다.
- ServerRequest를 입력으로 받아 Mono<ServerResponse>를 반환하는 구조로 되어있습니다.
- 주요 역할은 요청 데이터 추출 및 검증, 비즈니스 로직 호출, 응답 생성 및 반환의 역할을 수행합니다.
 

Functional Endpoints :: Spring Framework

Spring WebFlux includes WebFlux.fn, a lightweight functional programming model in which functions are used to route and handle requests and contracts are designed for immutability. It is an alternative to the annotation-based programming model but otherwis

docs.spring.io

 

4.1. ServerRequest


💡 ServerRequest

- HTTP 요청을 처리하는 인터페이스로, 클라이언트로부터 받은 요청 데이터를 추출하고 검증하는 역할을 합니다. 경로 변수, 쿼리 파라미터, 요청 본문 등을 처리할 수 있습니다.

 

💡 ServerRequest 주요 추출 메서드
메서드 설명 사용 예시
pathVariable() URL 경로 변수 추출 request.pathVariable("id")
queryParam() 쿼리 파라미터 추출 request.queryParam("name").orElse("default")
headers() 헤더 값 추출 request.headers().firstHeader("Authorization")
bodyToMono() 요청 본문을 객체로 변환 request.bodyToMono(User.class)
cookies() 쿠키 값 추출 request.cookies().getFirst("sessionId").getValue()
methodName() HTTP 메서드 확인 request.methodName()
uri() 요청 URI 확인 request.uri()

 

 

ServerRequest (Spring Framework 6.2.1 API)

bodyToFlux  reactor.core.publisher.Flux  bodyToFlux(Class  elementClass) Extract the body to a Flux. Type Parameters: T - the element type Parameters: elementClass - the class of element in the Flux Returns: the body as a flux

docs.spring.io

 

💡 Handler Function의 ServerRequest 사용 예시

- Router Function으로부터 전달받은 사용자 데이터는 ServerRequest를 통해서 전달받으며, 해당 내용 내에서 각각을 추출할 수 있습니다.
package com.blog.springbootwebflux.handler;

import com.blog.springbootwebflux.model.dto.UserDto;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;

import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import java.net.URI;

@Component
public class ExampleHandler {
    public Mono<ServerResponse> handleGetRequest(ServerRequest request) {
        // 1. 경로 변수 추출
        String id = request.pathVariable("id");

        // 2. 쿼리 파라미터 추출
        String name = request.queryParam("name").orElse("default");

        // 3. 헤더 값 추출
        String authHeader = request.headers().firstHeader("Authorization");

        // 4. 요청 본문 추출 (JSON을 User 객체로 변환)
        Mono<UserDto> userMono = request.bodyToMono(UserDto.class);

        // 5. 쿠키 추출
        String sessionId = request.cookies().getFirst("sessionId")
                .getValue();

        // 6. 요청 메소드 확인
        String method = request.methodName();

        // 7. 요청 URI 확인
        URI uri = request.uri();

        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue("Request processed successfully");
    }

}

 

 

4.2. ServerResponse


💡 ServerResponse

- HTTP 응답을 생성하는 인터페이스입니다. 상태 코드, 헤더, 본문 등을 포함한 응답을 생성하여 클라이언트에게 반환합니다.

 

💡 ServerResponse 주요 메서드
메서드 설명 사용 예시
ok() 200 OK 응답 생성 ServerResponse.ok().build()
created() 201 Created 응답 생성 ServerResponse.created(URI.create("/resource/1")).build()
noContent() 204 No Content 응답 생성 ServerResponse.noContent().build()
badRequest() 400 Bad Request 응답 생성 ServerResponse.badRequest().build()
notFound() 404 Not Found 응답 생성 ServerResponse.notFound().build()

 

 

ServerResponse (Spring Framework 6.2.1 API)

writeTo Write this response to the given web exchange. Parameters: exchange - the web exchange to write to context - the context to use when writing Returns: Mono to indicate when writing is complete

docs.spring.io

 

 

💡 Handler Function의 ServerResponse 사용예시

- 해당 경우는 비즈니스 로직을 처리한 이후에 클라이언트에게 반환을 해주는 응답 방법입니다.
@Component
public class ExampleHandler {
    public Mono<ServerResponse> handleRequest(ServerRequest request) {
        // 1. 기본 성공 응답
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(new ResponseDto("Success"));

        // 2. 생성 성공 응답
        return ServerResponse.created(URI.create("/resources/1"))
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(newResource);

        // 3. 에러 응답
        return ServerResponse.badRequest()
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(new ErrorResponse("Invalid input"));

        // 4. 조건부 응답
        return findResource()
            .flatMap(resource -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(resource))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
}

 

 

4.3. Handler Classes


💡 Handler Classes

- 실제 요청 처리 로직이 구현되는 클래스들입니다. ServerRequest를 받아서 비즈니스 로직을 처리하고 ServerResponse를 반환하는 역할을 수행합니다.

 

 

💡 Handler Classess 사용예시 

1. 요청 처리: ServerRequest를 통해 경로 변수, 요청 본문 등을 추출합니다.
2. 비즈니스 로직 연계: UserService를 통해 실제 비즈니스 로직을 수행합니다.
3. 응답 생성: ServerResponse를 사용하여 적절한 HTTP 응답을 생성합니다.
4. 에러 처리: 존재하지 않는 리소스나 잘못된 요청에 대해 적절한 에러 응답을 반환합니다.
// 사용자 조회 핸들러
public Mono<ServerResponse> getUserById(ServerRequest request) {
    // 1. 요청 처리 - ServerRequest를 통해 경로 변수 추출
    String userId = request.pathVariable("id");         
    
    // 2. 비즈니스 로직 연계 - Service 계층 호출
    return userService.findUserByUserId(userId)
            // 3. 응답 생성 - 성공 시 200 OK와 함께 사용자 정보 반환
            .flatMap(user -> ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(user))
            // 4. 에러 처리 - 사용자가 없을 경우 404 Not Found 반환
            .switchIfEmpty(ServerResponse.notFound().build());
}

 

 

4) 영속성 계층(Persistence Layer)


💡 영속성 계층(Persistence Layer)

- 데이터베이스와의 상호작용을 담당하는 계층입니다. Spring WebFlux에서는 리액티브 프로그래밍을 지원하는 특별한 리포지토리 인터페이스와 구현체를 사용합니다.


- 일반적으로 JDBC를 이용하는 경우 SQL Mapper, JPA를 이용하여 블록 - 동기 형태의 데이터베이스 처리가 수행되었습니다.
- 리액티브 프로그래밍을 구현하는 WebFlux에서는 일반적인 관계형 데이터베이스에서는 R2DBC나 NoSQL에서는 Reactive Mongo 등을 이용하여서 비동기 처리를 수행합니다.

 

1. R2DBC(Reactive Relational Database Connectivity)


💡 R2DBC (Reactive Relational Database Connectivity)

- 반응형 프로그래밍(Reactive Programming)에서 관계형 데이터베이스에 접근하기 위한 표준 API 사양(Specification)을 의미합니다.
- JDBC와 다르게 non-blocking 방식을 통해 비동기적 데이터베이스 액세스를 지원하도록 특별 설계가 되었습니다.
- 각 데이터베이스 벤더(PostgreSQL, MySQL 등)는 이 사양을 구현한 R2DBC 드라이버를 제공합니다.
- 예를 들어서, PostgreSQL을 사용할 때는 'r2dbc-postgresql' 드라이버를 사용합니다.

 

 

2. R2DBC를 지원하는 드라이버


💡 R2DBC를 지원하는 드라이버

- 데이터베이스 드라이버는 애플리케이션과 데이터베이스 간의 통신을 가능하게 해주는 소프트웨어 구성요소를 의미합니다.
- 관계형 데이터베이스 내에서 R2DBC를 지원하는 데이터베이스 드라이버에 대해 확인을 해봅니다.
데이터베이스 R2DBC 드라이버 명 관련 링크
Oracle oracle-r2dbc https://github.com/oracle/oracle-r2dbc
H2 r2dbc-h2 https://github.com/r2dbc/r2dbc-h2
MariaDB r2dbc-mariadb https://github.com/mariadb-corporation/mariadb-connector-r2dbc
MySQL r2dbc-mysql https://github.com/asyncer-io/r2dbc-mysql
PostgreSQL r2dbc-postgresql https://github.com/pgjdbc/r2dbc-postgresql
MSSQL r2dbc-mssql https://github.com/r2dbc/r2dbc-mssql

 

 

💡 [참고] 아래의 공식 사이트 Driver를 확인하시면 이외에 지원하는 데이터베이스 드라이버를 확인할 수 있습니다.
 

R2DBC

R2DBC 0.8.1.RELEASE: A standard API for reactive programming using SQL databases.

r2dbc.io

 

3. ReactiveCrudRepository Interface


💡 ReactiveCrudRepository Interface

- Spring Data R2DBC에서 제공하는 기본 인터페이스로, 리액티브 프로그래밍 방식의 CRUD(Create, Read, Update, Delete) 작업을 지원합니다.

 

💡 [참고] ReactiveCrudRepository를 이용한 사용예시

- Repository 내에서 ReactiveCrudRepository를 상속받아서 구성하였습니다.
package com.blog.springbootwebflux.repository;

import com.blog.springbootwebflux.model.entity.UserEntity;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Repository
public interface UserRepository extends ReactiveCrudRepository<UserEntity, Long> {

    Mono<UserEntity> findTbUserByUserId(String userId);

    Flux<UserEntity> findTbUserByUserNm(String userName);
}

 

 

4. 리액티브 트랜잭션(Reactive Transaction)


💡 리액티브 트랜잭션(Reactive Transaction)

- 비동기-논블로킹 방식으로 데이터베이스 트랜잭션을 처리하는 메커니즘입니다. 전통적인 동기식 트랜잭션과 달리, 리액티브 트랜잭션은 스레드를 차단하지 않고 효율적으로 리소스를 활용합니다.

- 비동기 처리: 트랜잭션 작업이 완료될 때까지 스레드를 차단하지 않고, 다른 작업을 수행할 수 있습니다.
- 선언적 트랜잭션: @Transactional 어노테이션을 사용하여 메서드 레벨에서 트랜잭션을 선언할 수 있습니다.
- 트랜잭션 전파: 여러 리액티브 작업 간에 트랜잭션을 전파하고 관리할 수 있습니다.
- 롤백 관리: 예외 발생 시 자동으로 트랜잭션을 롤백하며, 프로그래밍 방식으로도 롤백을 제어할 수 있습니다.

 

💡 어노테이션 기반 트랜잭션

- 어노테이션 기반 트랜잭션(@Transactional)은 Spring에서 제공하는 선언적 트랜잭션 관리 방식으로, 메서드나 클래스 레벨에서 트랜잭션 경계를 정의할 수 있습니다.
@Service
@Transactional
public class ReactiveTransactionService {
    public Mono<Result> performTransaction() {
        return operation1()
            .flatMap(this::operation2)
            .flatMap(this::operation3);
    }
}

 

💡 [참고] @Transactional 어노테이션에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
 

[Java] @Transactional 어노테이션 상세 이해하기 : Checked, Unchecked Exception Rollback

해당 글에서는 @Transactional 어노테이션에 대해 상세한 이해를 돕기 위해 작성한 글입니다. 💡 [참고] 다양한 Annotation에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.구분링크Spring Web

adjh54.tistory.com

 

 

5) 계층 구조 활용 예시


💡 계층 구조 활용 예시

- 해당 예시에서는 계층 구조가 처리되는 방식을 구현한 예시입니다. 해당 시나리오는 비동기 병렬 처리를 위한 비즈니스 로직 처리 과정입니다.
- 시나리오에서는 임의의 사용자를 10명을 등록하여 비동기 API 통신을 확인합니다.
- 해당 등록을 위해서 사용자 아이디 조회 → 사용자 등록 → 이메일 전송 처리를 수행합니다.


1. UserServiceTest → RouterConfig
- Test 클래스 내에서는 WebClient를 통해 사용자를 등록하는 API 호출을 수행합니다. 해당 과정에서 총 10번의 사용자 등록을 하도록 호출하였습니다.


2. RouterConfig → UserHandler
- 호출된 엔드포인트를 기반으로 처리를 위한 UserHandler의 registerUser() 메서드를 호출합니다.


3. UserHandler → UserService

- UserHandler에서는 UserService를 호출하고 처리과정 중 성공을 하면 “1”을 반환하고 에러가 발생하면 “0”을 반환합니다.


4. UserSerivce → UserServiceImpl
- UserServiceImpl 구현체에서 인터페이스의 비즈니스 로직을 처리합니다. 비즈니스 로직은 사용자 아이디를 기반으로 사용자 조회 → 중복 체크를 통과하면 사용자 등록 → 사용자 등록이 완료되면 이메일을 전송하는 처리과정을 비동기로 처리를 수행합니다.


5. UserServiceImpl → UserRepository
- 비즈니스 로직 처리 중 데이터 처리를 위해서 UserRepository를 호출합니다.


6. UserRepository → ReactiveCrudRepository
- ReactiveCrudRepository로부터 상속을 받아서 구현합니다. 이를 통해 데이터베이스에 접근하여 비동기적 데이터를 조회합니다.

 

1. 프로젝트 구조


💡 프로젝트 구조

- 해당 처리를 구성하기 위해서 프로젝트 패키지 구조는 아래와 같이 구성하였습니다

패키지 주요 파일 설명
config RouterConfig.java Router Function으로 클라이언트의 엔드포인트를 받아주는 기능을 수행
handler UserHandler.java Handler Function으로 Router Function의 엔드포인트에 대한 데이터의 요청/응답을 담당
model UserEntity.java Repository와 매핑이 되어서 테이블 컬럼들을 정의 및 담당
repository UserRepository.java R2DBC-PostgreSQL과 연결되어서 SQL문 수행
service UserService.java
UserServiceImpl.java
비즈니스 로직 처리 수행

 

 

2. 의존성 주입 : build.gradle


💡 의존성 주입 : build.gradle

- spring-boot-starter-webflux : 반응형 프로그래밍을 구현하는 라이브러리
- spring-boot-starter-data-r2dbc : 비동기식 데이터 처리를 구현하기 위한 라이브러리
- spring-boot-starter-mail : 이메일 전송 처리를 수행하는 라이브러리
- r2dbc-postgresql : R2DBC로 PostgreSQL을 연결하는 드라이버 라이브러리
- reactor-test : 반응형 프로그래밍을 테스트하기 위한 테스트 라이브러리
ext {
    set('bootVer', "3.3.6")
    set('jacksonVer', "2.16.1")
}

dependencies {
    // Spring Boot Starter
    implementation "org.springframework.boot:spring-boot-starter-webflux:${bootVer}"        // Webflux
    implementation "org.springframework.boot:spring-boot-starter-data-r2dbc:${bootVer}"     // R2DBC
    implementation "org.springframework.boot:spring-boot-starter-mail:${bootVer}"           // Mail

    // OpenSource
    implementation 'org.postgresql:r2dbc-postgresql:1.0.7.RELEASE'                          // R2DBC - PostgresSQL
    implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVer}"              // Jackson Databind

    // Runtime & Compile & test
    runtimeOnly 'org.postgresql:postgresql'                                                 // Postgres
    compileOnly 'org.projectlombok:lombok'                                                  // Lombok

    annotationProcessor 'org.projectlombok:lombok'                                          // Lombok
    testImplementation 'io.projectreactor:reactor-test:3.7.1'                               // Reactor Test
    testImplementation 'org.springframework.boot:spring-boot-starter-test'                  // JUnit
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'                            // JUnit
}

 

 

3. DDL 구조


💡 DDL 구조

- 패키지 내에 resouces/db 패키지 내에 ddl.sql 파일을 구성해 두었습니다.
- 아래와 같이 간단한 테이블을 구성하는 DDL입니다.

-- 사용자 테이블
create table tb_user
(
    user_sq serial
        constraint tb_user_pk
            primary key,
    user_id text,
    user_pw text,
    user_nm text,
    user_st text
);

alter table tb_user
    owner to localmaster;

 

 

4. RouterConfig.java


💡 RouterConfig.java

- API Endpoint 관리하는 Router Function입니다. 이는 중첩 라우팅 방식을 통해서 API Endpoint를 구성하였습니다.
- /api/v1/user/users를 호출하면 userHandler의 findTbUserByUserNm 메서드가 호출이 됩니다.
package com.blog.springbootwebflux.config;

import com.blog.springbootwebflux.handler.CodeHandler;
import com.blog.springbootwebflux.handler.UserHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ReactorResourceFactory;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.netty.resources.LoopResources;

/**
 * API Endpoint 관리하는 Router Function입니다.
 *
 * @author : leejonghoon
 * @fileName : RouterConfig
 * @since : 2024. 12. 4.
 */
@Configuration
public class RouterConfig {

    /**
     * Reactor Resource Control
     * - 글로벌 리소스 비활성화
     * - 이벤트 루프 스레드 설정
     *
     * @return ReactorResourceFactory
     */
    @Bean
    public ReactorResourceFactory reactorResourceFactory() {
        ReactorResourceFactory factory = new ReactorResourceFactory();
        factory.setUseGlobalResources(false); // 글로벌 리소스 사용 비활성화
        factory.setLoopResources(LoopResources.create("event-loop-", 4, true));
        return factory;
    }

    /**
     * Nested Router 구성 예시
     *
     * @param userHandler UserHandler
     * @param codeHandler CodeHandler
     * @return RouterFunction<ServerResponse>
     */
    @Bean
    public RouterFunction<ServerResponse> nestedRouterFunction(UserHandler userHandler, CodeHandler codeHandler) {
        return RouterFunctions
                .route()
                .path("/api/v1", builder -> builder
                        .path("/user", userBuilder -> userBuilder
                                .GET("/users", userHandler::findTbUserByUserNm)
                )
                .build();
    }

}

 

 

5. UserHandler


💡 UserHandler

- 요청된 Endpoint의 요청 값과 응답 값에 대한 처리를 수행합니다.
- 해당 부분에서는 userService의 userRegister()를 호출하였고, 성공할 시 result값을 반환하고, 에러 발생 시 0을 반환하도록 처리가 되었습니다.
package com.blog.springbootwebflux.handler;

import com.blog.springbootwebflux.model.entity.UserEntity;
import com.blog.springbootwebflux.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

/**
 * 사용자 Handler Function
 *
 * @author : leejonghoon
 * @fileName : UserHandler
 * @since : 2024. 12. 4.
 */
@Slf4j
@Component
public class UserHandler {
    private final UserService userService;
    private final TransactionalOperator transactionalOperator;

    public UserHandler(UserService userService, TransactionalOperator transactionalOperator) {
        this.userService = userService;
        this.transactionalOperator = transactionalOperator;
    }

    /**
     * 사용자를 등록하고 성공 메일을 보냅니다.
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> registerUser(ServerRequest request) {
        return request.bodyToMono(UserEntity.class)
                .flatMap(userService::userRegister)
                .flatMap(result -> ServerResponse.ok().bodyValue(result))
                .onErrorResume(e -> ServerResponse.ok().bodyValue(0));
    }
}

 

 

 

6. UserService


 💡 UserService

- 단일의 정수형을 반환받는 userRegister 서비스를 구축하였습니다.
package com.blog.springbootwebflux.service;

import com.blog.springbootwebflux.model.entity.UserEntity;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * 사용자의 비즈니스 로직을 처리하는 Service
 *
 * @author : leejonghoon
 * @fileName : UserService
 * @since : 2024. 12. 19.
 */
@Service
public interface UserService {

    Mono<Integer> userRegister(UserEntity userEntity);
}

 

 

7. UserServiceImpl


 💡 UserServiceImpl

- UserService의 구현체로 아래와 같은 비즈니스 로직을 처리합니다.

1. 사용자 아이디를 조회합니다.

- 조회하여서 아이디가 존재하는 경우 즉시 0으로 반환합니다.
- 조회하지 않는 경우 사용자 등록 비즈니스 로직을 수행합니다.


2. 사용자를 등록합니다

- 사용자 등록이 성공하면, 축하 메일을 전송합니다. 결괏값으로 1을 반환합니다.
package com.blog.springbootwebflux.service.impl;

import com.blog.springbootwebflux.model.dto.MailTxtSendDto;
import com.blog.springbootwebflux.model.entity.UserEntity;
import com.blog.springbootwebflux.repository.UserRepository;
import com.blog.springbootwebflux.service.EmailService;
import com.blog.springbootwebflux.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;

/**
 * Please explain the class!!
 *
 * @author : leejonghoon
 * @fileName : UserServiceImpl
 * @since : 2024. 12. 19.
 */
@Slf4j
@Service
public class UserServiceImpl implements UserService {
    private final UserRepository userRepository;
    private final EmailService emailService;

    public UserServiceImpl(UserRepository userRepository, EmailService emailService) {
        this.userRepository = userRepository;
        this.emailService = emailService;
    }

    @Transactional
    @Override
    public Mono<Integer> userRegister(UserEntity userEntity) {

        // [Service] 사용자 아이디를 조회합니다.
        return this.findUserByUserId(userEntity.getUserId())

                // [CASE1] 아이디가 존재하면, 0의 결과값을 반환합니다.
                .flatMap(existingUser -> Mono.just(0))  // 이미 존재하는 사용자인 경우 0 반환

                // [CASE2] 아이디가 존재하지 않는다면, 사용자를 등록합니다.
                .switchIfEmpty(

                        // [Service] 사용자를 등록합니다.
                        userRepository.save(userEntity)
                                .flatMap(savedUser -> {
                                    MailTxtSendDto mailDto = MailTxtSendDto.builder()
                                            .emailAddr(savedUser.getUserEmail())
                                            .subject(savedUser.getUserId() + "님 회원가입을 축하합니다!")
                                            .content("환영합니다. 회원가입이 완료되었습니다.")
                                            .build();
                                    // 이메일 전송을 별도로 실행하고 결과를 기다리지 않음
                                    emailService.sendTxtEmail(mailDto)
                                            .subscribe(
                                                    null,
                                                    error -> log.error("이메일 전송 실패: {}", error.getMessage())
                                            );

                                    // 즉시 성공 응답 반환
                                    return Mono.just(1);
                                })
                )
                // [CASE3] 회원가입 실패시, 오류메시지와 0의 값을 반환합니다.
                .onErrorResume(e -> {
                    log.debug("회원가입 처리 중 오류 발생: {}", e.getMessage());
                    return Mono.just(0);  // 에러 발생 시 0 반환
                });
    }

    @Override
    @Transactional(readOnly = true)
    public Mono<UserEntity> findUserByUserId(String userId) {
        System.out.println("[+] findUserByUserId 실행 ....");

        Mono<UserEntity> userInfo = userRepository.findTbUserByUserId(userId);
        userInfo.subscribe(
                data -> System.out.println("User data: " + data),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed")
        );
        System.out.println("findUserByUserId :: " + userInfo.toString());

        return userInfo;
    }
}

 

 

8. UserRepository


💡 UserRepository

- 실제 R2DBC와의 연결을 위해서 ReactiveCrudRepository<UserEntity, Long>로 부터 상속을 받아서 구성합니다.
@Repository
public interface UserRepository extends ReactiveCrudRepository<UserEntity, Long> {

    Mono<UserEntity> findTbUserByUserId(String userId); 
}

 

 

9. UserServiceTest


 💡 UserServiceTest

- 구성한 API를 WebClient를 통해서 직접적으로 10번의 호출을 수행합니다.
- 각기 다른 아이디를 구성하고 데이터를 호출하여서 10번 수행을 합니다.
package com.blog.springbootwebflux.service;

import com.blog.springbootwebflux.model.entity.UserEntity;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Please explain the class!!
 *
 * @author : leejonghoon
 * @fileName : UserServiceTest
 * @since : 2024. 12. 20.
 */
@SpringBootTest
@AutoConfigureWebTestClient
public class UserServiceTest {

    @Test
    void testMultipleUserRegistrations() {
        WebClient client = WebClient.create("<http://localhost:8080>");

        // 10명의 서로 다른 사용자 데이터 생성
        List<UserEntity> users = IntStream.range(0, 10)
                .mapToObj(i -> UserEntity.builder()
                        .userId("testIdd" + i)
                        .userEmail("adjh54@naver.com")
                        .build())
                .collect(Collectors.toList());

        // 각 사용자에 대해 테스트 실행
        StepVerifier.create(
                        Flux.fromIterable(users)
                                .flatMap(user ->
                                        client.post()
                                                .uri("/api/v1/user/user")
                                                .contentType(MediaType.APPLICATION_JSON)
                                                .bodyValue(user)
                                                .exchange()
                                                .thenReturn(Integer.class)
                                                .elapsed()
                                )
                )
                .expectNextCount(10)
                .verifyComplete();
    }
}

 

 

6) 계층 구조 활용 결과


 

1. 테스트 코드를 수행하여 10번 수행에 따르는 트랜잭션이 생성이 되었습니다.


 

 

2. 순차적인 트랜잭션이 아닌 비 동기적으로 처리가 수행됨을 확인하였습니다.


 

 

3. 최종적으로 이메일이 전송됨을 확인하였습니다.


 

 

 

4. 메일 확인


💡 메일 확인

- 순차적인 처리가 아닌 비동기식으로 처리됨을 확인하였습니다.

 

 

 

7) 동기적 처리와 비교


💡 동기적 처리와 비교

- 비 동기 처리에서는 이벤트 루프 기반의 '논 블로킹' 방식으로 이벤트 루프를 통해서 스레드가 블로킹되지 않고 계속해서 다른 작업의 처리가 되었습니다.
- 반대로 동기적 처리는 아래와 같이 '블로킹' 방식으로 각 요청이 완료될 때까지 기다리는 형태로 처리가 됨을 보여주고 있습니다. 

 

 

 

 

 

오늘도 감사합니다. 😀

 

 

그리드형