Java/Message Queue

[Java] Spring Boot AMQP RabbitMQ 이해하기 -3 : Java 구축 및 간단 예제

adjh54 2023. 10. 21. 14:47
728x170

 

 

 

해당 글에서는 이전에 환경을 구성한 것을 기반으로 Java Spring Boot 환경 내에서 설정하고 사용하는 방법을 알아봅니다.


 

 

 

💡 [참고] RabbitMQ에 대해 궁금하시면 아래의 글이 도움이 됩니다.
분류 링크
Spring Boot AMQP RabbitMQ -1 : 구조 및 종류 이해하기 https://adjh54.tistory.com/284
Spring Boot AMQP RabbitMQ -2 : 로컬 환경 구성하기 https://adjh54.tistory.com/285
Spring Boot AMQP RabbitMQ -3 : Java 환경 구축 및 간단 예시 https://adjh54.tistory.com/292
Spring Boot AMQP RabbitMQ -4 : Exchange 종류 별 이해 및 사용예시 https://adjh54.tistory.com/497
Spring Boot AMQP RabbitMQ -5 : TTL 및 데드 레터링 사용예시 https://adjh54.tistory.com/501
Spring Boot AMQP RabbitMQ -6 : 메시지 큐 종류, 큐 우선순위 https://adjh54.tistory.com/518
Docker : Docker를 이용하여 RabbitMQ 구축하기 https://adjh54.tistory.com/496
Docker : Docker Compose를 이용하여 RabbitMQ Node Cluster 구축하기 https://adjh54.tistory.com/517
API Document : QueueBuilder API Document https://adjh54.tistory.com/505
API Document : ExchangeBuilder API Document https://adjh54.tistory.com/506
API Document : MessageProperties, MessagePropertiesBuilder, MessageBuilderSupport https://adjh54.tistory.com/508
Spring Boot AMQP RabbitMQ Github : Event Producer https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-rabbitmq
Spring Boot AMQP RabbitMQ Github : Event Consumer https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-rabbitmq-consumer

 

 

 

1) 라이브러리 설치 및 환경 파일 설정


 

1. Spring Boot 내에 의존성 추가


dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'                  // Spring Boot AMQP
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.4.2'               // Jackson Databind
}

Maven Repository: org.springframework.boot » spring-boot-starter-amqp

 

 

2. 환경 설정 파일을 구성합니다


💡 application.properties 파일 혹은 yml 파일 내에서 구성을 합니다.
💡 사전에 구성해둔 RabbitMQ 서버와 계정을 입력해 줍니다.

 

 

 

2.1. application.properties 파일 구성

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

 

 

2.2. yml 파일 구성

spring:
  # Spring Boot RabbitMQ 설정
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin

 

 💡 [참고] spring.rabbitmq 속성
속성 명 설명
spring.rabbitmq.host RabbitMQ 호스트 주소
spring.rabbitmq.port RabbitMQ 포트 번호
spring.rabbitmq.username RabbitMQ 연결에 사용되는 사용자 이름
spring.rabbitmq.password RabbitMQ 연결에 사용되는 비밀번호
spring.rabbitmq.virtual-host RabbitMQ 가상 호스트 이름
spring.rabbitmq.listener.simple.concurrency 간단한 리스너의 동시성 수준
spring.rabbitmq.template.exchange RabbitMQ 템플릿의 기본 교환기 이름
spring.rabbitmq.template.routing-key RabbitMQ 템플릿의 기본 라우팅 키

 

 

 

 

 

2) PRODUCER 구성


 💡 RabbitMQ 생산자(Producer) 구성

- 메시지를 ‘생성’하고 메시지 브로커에 ‘전송’하는 구성요소를 의미합니다.

 

 

1. 전체 프로세스 확인


 

💡 프로세스 구성

💡  [ 실제 호출이 발생한 경우 수행 ]


1. Client → ProceduerController
- Client에서는 api/v1/producer/send로 메시지 타이틀, 메시지에 대해서 담아서 API로 요청을 합니다.

2. ProceduerController → ProceduerService
- Controller는 인터페이스를 호출하여 메시지 정보를 전달한다.

3. ProceduerService → ProceduerServiceImpl
- 서비스의 구현체에서는 RabbitTemplate을 호출하여서 메시지를 전달하여 실제 메시지를 전송합니다.
(* 해당 부분에서는 브로커와 Jackson2JsonMessageConverter를 이용하여 데이터를 전송하기로 지정하여 JSON 타입으로 파싱 하여 브로커로 전달한다)
- exchange name과 Router Key 값을 기반으로 지정합니다.


💡  [ API 서버가 로드될 때 구성 ]

1. RabbitMqConfig
- API 서버가 로드될 때 수행이 됩니다.

2. Exchange, Queue
- 각각 지정한 Exchange Name, Queue Name을 지정합니다.

3. Binding
- 지정한 Exchange, Queue에 따라서 바인딩을 하여 Routing Name을 지정합니다.

4. ConnectionFactory
- Application.properties 파일 내에서 지정한 값을 불러와서 연결에 대한 구성을 수행합니다.

5. MessageConverter
- 메시지 전송 타입을 지정합니다. 전송은 JSON 타입으로 데이터를 주고받기로 지정하였습니다.

6. RabbitTemplate
- 구성한 Connection Factory, MessageConvert를 기반으로 통신을 위한 템플릿을 구성합니다.

7. RabbitMqConfig → ProceduerServiceImpl
- RabbitMq에 대한 구성한 RabbitTemplate을 기반으로 메시지를 보낼 수 있는 구현체를 구성합니다.
- exchange name과 Router Key 값을 기반으로 지정합니다.

 

[ 더 알아보기 ]

💡 MessageConverter


- MessageConverter를 사용하면 AMQP 메시지를 다른 형식으로 변환하거나, 다른 시스템과의 통신에 필요한 형식으로 변환할 수 있습니다. 이를 통해 메시지를 효과적으로 송수신하고 처리할 수 있습니다.

💡 MessageConverter의 종류
구현체 설명
Jackson2JsonMessageConverter Jackson JSON 라이브러리를 사용하여 메시지 직렬화 및 역직렬화
SimpleMessageConverter 기본적인 직렬화 및 역직렬화를 수행하는 간단한 메시지 컨버터
MarshallingMessageConverter Marshaller와 Unmarshaller를 사용하여 XML 메시지 처리
ByteArrayMessageConverter 바이트 배열 형식의 메시지를 처리하는 메시지 컨버터
MappingJackson2MessageConverter Jackson JSON 라이브러리를 사용하여 메시지 직렬화 및 역직렬화

 

 

 

2. RabbitMQ 환경 파일 구성


 

💡 RabbitMQ 환경 파일 구성

- Spring Boot 환경에서 RabbitMQ와의 연결을 구성하고, 큐, 익스체인지, 바인딩, RabbitTemplate, ConnectionFactory, MessageConverter 등을 설정하는 클래스를 의미합니다.

💡 RabbitMQ 환경 파일 구성 순서

1. Direct Exchange 구성

2. 큐 구성

3. 구성한 큐와 Direct Exchange를 바인딩

4. ConnectionFactory 구성

5. 데이터 통신을 위한 messageConverter 구성

6. 템플릿을 통하여 구성한 환경 내의 통신을 수행하도록 합니다.

 

 [ 다시 알아보기 ]

💡 Direct exchange


- ‘라우팅 키(Routing Key)’를 기반으로 메시지를 큐로 라우팅 합니다. 바인딩 키가 메시지의 라우팅 키와 정확히 일치하는 큐로 메시지가 라우팅 됩니다.

 

 

 

package com.adjh.multiflexapi.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * RabbitMQ의 설정파일 입니다.
 *
 * @author : jonghoon
 * @fileName : RabbitmqConfig
 * @since : 10/15/23
 */
@Configuration
public class RabbitmqConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.port}")
    private int port;


    /**
     * 1. Exchange 구성합니다.
     * "hello.exchange" 라는 이름으로 Direct Exchange 형태로 구성하였습니다.
     *
     * @return DirectExchange
     */
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("hello.exchange");
    }

    /**
     * 2. 큐를 구성합니다.
     * "hello.queue"라는 이름으로 큐를 구성하였습니다.
     *
     * @return Queue
     */
    @Bean
    Queue queue() {
        return new Queue("hello.queue", false);
    }


    /**
     * 3. 큐와 DirectExchange를 바인딩합니다.
     * "hello.key"라는 이름으로 바인딩을 구성하였습니다.
     *
     * @param directExchange
     * @param queue
     * @return Binding
     */
    @Bean
    Binding binding(DirectExchange directExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("hello.key");
    }


    /**
     * 4. RabbitMQ와의 연결을 위한 ConnectionFactory을 구성합니다.
     * Application.properties의 RabbitMQ의 사용자 정보를 가져와서 RabbitMQ와의 연결에 필요한 ConnectionFactory를 구성합니다.
     *
     * @return ConnectionFactory
     */
    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    /**
     * 5. 메시지를 전송하고 수신하기 위한 JSON 타입으로 메시지를 변경합니다.
     * Jackson2JsonMessageConverter를 사용하여 메시지 변환을 수행합니다. JSON 형식으로 메시지를 전송하고 수신할 수 있습니다
     *
     * @return
     */
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }


    /**
     * 6. 구성한 ConnectionFactory, MessageConverter를 통해 템플릿을 구성합니다.
     *
     * @param connectionFactory
     * @param messageConverter
     * @return
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        return rabbitTemplate;
    }

}

 

 

 

2. DTO 구성


💡 DTO 구성

- RabbitMQ 내에서 통신을 수행할 객체를 구성합니다.
package com.adjh.multiflexapi.model;

import lombok.*;

/**
 * 메시지 정보를 관리합니다.
 *
 * @author : jonghoon
 * @fileName : MessageDto
 * @since : 10/15/23
 */
@Getter
@ToString
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class MessageDto {
    private String title;
    private String message;

    @Builder
    public MessageDto(String title, String message) {
        this.title = title;
        this.message = message;
    }
}

 

 

 

3. Service 구성 : 인터페이스


 

💡 Service 구성

- 메시지 전송을 위한 인터페이스
package com.adjh.multiflexapi.service;

import com.adjh.multiflexapi.model.MessageDto;

/**
 * ProducerService Interface
 *
 * @author : jonghoon
 * @fileName : ProducerService
 * @since : 10/21/23
 */
public interface ProducerService {

    // 메시지를 큐로 전송 합니다.
    void sendMessage(MessageDto messageDto);
}

 

 

 

4. ServiceImpl 구성 : 구현체


💡 ServiceImpl 구성

- 인터페이스의 실제 구현체이며 RabbitTemplate을 호출하며 convertAndSend() 메서드를 이용하여 exchange 이름과 binding Routing key를 기반으로 메시지를 전달합니다.

- 해당 브로커와 데이터 통신 방법으로는 Jackson2JsonMessageConverter()를 사용하기로 정하였기에 Object를 JSON으로 파싱을 해야 합니다.

package com.adjh.multiflexapi.service.impl;

import com.adjh.multiflexapi.model.MessageDto;
import com.adjh.multiflexapi.service.ProducerService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.json.simple.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * ProducerService 구현체
 *
 * @author : jonghoon
 * @fileName : ProducerServiceImpl
 * @since : 10/15/23
 */
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(MessageDto messageDto) {
        try {
            // 객체를 JSON으로 변환
            ObjectMapper objectMapper = new ObjectMapper();
            String objectToJSON = objectMapper.writeValueAsString(messageDto);
            rabbitTemplate.convertAndSend("hello.exchange", "hello.key", objectToJSON);
        } catch (JsonProcessingException jpe) {
            System.out.println("파싱 오류 발생");
        }

    }
}

 

💡 [참고] RabbitTemplate 메서드

 

메서드 설명
convertAndSend(String routingKey, Object message) 지정된 라우팅 키로 메시지를 변환하여 전송합니다.
convertAndSend(String exchange, String routingKey, Object message) 지정된 익스체인지와 라우팅 키로 메시지를 변환하여 전송합니다.
convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) 지정된 익스체인지와 라우팅 키로 메시지를 변환하고, 후처리기를 적용하여 전송합니다.
send(String routingKey, Message message) 지정된 라우팅 키로 메시지를 전송합니다.
send(String exchange, String routingKey, Message message) 지정된 익스체인지와 라우팅 키로 메시지를 전송합니다.
send(String exchange, String routingKey, Message message, MessagePostProcessor messagePostProcessor) 지정된 익스체인지와 라우팅 키로 메시지를 전송하고, 후처리기를 적용합니다.
convertSendAndReceive(String routingKey, Object message) 지정된 라우팅 키로 메시지를 변환하여 전송하고, 응답을 받습니다.
convertSendAndReceive(String exchange, String routingKey, Object message) 지정된 익스체인지와 라우팅 키로 메시지를 변환하여 전송하고, 응답을 받습니다.
convertSendAndReceive(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) 지정된 익스체인지와 라우팅 키로 메시지를 변환하고, 후처리기를 적용하여 전송하고, 응답을 받습니다.
sendAndReceive(String routingKey, Message message) 지정된 라우팅 키로 메시지를 전송하고, 응답을 받습니다.
sendAndReceive(String exchange, String routingKey, Message message) 지정된 익스체인지와 라우팅 키로 메시지를 전송하고, 응답을 받습니다.
sendAndReceive(String exchange, String routingKey, Message message, MessagePostProcessor messagePostProcessor) 지정된 익스체인지와 라우팅 키로 메시지를 전송하고, 후처리기를 적용하여 응답을 받습니다.

 

 

5. Controller 구성


 

 

 

💡 Controller 구성

- 큐 전송을 위한 서비스를 호출하여 API로 요청이 들어온 경우 RabbitMQ 내에 메시지를 큐에 저장합니다.
package com.adjh.multiflexapi.controller;

import com.adjh.multiflexapi.common.codes.SuccessCode;
import com.adjh.multiflexapi.common.response.ApiResponse;
import com.adjh.multiflexapi.model.MessageDto;
import com.adjh.multiflexapi.service.ProducerService;
import com.adjh.multiflexapi.service.impl.ProducerServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Please explain the class!!
 *
 * @author : jonghoon
 * @fileName : ProducerController
 * @since : 10/15/23
 */
@Slf4j
@RestController
@RequestMapping(value = "/api/v1/producer")
public class ProducerController {

    @Autowired
    private ProducerService producerService;

    /**
     * 생산자(Proceduer)가 메시지를 전송합니다.
     *
     * @param messageDto
     * @return
     */
    @PostMapping("/send")
    public ResponseEntity<?> sendMessage(@RequestBody MessageDto messageDto) {
        String result = "";

        producerService.sendMessage(messageDto);
        ApiResponse ar = ApiResponse.builder()
                .result(result)
                .resultCode(SuccessCode.SELECT.getStatus())
                .resultMsg(SuccessCode.SELECT.getMessage())
                .build();
        return new ResponseEntity<>(ar, HttpStatus.OK);
    }
}

 

 

 

 

6. 구성한 API로 호출을 보내서 성공을 확인하였습니다.


 

 

 

7. RabbitMQ Connection 연결 확인


💡 로컬 RabbitMQ 경로는 http://localhost:15672/ 에서 Connections 탭에서 연결이 됨을 확인할 수 있습니다.

 

 

8. Exchange의 확인


 

 

 

 

 

 

3) CONSUMER 구성


💡 RabbitMQ 소비자(Consumer) 구성

- 메시지 브로커에서 메시지를 가져와서 ‘수신’하고 ‘처리’를 하는 주체를 의미합니다.

 

 

1. CONSUMER를 구성합니다


 💡 PROCEDUER로부터 전송받은 데이터를 수신하는 CONSUMER를 구성합니다

- @RabbitListener 어노테이션을 이용하며 속성으로 PROCEDUER에서 전송하는 큐 이름에 대해 지정하면 해당 메시지를 전달받을 수 있습니다.
package com.adjh.multiflexapi.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * PROCEDUER로부터 전달받은 데이터를 수신
 *
 * @author : jonghoon
 * @fileName : ConsumerService
 * @since : 10/21/23
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ConsumerService {

    @RabbitListener(queues = "hello.queue")
    public void receiveMessage(String msg) {
        log.debug("결과값을 받는다 :: " + msg);
    }
}

 

속성 설명
id 리스너 컨테이너의 고유 식별자입니다.
concurrency 리스너에 대해 생성할 동시 소비자의 수입니다.
autoStartup 리스너 컨테이너가 자동으로 시작해야 하는지 여부입니다.
errorHandler 리스너에서 발생하는 예외를 처리하기 위해 사용할 에러 핸들러입니다.
ackMode 리스너의 확인 모드입니다.
containerFactory 리스너 컨테이너를 생성하기 위해 사용할 컨테이너 팩토리의 빈 이름입니다.
errorHandler 리스너에서 발생하는 예외를 처리하기 위해 사용할 에러 핸들러입니다.
topics 구독할 토픽입니다.
queues 수신 대기할 큐입니다.
groupId 리스너의 그룹 ID입니다.
clientId 리스너의 클라이언트 ID입니다.
containerType 리스너 컨테이너의 유형입니다.
concurrencyPolicy 리스너 컨테이너를 확장할 때 사용할 동시성 정책입니다.
assignmentSelector 파티션을 리스너 컨테이너에 할당하기 위해 사용할 할당 선택자입니다.

 

 

 

2. PROCEDUER의 메시지 전송 및 CONSUMER의 메시지 수신


 

2.1. PROCEDER로 API를 호출합니다

 

 

2.2. CONSUMER는 메시지를 수신하여 콘솔에 메시지를 출력합니다.

 

 

 

 

💡 [참고] Spring Boot AMQP RabbitMQ에 대해 더 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
구분 링크
Spring Boot AMQP RabbitMQ-1 : 구조 및 종류 https://adjh54.tistory.com/284
Spring Boot AMQP RabbitMQ-2 : 로컬환경 구성 https://adjh54.tistory.com/285
Spring Boot AMQP RabbitMQ-3: Java 구축 및 간단 예제 https://adjh54.tistory.com/292


 

 

 

 

 

 

오늘도 감사합니다. 😀

 



 

그리드형