728x170
해당 글에서는 이전에 환경을 구성한 것을 기반으로 Java Spring Boot 환경 내에서 설정하고 사용하는 방법을 알아봅니다.
💡 [참고] RabbitMQ에 대해 궁금하시면 아래의 글이 도움이 됩니다.
분류 | 링크 |
Spring Boot AMQP RabbitMQ -1 : 구조 및 종류 이해하기 | https://adjh54.tistory.com/284 |
Spring Boot AMQP RabbitMQ -2 : 로컬 환경 구성하기 | https://adjh54.tistory.com/285 |
Spring Boot AMQP RabbitMQ -3 : Java 환경 구축 및 간단 예시 | https://adjh54.tistory.com/292 |
Spring Boot AMQP RabbitMQ -4 : Exchange 종류 별 이해 및 사용예시 | https://adjh54.tistory.com/497 |
Spring Boot AMQP RabbitMQ -5 : TTL 및 데드 레터링 사용예시 | https://adjh54.tistory.com/501 |
Spring Boot AMQP RabbitMQ -6 : 메시지 큐 종류, 큐 우선순위 | https://adjh54.tistory.com/518 |
Docker : Docker를 이용하여 RabbitMQ 구축하기 | https://adjh54.tistory.com/496 |
Docker : Docker Compose를 이용하여 RabbitMQ Node Cluster 구축하기 | https://adjh54.tistory.com/517 |
API Document : QueueBuilder API Document | https://adjh54.tistory.com/505 |
API Document : ExchangeBuilder API Document | https://adjh54.tistory.com/506 |
API Document : MessageProperties, MessagePropertiesBuilder, MessageBuilderSupport | https://adjh54.tistory.com/508 |
Spring Boot AMQP RabbitMQ Github : Event Producer | https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-rabbitmq |
Spring Boot AMQP RabbitMQ Github : Event Consumer | https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-rabbitmq-consumer |
1) 라이브러리 설치 및 환경 파일 설정
1. Spring Boot 내에 의존성 추가
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp' // Spring Boot AMQP
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.4.2' // Jackson Databind
}
Maven Repository: org.springframework.boot » spring-boot-starter-amqp
2. 환경 설정 파일을 구성합니다
💡 application.properties 파일 혹은 yml 파일 내에서 구성을 합니다.
💡 사전에 구성해둔 RabbitMQ 서버와 계정을 입력해 줍니다.
2.1. application.properties 파일 구성
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
2.2. yml 파일 구성
spring:
# Spring Boot RabbitMQ 설정
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
💡 [참고] spring.rabbitmq 속성
속성 명 | 설명 |
spring.rabbitmq.host | RabbitMQ 호스트 주소 |
spring.rabbitmq.port | RabbitMQ 포트 번호 |
spring.rabbitmq.username | RabbitMQ 연결에 사용되는 사용자 이름 |
spring.rabbitmq.password | RabbitMQ 연결에 사용되는 비밀번호 |
spring.rabbitmq.virtual-host | RabbitMQ 가상 호스트 이름 |
spring.rabbitmq.listener.simple.concurrency | 간단한 리스너의 동시성 수준 |
spring.rabbitmq.template.exchange | RabbitMQ 템플릿의 기본 교환기 이름 |
spring.rabbitmq.template.routing-key | RabbitMQ 템플릿의 기본 라우팅 키 |
2) PRODUCER 구성
💡 RabbitMQ 생산자(Producer) 구성
- 메시지를 ‘생성’하고 메시지 브로커에 ‘전송’하는 구성요소를 의미합니다.
1. 전체 프로세스 확인
💡 프로세스 구성
💡 [ 실제 호출이 발생한 경우 수행 ]
1. Client → ProceduerController
- Client에서는 api/v1/producer/send로 메시지 타이틀, 메시지에 대해서 담아서 API로 요청을 합니다.
2. ProceduerController → ProceduerService
- Controller는 인터페이스를 호출하여 메시지 정보를 전달한다.
3. ProceduerService → ProceduerServiceImpl
- 서비스의 구현체에서는 RabbitTemplate을 호출하여서 메시지를 전달하여 실제 메시지를 전송합니다.
(* 해당 부분에서는 브로커와 Jackson2JsonMessageConverter를 이용하여 데이터를 전송하기로 지정하여 JSON 타입으로 파싱 하여 브로커로 전달한다)
- exchange name과 Router Key 값을 기반으로 지정합니다.
💡 [ API 서버가 로드될 때 구성 ]
1. RabbitMqConfig
- API 서버가 로드될 때 수행이 됩니다.
2. Exchange, Queue
- 각각 지정한 Exchange Name, Queue Name을 지정합니다.
3. Binding
- 지정한 Exchange, Queue에 따라서 바인딩을 하여 Routing Name을 지정합니다.
4. ConnectionFactory
- Application.properties 파일 내에서 지정한 값을 불러와서 연결에 대한 구성을 수행합니다.
5. MessageConverter
- 메시지 전송 타입을 지정합니다. 전송은 JSON 타입으로 데이터를 주고받기로 지정하였습니다.
6. RabbitTemplate
- 구성한 Connection Factory, MessageConvert를 기반으로 통신을 위한 템플릿을 구성합니다.
7. RabbitMqConfig → ProceduerServiceImpl
- RabbitMq에 대한 구성한 RabbitTemplate을 기반으로 메시지를 보낼 수 있는 구현체를 구성합니다.
- exchange name과 Router Key 값을 기반으로 지정합니다.
[ 더 알아보기 ]
💡 MessageConverter
- MessageConverter를 사용하면 AMQP 메시지를 다른 형식으로 변환하거나, 다른 시스템과의 통신에 필요한 형식으로 변환할 수 있습니다. 이를 통해 메시지를 효과적으로 송수신하고 처리할 수 있습니다.
💡 MessageConverter의 종류
구현체 | 설명 |
Jackson2JsonMessageConverter | Jackson JSON 라이브러리를 사용하여 메시지 직렬화 및 역직렬화 |
SimpleMessageConverter | 기본적인 직렬화 및 역직렬화를 수행하는 간단한 메시지 컨버터 |
MarshallingMessageConverter | Marshaller와 Unmarshaller를 사용하여 XML 메시지 처리 |
ByteArrayMessageConverter | 바이트 배열 형식의 메시지를 처리하는 메시지 컨버터 |
MappingJackson2MessageConverter | Jackson JSON 라이브러리를 사용하여 메시지 직렬화 및 역직렬화 |
2. RabbitMQ 환경 파일 구성
💡 RabbitMQ 환경 파일 구성
- Spring Boot 환경에서 RabbitMQ와의 연결을 구성하고, 큐, 익스체인지, 바인딩, RabbitTemplate, ConnectionFactory, MessageConverter 등을 설정하는 클래스를 의미합니다.
💡 RabbitMQ 환경 파일 구성 순서
1. Direct Exchange 구성
2. 큐 구성
3. 구성한 큐와 Direct Exchange를 바인딩
4. ConnectionFactory 구성
5. 데이터 통신을 위한 messageConverter 구성
6. 템플릿을 통하여 구성한 환경 내의 통신을 수행하도록 합니다.
[ 다시 알아보기 ]
💡 Direct exchange
- ‘라우팅 키(Routing Key)’를 기반으로 메시지를 큐로 라우팅 합니다. 바인딩 키가 메시지의 라우팅 키와 정확히 일치하는 큐로 메시지가 라우팅 됩니다.
package com.adjh.multiflexapi.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ의 설정파일 입니다.
*
* @author : jonghoon
* @fileName : RabbitmqConfig
* @since : 10/15/23
*/
@Configuration
public class RabbitmqConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.port}")
private int port;
/**
* 1. Exchange 구성합니다.
* "hello.exchange" 라는 이름으로 Direct Exchange 형태로 구성하였습니다.
*
* @return DirectExchange
*/
@Bean
DirectExchange directExchange() {
return new DirectExchange("hello.exchange");
}
/**
* 2. 큐를 구성합니다.
* "hello.queue"라는 이름으로 큐를 구성하였습니다.
*
* @return Queue
*/
@Bean
Queue queue() {
return new Queue("hello.queue", false);
}
/**
* 3. 큐와 DirectExchange를 바인딩합니다.
* "hello.key"라는 이름으로 바인딩을 구성하였습니다.
*
* @param directExchange
* @param queue
* @return Binding
*/
@Bean
Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("hello.key");
}
/**
* 4. RabbitMQ와의 연결을 위한 ConnectionFactory을 구성합니다.
* Application.properties의 RabbitMQ의 사용자 정보를 가져와서 RabbitMQ와의 연결에 필요한 ConnectionFactory를 구성합니다.
*
* @return ConnectionFactory
*/
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
/**
* 5. 메시지를 전송하고 수신하기 위한 JSON 타입으로 메시지를 변경합니다.
* Jackson2JsonMessageConverter를 사용하여 메시지 변환을 수행합니다. JSON 형식으로 메시지를 전송하고 수신할 수 있습니다
*
* @return
*/
@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 6. 구성한 ConnectionFactory, MessageConverter를 통해 템플릿을 구성합니다.
*
* @param connectionFactory
* @param messageConverter
* @return
*/
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
}
2. DTO 구성
💡 DTO 구성
- RabbitMQ 내에서 통신을 수행할 객체를 구성합니다.
package com.adjh.multiflexapi.model;
import lombok.*;
/**
* 메시지 정보를 관리합니다.
*
* @author : jonghoon
* @fileName : MessageDto
* @since : 10/15/23
*/
@Getter
@ToString
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class MessageDto {
private String title;
private String message;
@Builder
public MessageDto(String title, String message) {
this.title = title;
this.message = message;
}
}
3. Service 구성 : 인터페이스
💡 Service 구성
- 메시지 전송을 위한 인터페이스
package com.adjh.multiflexapi.service;
import com.adjh.multiflexapi.model.MessageDto;
/**
* ProducerService Interface
*
* @author : jonghoon
* @fileName : ProducerService
* @since : 10/21/23
*/
public interface ProducerService {
// 메시지를 큐로 전송 합니다.
void sendMessage(MessageDto messageDto);
}
4. ServiceImpl 구성 : 구현체
💡 ServiceImpl 구성
- 인터페이스의 실제 구현체이며 RabbitTemplate을 호출하며 convertAndSend() 메서드를 이용하여 exchange 이름과 binding Routing key를 기반으로 메시지를 전달합니다.
- 해당 브로커와 데이터 통신 방법으로는 Jackson2JsonMessageConverter()를 사용하기로 정하였기에 Object를 JSON으로 파싱을 해야 합니다.
package com.adjh.multiflexapi.service.impl;
import com.adjh.multiflexapi.model.MessageDto;
import com.adjh.multiflexapi.service.ProducerService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.json.simple.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
* ProducerService 구현체
*
* @author : jonghoon
* @fileName : ProducerServiceImpl
* @since : 10/15/23
*/
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
private final RabbitTemplate rabbitTemplate;
@Override
public void sendMessage(MessageDto messageDto) {
try {
// 객체를 JSON으로 변환
ObjectMapper objectMapper = new ObjectMapper();
String objectToJSON = objectMapper.writeValueAsString(messageDto);
rabbitTemplate.convertAndSend("hello.exchange", "hello.key", objectToJSON);
} catch (JsonProcessingException jpe) {
System.out.println("파싱 오류 발생");
}
}
}
💡 [참고] RabbitTemplate 메서드
메서드 | 설명 |
convertAndSend(String routingKey, Object message) | 지정된 라우팅 키로 메시지를 변환하여 전송합니다. |
convertAndSend(String exchange, String routingKey, Object message) | 지정된 익스체인지와 라우팅 키로 메시지를 변환하여 전송합니다. |
convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) | 지정된 익스체인지와 라우팅 키로 메시지를 변환하고, 후처리기를 적용하여 전송합니다. |
send(String routingKey, Message message) | 지정된 라우팅 키로 메시지를 전송합니다. |
send(String exchange, String routingKey, Message message) | 지정된 익스체인지와 라우팅 키로 메시지를 전송합니다. |
send(String exchange, String routingKey, Message message, MessagePostProcessor messagePostProcessor) | 지정된 익스체인지와 라우팅 키로 메시지를 전송하고, 후처리기를 적용합니다. |
convertSendAndReceive(String routingKey, Object message) | 지정된 라우팅 키로 메시지를 변환하여 전송하고, 응답을 받습니다. |
convertSendAndReceive(String exchange, String routingKey, Object message) | 지정된 익스체인지와 라우팅 키로 메시지를 변환하여 전송하고, 응답을 받습니다. |
convertSendAndReceive(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) | 지정된 익스체인지와 라우팅 키로 메시지를 변환하고, 후처리기를 적용하여 전송하고, 응답을 받습니다. |
sendAndReceive(String routingKey, Message message) | 지정된 라우팅 키로 메시지를 전송하고, 응답을 받습니다. |
sendAndReceive(String exchange, String routingKey, Message message) | 지정된 익스체인지와 라우팅 키로 메시지를 전송하고, 응답을 받습니다. |
sendAndReceive(String exchange, String routingKey, Message message, MessagePostProcessor messagePostProcessor) | 지정된 익스체인지와 라우팅 키로 메시지를 전송하고, 후처리기를 적용하여 응답을 받습니다. |
5. Controller 구성
💡 Controller 구성
- 큐 전송을 위한 서비스를 호출하여 API로 요청이 들어온 경우 RabbitMQ 내에 메시지를 큐에 저장합니다.
package com.adjh.multiflexapi.controller;
import com.adjh.multiflexapi.common.codes.SuccessCode;
import com.adjh.multiflexapi.common.response.ApiResponse;
import com.adjh.multiflexapi.model.MessageDto;
import com.adjh.multiflexapi.service.ProducerService;
import com.adjh.multiflexapi.service.impl.ProducerServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Please explain the class!!
*
* @author : jonghoon
* @fileName : ProducerController
* @since : 10/15/23
*/
@Slf4j
@RestController
@RequestMapping(value = "/api/v1/producer")
public class ProducerController {
@Autowired
private ProducerService producerService;
/**
* 생산자(Proceduer)가 메시지를 전송합니다.
*
* @param messageDto
* @return
*/
@PostMapping("/send")
public ResponseEntity<?> sendMessage(@RequestBody MessageDto messageDto) {
String result = "";
producerService.sendMessage(messageDto);
ApiResponse ar = ApiResponse.builder()
.result(result)
.resultCode(SuccessCode.SELECT.getStatus())
.resultMsg(SuccessCode.SELECT.getMessage())
.build();
return new ResponseEntity<>(ar, HttpStatus.OK);
}
}
6. 구성한 API로 호출을 보내서 성공을 확인하였습니다.
7. RabbitMQ Connection 연결 확인
💡 로컬 RabbitMQ 경로는 http://localhost:15672/ 에서 Connections 탭에서 연결이 됨을 확인할 수 있습니다.
8. Exchange의 확인
3) CONSUMER 구성
💡 RabbitMQ 소비자(Consumer) 구성
- 메시지 브로커에서 메시지를 가져와서 ‘수신’하고 ‘처리’를 하는 주체를 의미합니다.
1. CONSUMER를 구성합니다
💡 PROCEDUER로부터 전송받은 데이터를 수신하는 CONSUMER를 구성합니다
- @RabbitListener 어노테이션을 이용하며 속성으로 PROCEDUER에서 전송하는 큐 이름에 대해 지정하면 해당 메시지를 전달받을 수 있습니다.
package com.adjh.multiflexapi.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* PROCEDUER로부터 전달받은 데이터를 수신
*
* @author : jonghoon
* @fileName : ConsumerService
* @since : 10/21/23
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ConsumerService {
@RabbitListener(queues = "hello.queue")
public void receiveMessage(String msg) {
log.debug("결과값을 받는다 :: " + msg);
}
}
속성 | 설명 |
id | 리스너 컨테이너의 고유 식별자입니다. |
concurrency | 리스너에 대해 생성할 동시 소비자의 수입니다. |
autoStartup | 리스너 컨테이너가 자동으로 시작해야 하는지 여부입니다. |
errorHandler | 리스너에서 발생하는 예외를 처리하기 위해 사용할 에러 핸들러입니다. |
ackMode | 리스너의 확인 모드입니다. |
containerFactory | 리스너 컨테이너를 생성하기 위해 사용할 컨테이너 팩토리의 빈 이름입니다. |
errorHandler | 리스너에서 발생하는 예외를 처리하기 위해 사용할 에러 핸들러입니다. |
topics | 구독할 토픽입니다. |
queues | 수신 대기할 큐입니다. |
groupId | 리스너의 그룹 ID입니다. |
clientId | 리스너의 클라이언트 ID입니다. |
containerType | 리스너 컨테이너의 유형입니다. |
concurrencyPolicy | 리스너 컨테이너를 확장할 때 사용할 동시성 정책입니다. |
assignmentSelector | 파티션을 리스너 컨테이너에 할당하기 위해 사용할 할당 선택자입니다. |
2. PROCEDUER의 메시지 전송 및 CONSUMER의 메시지 수신
2.1. PROCEDER로 API를 호출합니다
2.2. CONSUMER는 메시지를 수신하여 콘솔에 메시지를 출력합니다.
💡 [참고] Spring Boot AMQP RabbitMQ에 대해 더 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
구분 | 링크 |
Spring Boot AMQP RabbitMQ-1 : 구조 및 종류 | https://adjh54.tistory.com/284 |
Spring Boot AMQP RabbitMQ-2 : 로컬환경 구성 | https://adjh54.tistory.com/285 |
Spring Boot AMQP RabbitMQ-3: Java 구축 및 간단 예제 | https://adjh54.tistory.com/292 |
오늘도 감사합니다. 😀
그리드형
'Java > Message Queue' 카테고리의 다른 글
[Java] Spring Boot AMQP RabbitMQ 이해하기 -6 : 메시지 큐 종류, 큐 우선순위 (0) | 2024.06.29 |
---|---|
[Java] Spring Boot AMQP RabbitMQ 이해하기 -5 : TTL 및 데드 레터링 사용예시 (0) | 2024.06.05 |
[Java] Spring Boot AMQP RabbitMQ 이해하기 -4 : RabbitMQ Exchange 종류 별 이해 및 사용예시 (0) | 2024.05.30 |
[Java] Spring Boot AMQP RabbitMQ 이해하기 -2 : 로컬 환경 구성 (2) | 2023.10.15 |
[Java] Spring Boot AMQP RabbitMQ 이해하기 -1 : 구조 및 종류 (1) | 2023.10.14 |