반응형
반응형
해당 글에서는 Spring Boot 환경에서 RabbitMQ를 활용하여 Exchange 종류 별로 데이터 생성 및 전송 방법에 대해 알아봅니다.
💡 [참고] RabbitMQ에 대해 궁금하시면 아래의 글이 도움이 됩니다.
분류 | 링크 |
Spring Boot AMQP RabbitMQ -1 : 구조 및 종류 이해하기 | https://adjh54.tistory.com/284 |
Spring Boot AMQP RabbitMQ -2 : 로컬 환경 구성하기 | https://adjh54.tistory.com/285 |
Spring Boot AMQP RabbitMQ -3 : Java 환경 구축 및 간단 예시 | https://adjh54.tistory.com/292 |
Spring Boot AMQP RabbitMQ -4 : Exchange 종류 별 이해 및 사용예시 | https://adjh54.tistory.com/497 |
Spring Boot AMQP RabbitMQ -5 : TTL 및 데드 레터링 사용예시 | https://adjh54.tistory.com/501 |
Spring Boot AMQP RabbitMQ -6 : 메시지 큐 종류, 큐 우선순위 | https://adjh54.tistory.com/518 |
Docker : Docker를 이용하여 RabbitMQ 구축하기 | https://adjh54.tistory.com/496 |
Docker : Docker Compose를 이용하여 RabbitMQ Node Cluster 구축하기 | https://adjh54.tistory.com/517 |
API Document : QueueBuilder API Document | https://adjh54.tistory.com/505 |
API Document : ExchangeBuilder API Document | https://adjh54.tistory.com/506 |
API Document : MessageProperties, MessagePropertiesBuilder, MessageBuilderSupport | https://adjh54.tistory.com/508 |
Spring Boot AMQP RabbitMQ Github : Event Producer | https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-rabbitmq |
Spring Boot AMQP RabbitMQ Github : Event Consumer | https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-rabbitmq-consumer |
1) RabbitMQ
💡 RabbitMQ
- 오픈 소스 기반 메시지 브로커 소프트웨어로 분산 시스템에서 ‘메시지나 데이터’를 전달하기 위한 중간 매개체로 사용됩니다.
- AMQP(Advanced Message Queuing Protocol)를 기반으로 하며 다양한 프로그래밍 언어와 플랫폼에서 사용할 수 있습니다.
- 메시지 큐를 사용하여 메시지의 생산자와 소비자 사이에 '비동기적인 통신'함으로써 메시지는 큐에 저장되어 생산자와 소비자가 독립적으로 작동할 수 있습니다. 또한 RabbitMQ를 이용하여 메시지를 안전하게 전달하고 처리하는데 중점을 둡니다.
💡 [참고] RabbitMQ에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
1. Exchange - (Binding) - Queue 관계
💡 Exchange - (Binding) - Queue 관계
- RabbitMQ 내에서 ‘Exchange 방식’에 따라 Queue 내에 ‘데이터’가 적재되는 방식이 달라지기에 각각 수행되는 방법에 대해 알아봅니다.
- Exchange는 메시지를 수신하고 처리할 대상을 결정하는 요소를 의미합니다.
- Queue는 메시지를 저장하는 공간이며 이벤트 생성자와 이벤트 소비자 간의 데이터를 주고받는 공간을 의미합니다.
- 생성자(Producer) 입장에서는 ‘메시지’를 생성하여 Exchange를 선택하여 메시지를 전송합니다. 해당 메시지는 특정 규칙에 따라 Queue에 데이터를 저장하게 됩니다.
- 소비자(Consumer) 입장에서는 ‘Queue’를 바라보는 리스너를 구축하여, Queue 내에 데이터가 들어오게 되면 소비자에게 전달이 됩니다.
💡 [참고] RabbitMQ의 주요 용어
용어 | 분류 |
생산자(Producer) | - 메시지를 ‘생성’하고 메시지 브로커에 ‘전송’하는 구성요소를 의미합니다. |
메시지(Message) | - 송신자와 수신자간에 전송이 되는 ‘데이터 덩어리’를 의미합니다. |
메시지 브로커(Message Broker) | - 메시지를 생성하고 소비하는 클라이언트 간에 메시지를 전달하는 미들웨어입니다.ex) Apache Kafka, RabbitMQ, Amazon Kinesis |
교환기(Exchange) | - 메시지를 수신하고 처리할 대상을 결정하는 구성요소를 의미합니다. - 주로 메시지를 수신하고 라우팅 알고리즘을 통해 특정 큐로 보내는 역할을 수행합니다. |
소비자(Consumer) | - 메시지 브로커에서 메시지를 가져와서 ‘수신’하고 ‘처리’를 하는 주체를 의미합니다. |
바인딩(Binding) | - 클라이언트 애플리케이션과 메시징 시스템 간의 연결을 하는 과정을 의미합니다. |
메시지 큐(Message Queue) | - 메시징 시스템에 저장된 메시지를 의미하며 송신자가 메시지를 전송하면 이를 수신자가 처리할때까지 대기시킵니다. |
2. Exchange 유형
💡 Exchange 유형
- 메시지를 수신하고 처리할 대상을 결정하는 역할을 수행하는 Exchange의 유형에 대해 알아봅니다.
Exchange 유형 | 설명 |
RabbitMQ Direct Exchange | ‘라우팅 키‘를 기반으로 메시지를 큐로 라우팅합니다. 바인딩 키가 메시지의 라우팅 키와 '정확히 일치하는 큐'로 메시지가 라우팅됩니다. |
RabbitMQ Fanout Exchange | 라우팅 키에 관계없이 바인딩된 ‘모든 큐‘로 메시지를 라우팅합니다. 여러 소비자에게 메시지를 브로드캐스트하는데 유용합니다. |
RabbitMQ Topic Exchange | 라우팅 키의 ‘라우팅 패턴 매칭‘에 따라 메시지를 큐로 라우팅합니다. 라우팅 패턴 간의 와일드카드(*) 혹은 해시(#)가 일치해야만 수행합니다. |
RabbitMQ Headers Exchange | 라우팅 키 대신 '헤더 속성 값'에 따라 메시지를 큐로 라우팅합니다. 헤더 값은 소비자가 지정한 헤더와 일치해야 메시지가 라우팅됩니다. |
3. 라우팅 키(Routing)
💡 라우팅 키(Routing)
- RabbitMQ 메시지 브로커에서 메시지를 특정 큐로 라우팅 하는 데 사용되는 문자열입니다. 이 문자열은 일반적으로 여러 단어로 구성되며, 각 단어는 점 (.)으로 구분됩니다.
- 라우팅 키의 각 부분은 메시지를 분류하는 데 사용되는 다른 기준을 나타낼 수 있습니다.
- 라우팅 키는 <카테고리.세부 항목> 형태로 구성이 됩니다.
- 예를 들어서 'animals.rabbit', 'plants.tree', 'vehicles.car'등이 있는데, 'animals', 'plants', 'vehicles'는 각각의 카테고리를 나타내고, 그다음에 오는 'rabbit', 'tree', 'car'는 그 카테고리 안의 항목을 나타냅니다.
💡 사용 예시
- Direct Exchange에서 ‘라우팅 키’를 기반으로 생산자와 소비자 간의 데이터 전송을 수행합니다.
@Bean
DirectExchange directExchange() {
return new DirectExchange("exchange.direct");
}
@Bean
Queue queue1() {
// queue1 이름의 큐를 구성합니다.
return new Queue("queue1", false);
}
@Bean
Binding directBinding(DirectExchange directExchange, Queue queue1) {
return BindingBuilder
.bind(queue1)
.to(directExchange)
.with("order.pizza"); // 라우팅 키 (Routing key)
}
4. 라우팅 패턴(Routing Pattern)
💡 라우팅 패턴(Routing Pattern)
- Topic Exchange에서 주로 사용되며, 라우팅 키의 패턴 매칭에 따라 메시지를 큐로 라우팅 합니다. 여기서는 와일드카드(*)와 해시(#)를 이용한 패턴 매칭으로 사용이 됩니다.
- 와일드카드(*)의 경우는 0개 또는 한 단어와 비교를 수행합니다.
- 예를 들어 "fruit.banana."라는 패턴은 "fruit.banana"가 일치하며, 추가적인 단어가 있는 "fruit.banana.delicious"도 일치하지만, 다른 단어로 시작하는 "fruit.apple.delicious"는 일치하지 않습니다.
- 해시(#)는 해시 기호 이후의 모든 라우팅 키와 매칭됩니다.
- 예를 들어, "fruit.#" 패턴은 "fruit.banana", "fruit.banana.delicious", "fruit.apple" 등 fruit로 시작하는 모든 라우팅 키와 일치합니다.
- 라우팅 패턴은 메시지 라우팅의 유연성을 높여주며, 특정 주제 또는 카테고리에 대한 메시지를 특정 큐로 라우팅 하는데 유용합니다. 이를 통해 메시지 소비자는 관심 있는 메시지만 선택적으로 수신할 수 있습니다.
💡 사용 예시
- Topic Exchange를 사용하는 예시에서 “order.*”에 맞는 라우팅 키에 대한 패턴을 찾습니다.
/**
* Topic Exchange 구성
*
* @return
*/
@Bean
TopicExchange topicExchange() {
// topic.exchange 이름의 Topic Exchange
return new TopicExchange("exchange.topic");
}
@Bean
Queue queue5() {
// queue5 이름의 큐를 구성합니다.
return new Queue("queue5", false);
}
/**
* Topic Exchange 와 Queue5 간의 바인딩을 수행합니다.
* - Topic Exchange 방식으로 Queue5와 특정 라우팅 패턴(Routing Pattern)을 기반으로 바인딩 수행
*
* @param topicExchange
* @param queue5
* @return
*/
@Bean
Binding topicBinding(TopicExchange topicExchange, Queue queue5) {
return BindingBuilder
.bind(queue5)
.to(topicExchange)
.with("order.*");
}
[ 더 알아보기 ]
💡 Topic Exchange를 사용하는데 와일드카드(*)와 해시(#)를 사용하지 않고 구성하면 어떻게 될까?
- Topic Exchange에서 와일드카드(*)와 해시(#)를 사용하지 않을 경우, 라우팅 키가 완전히 일치하는 큐에만 메시지가 라우팅 됩니다.
- 즉, 패턴 매칭을 활용한 유연한 라우팅이 이루어지지 않고, Direct Exchange와 유사하게 작동하게 됩니다.
2) 초기 환경 구성 : RabbitMQConfig
💡 초기 환경 구성 : RabbitMQConfig
- 최초 환경 구성은 아래의 글을 참고하셔서 최초 구성을 한 뒤 RabbitMQ를 이용하기 위한 Exchange, Queue, Binding을 수행합니다.
💡 구성 예시
- 아래의 예시에서는 RabbitMQ 내에서 생성자(Prodceduer)의 역할을 수행합니다.
- 이는 @Configuration과 @Bean 어노테이션을 통해 애플리케이션 서버가 실행될 때, RabbitMQ를 연결하고 Exchange, Queue를 등록하여, 해당 정보를 토대로 Binding을 수행합니다.
- 또한 이를 통해 RabbitMQ와의 통신을 위한 rabbitTemplate을 구성하여, 이를 기반으로 서버와 RabbitMQ 간의 통신이 수행됩니다.
1. 이벤트 수신자, 소비자(Event Consumer) 구성
💡 이벤트 수신자, 소비자(Event Consumer) 구성
- 해당 부분에서는 RabbitMQ의 Queue에 메시지가 들어오는 경우 @RabbitListener 어노테이션을 통해서 이를 수신합니다.
- 여기서는 queue1, 2, 3, 4, 5에 대한 큐에 대해 데이터가 들어온 경우, 해당 데이터를 반환받기로 구성하였습니다.
package com.adjh.springbootrabbitmqconsumer.component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Order Service에 대한 이벤트 소비자를 구성합니다.
*
* @author : lee
* @fileName : OrderComponent
* @since : 24. 5. 28.
*/
@Slf4j
@Component
public class OrderComponent {
@RabbitListener(queues = "queue1")
public void receiveMessage(String msg) {
System.out.println("Queue1 내의 결과 값을 반환 받습니다 " + msg);
}
@RabbitListener(queues = "queue2")
public void receiveMessage2(String msg) {
System.out.println("Queue2 내의 결과 값을 반환 받습니다 " + msg);
}
@RabbitListener(queues = "queue3")
public void receiveMessage3(String msg) {
System.out.println("Queue3 내의 결과 값을 반환 받습니다 " + msg);
}
@RabbitListener(queues = "queue4")
public void receiveMessage4(String msg) {
System.out.println("Queue4 내의 결과 값을 반환 받습니다 " + msg);
}
@RabbitListener(queues = "queue5")
public void receiveMessage5(String msg) {
System.out.println("Queue5 내의 결과 값을 반환 받습니다 " + msg);
}
}
💡 [참고] @RabbitListener 어노테이션 속성값
속성 값 리턴 값 설명
속성 값 | 리턴 값 | 설명 |
ackMode | String | 컨테이너 팩토리의 AcknowledgeMode 속성을 재정의합니다. |
admin | String | AmqpAdmin에 대한 참조입니다. |
autoStartup | String | 컨테이너 팩토리의 기본 설정을 재정의하여 true 또는 false로 설정합니다. |
batch | String | 컨테이너 팩토리의 batchListener 속성을 재정의합니다. |
bindings | QueueBinding[] | 리스너의 큐 이름과 교환기, 선택적인 바인딩 정보를 함께 제공하는 QueueBindings의 배열입니다. |
concurrency | String | 이 리스너를 위한 리스너 컨테이너의 동시성을 설정합니다. |
containerFactory | String | 이 엔드포인트를 서비스하는 메시지 리스너 컨테이너를 생성하는데 사용되는 RabbitListenerContainerFactory의 빈 이름입니다. |
converterWinsContentType | String | 'replyContentType' 속성의 값으로 메시지 컨버터가 설정한 모든 컨텐츠 타입 헤더를 재정의하여 'false'로 설정합니다. |
errorHandler | String | 리스너 메서드가 예외를 던질 경우 호출할 RabbitListenerErrorHandler를 설정합니다. |
exclusive | boolean | true로 설정하면 컨테이너의 단일 소비자가 queues()의 독점 사용권을 가지게 되어, 다른 소비자가 큐에서 메시지를 받는 것을 방지합니다. |
executor | String | 이 리스너의 컨테이너에 사용할 작업 실행자 빈 이름을 설정합니다. 이 설정은 컨테이너 팩토리에서 설정한 모든 실행자를 재정의합니다. |
group | String | 제공된 경우, 이 리스너의 리스너 컨테이너는 이 값의 이름을 가진 빈에 추가됩니다. 이 빈은 Collection<MessageListenerContainer> 유형입니다. |
id | String | 이 엔드포인트를 관리하는 컨테이너의 고유 식별자입니다. |
messageConverter | String | 이 리스너에 사용되는 컨테이너 팩토리의 메시지 컨버터를 재정의합니다. |
priority | String | 이 엔드포인트의 우선순위입니다. |
queues | String[] | 이 리스너의 큐입니다. |
queuesToDeclare | Queue[] | 이 리스너의 큐입니다. |
replyContentType | String | 응답 메시지의 컨텐츠 유형을 설정하는데 사용됩니다. |
replyPostProcessor | String | 응답이 전송되기 전에 후처리를 위한 ReplyPostProcessor의 빈 이름입니다. |
returnExceptions | String | 리스너가 던진 예외를 일반적인 replyTo/@SendTo 의미론을 사용하여 발신자에게 보내도록 "true"로 설정합니다. |
2. 이벤트 송신자, 생성자(Producer) 서비스 구축
💡 이벤트 송신자, 생성자(Producer) 서비스 구축
- 각각 Exchange 별로 비즈니스 로직 구축을 위해 서비스를 구축하였습니다.
package com.adjh.springbootrabbitmq.service;
import com.adjh.springbootrabbitmq.dto.MessageDto;
/**
* 메시지 생성자의 Exchange 별 서비스 처리
*
* @author : jonghoon
* @fileName : ProducerService
* @since : 5/25/24
*/
public interface ProducerService {
void directSendMessage(MessageDto messageDto); // Direct Exchange 방식 이용
void fanoutSendMessage(MessageDto messageDto); // Fanout Exchange 방식 이용
void headerSendMessage(MessageDto messageDto); // Header Exchange 방식 이용
void topicSendMessage(MessageDto messageDto); // Topic Exchange 방식 이용
}
3) Exchange 유형 : Direct Exchange
💡 Exchange 유형 : Direct Exchange
- ‘라우팅 키(Routing Key)’를 기반으로 메시지를 큐로 라우팅 합니다.
- 바인딩 키가 메시지의 라우팅 키와 ‘정확히 일치하는 큐’로 메시지가 라우팅 됩니다.
1. Direct Exchange 처리과정
💡 Direct Exchange 처리과정
- 이벤트 생성자는 Direct Exchange를 이용하여 바인딩 과정에서 ‘라우팅 키’를 포함하여, 이를 기반으로 이벤트 소비자에게 전송되는 과정입니다.
1. 서버가 실행될 때, Direct Exchange와 데이터를 담을 Queue를 구성하고, 이 둘을 Binding을 합니다.
- 바인딩 과정에서 라우팅 키를 포함하여 구성합니다.
2. 이벤트 생성자(Sender)는 특정 이벤트를 발생시킵니다.
- 해당 이벤트를 통해 RabbitTemplate에서 구성한 Direct Exchange를 지정하며, 라우팅 키를 포함하여 데이터와 함께 큐로 전송합니다.
3. 이벤트 소비자(Consumer)는 이벤트를 수신합니다.
- 사전에 생성자가 구성한 큐를 수신하고 있다가 큐에 데이터가 들어오면 데이터를 전달받습니다.
2. 사용예시
💡 사용예시 -1 : RabbitMqConfig
- 해당 RabbitMqConfig.java 파일 내에서가 최초 서버가 실행되면서 ‘Direct Exchange‘ 구성들이 수행됩니다.
1. directExchange()
- Direct Exchange를 구성합니다.
2. queue1()
- Direct Exchange를 수행하여 데이터를 적재하기 위한 Queue1을 구성하였습니다.
3. directBinding()
- Direct Exchange에서는 Direct Exchange와 Queue1 간의 바인딩을 수행합니다.
- Direct Exchange 방식으로 Queue1와 라우팅 키(Routing key)를 기반으로 바인딩 수행.
/**
* Direct Exchange 구성
*
* @return
*/
@Bean
DirectExchange directExchange() {
// direct.exchange 이름의 Direct Exchange 구성
return new DirectExchange("exchange.direct");
}
/**
* Queue 구성
*
* @return
*/
@Bean
Queue queue1() {
// queue1 이름의 큐를 구성합니다.
return new Queue("queue1", false);
}
/**
* Direct Exchange 와 Queue1 간의 바인딩을 수행합니다.
* - Direct Exchange 방식으로 Queue1와 라우팅 키(Routing key)를 기반으로 바인딩 수행.
*
* @param directExchange
* @param queue1
* @return
*/
@Bean
Binding directBinding(DirectExchange directExchange, Queue queue1) {
return BindingBuilder
.bind(queue1)
.to(directExchange)
.with("order.pizza"); // 라우팅 키 (Routing key)
}
💡 사용예시 -2 : ProducerServiceImpl
- 사전에 구성한 서비스 인터페이스의 Direct Exchange의 구현체입니다.
1. 클라이언트에서 전달받은 messageDto라는 객체를 문자열 형태로 직렬화(Object to String)를 수행합니다
2. Direct Exchange를 이용하여 라우팅 키(order.pizza)를 기반으로 queue1로 데이터를 전송합니다.
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
private final RabbitTemplate rabbitTemplate;
/**
* Direct 방식을 이용하여 메시지 전송
*
* @param messageDto
*/
@Override
@Transactional(readOnly = true)
public void directSendMessage(MessageDto messageDto) {
try {
// 1. 전송하려는 객체를 문자열로 변환합니다.
ObjectMapper objectMapper = new ObjectMapper();
String objectToJSON = objectMapper.writeValueAsString(messageDto);
// 2. Direct Exchange를 이용하여 라우팅 키(order.pizza)를 기반으로 queue1로 데이터를 전송합니다.
rabbitTemplate.convertAndSend("exchange.direct", "order.pizza", objectToJSON);
} catch (JsonProcessingException jpe) {
System.out.println("파싱 오류 발생");
}
}
}
3. 수행 결과
💡 수행 결과 -1 : 이벤트 생성자
- 이벤트 생성자에서 아래와 같은 결과를 전송하였습니다.
💡 수행 결과 -2: 이벤트 소비자
- Queue1을 수신하고 있던 이벤트 소비자에서 아래와 같은 결과를 수신하였습니다.
4) Exchange 유형: Fanout Exchange
💡 Exchange 유형: Fanout Exchange
- ‘라우팅 키’에 관계없이 바인딩된 모든 큐로 메시지를 라우팅 합니다.
- 여러 소비자에게 메시지를 브로드캐스트 하는 데 유용합니다.
1. Fanout Exchange 처리 과정
💡 Fanout Exchange 처리과정
- 이벤트 생성자는 Fanout Exchange를 이용하여 바인딩 때, 연결된 Queue에게 일괄적으로 이벤트 소비자에게 전송되는 과정입니다.
1. 서버가 실행될 때, Fanout Exchange와 데이터를 담을 Queue를 구성하고, 이 둘을 Binding을 합니다.
- 해당 바인딩 과정에서 한 개의 큐는 하나의 Exchange 간의 바인딩을 수행합니다. 즉, 여러 개의 Queue에게 전송하기 위해서는 각각 Exchange-Queue 형태로 구성해야 합니다.
2. 이벤트 생성자(Sender)는 특정 이벤트를 발생시킵니다.
- 해당 이벤트를 통해 RabbitTemplate에서 구성한 Fanout Exchange를 지정하여 데이터와 함께 큐로 전송합니다.
3. 이벤트 소비자(Consumer)는 이벤트를 수신합니다.
- 사전에 생성자가 구성한 큐를 수신하고 있다가 큐에 데이터가 들어오면 데이터를 전달받습니다.
- 해당 경우는 여러 개의 큐와 바인딩이 되면 일괄적으로 모두 전송이 됩니다.
2. 사용예시
💡 사용예시 -1 : RabbitMqConfig
- 해당 RabbitMqConfig.java 파일 내에서가 최초 서버가 실행되면서 ’Fanout Exchange‘ 구성들이 수행됩니다.
1. fanoutExchange()
- "exchange.fanout"이라는 이름의 Fanout Exchange를 생성합니다.
2. queue2(), queue3()
- 각각 "queue2", "queue3"라는 이름의 큐를 생성합니다.
3. fanoutBinding1()
- "queue2"를 "exchange.fanout"에 바인딩합니다. 즉, "exchange.fanout"에서 보내진 메시지는 "queue2"로 라우팅 됩니다.
4. fanoutBinding2()
- "queue3"를 "exchange.fanout"에 바인딩합니다. 즉, "exchange.fanout"에서 보내진 메시지는 "queue3"로 라우팅 됩니다.
- Fanout Exchange 방식은 라우팅 키를 무시하고 바인딩된 모든 큐에 메시지를 브로드캐스트 하는 방식입니다. 따라서, 이 예시에서는 "exchange.fanout"에 보내진 모든 메시지는 "queue2"와 "queue3"로 전송됩니다.
/**
* Fanout Exchange 구성
*
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
// fanout.exchange 이름의 Fanout Exchange 구성
return new FanoutExchange("exchange.fanout");
}
@Bean
Queue queue2() {
// queue2 이름의 큐를 구성합니다.
return new Queue("queue2", false);
}
@Bean
Queue queue3() {
// queue3 이름의 큐를 구성합니다.
return new Queue("queue3", false);
}
/**
* Fanout Exchange 와 Queue2 간의 바인딩을 수행합니다.
* - Fanout Exchange 방식으로 Queue2, Queue3와의 바인딩 수행(* 일괄 Queue2, Queue3에게 전송예정)
*
* @param fanoutExchange
* @param queue2
* @return
*/
@Bean
Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue queue2) {
return BindingBuilder
.bind(queue2)
.to(fanoutExchange); // 바인딩 된 모든 큐로 라우팅이 됩니다.
}
/**
* Fanout Exchange 와 Queue3 간의 바인딩을 수행합니다.
* - Fanout Exchange 방식으로 Queue2, Queue3와의 바인딩 수행(* 일괄 Queue2, Queue3에게 전송예정)
*
* @param fanoutExchange
* @param queue3
* @return
*/
@Bean
Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue queue3) {
return BindingBuilder
.bind(queue3)
.to(fanoutExchange); // 바인딩 된 모든 큐로 라우팅이 됩니다.
}
💡 사용예시 -2 : ProducerServiceImpl
- 사전에 구성한 서비스 인터페이스의 Fanout Exchange의 구현체입니다.
1. 클라이언트에서 전달받은 messageDto라는 객체를 문자열 형태로 직렬화(Object to String)를 수행합니다
2. Fanout Exchange를 이용하여 queue2, queue3으로 데이터를 전송합니다.
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
private final RabbitTemplate rabbitTemplate;
/**
* Fanout 방식을 이용하여 메시지 전송
*
* @param messageDto
*/
@Override
@Transactional(readOnly = true)
public void fanoutSendMessage(MessageDto messageDto) {
try {
// 1. 전송하려는 객체를 문자열로 변환합니다.
ObjectMapper objectMapper = new ObjectMapper();
String objectToJSON = objectMapper.writeValueAsString(messageDto);
// 2. Fanout Exchange를 이용하여 queue2, queue3로 데이터를 전송합니다.
rabbitTemplate.convertAndSend("exchange.fanout", "", objectToJSON);
} catch (JsonProcessingException jpe) {
System.out.println("파싱 오류 발생");
}
}
}
3. 수행 결과
💡 수행 결과 -1 : 이벤트 생성자
- 이벤트 생성자에서 아래와 같은 결과를 전송하였습니다.
💡 수행 결과 -2: 이벤트 소비자
- Queue2, 3을 수신하고 있던 이벤트 소비자에서 아래와 같은 결과를 수신하였습니다.
5) Exchange 유형: Headers Exchange
💡 Exchange 유형: Headers Exchange
- 라우팅 키 대신 ‘헤더 속성 값’에 따라 메시지를 큐로 라우팅 합니다.
- 헤더 값은 소비자가 지정한 헤더와 일치해야 메시지가 라우팅이 됩니다.
- 헤더 정보를 기반으로 라우팅을 하기 때문에 라우팅 키나 패턴 매칭과는 다른 동작을 가집니다.
1. Headers Exchange 처리 과정
💡 Headers Exchange 처리과정
- 이벤트 생성자는 Headers Exchange를 이용하여 바인딩 때, ‘header의 키 값’을 확인하여 동일하거나 혹은 존재한 지 여부를 비교하여, 이를 기반으로 이벤트 소비자에게 전송되는 과정입니다.
1. 서버가 실행될 때, Headers Exchange와 데이터를 담을 Queue를 구성하고, 이 둘을 Binding을 합니다.
- 바인딩 과정에서 header 키에 대한 조건을 포함합니다.(존재하거나 혹은 동일하거나)
2. 이벤트 생성자(Sender)는 특정 이벤트를 발생시킵니다.
- 해당 이벤트를 통해 RabbitTemplate에서 구성한 Headers Exchange를 지정하여 데이터와 함께 큐로 전송합니다.
3. 이벤트 소비자(Consumer)는 이벤트를 수신합니다.
- 사전에 생성자가 구성한 큐를 수신하고 있다가 큐에 데이터가 들어오면 데이터를 전달받습니다.
2. 사용 예시
💡 사용예시 -1 : RabbitMqConfig
- 해당 RabbitMqConfig.java 파일 내에서가 최초 서버가 실행되면서 ‘Topic Exchange‘ 구성들이 수행됩니다.
1. headersExchange() 메서드는 "exchange.headers"라는 이름의 Headers Exchange를 생성합니다.
2. queue5() 메서드는 "queue5"라는 이름의 큐를 생성합니다.
3. headersBinding() 메서드는 Headers Exchange와 Queue4 사이의 바인딩을 선언합니다. 여기서는 "x-api-key"라는 헤더가 존재하는지를 확인하는 조건을 사용하여 바인딩을 수행합니다.
- 즉, 이 설정은 메시지가 "x-api-key"라는 헤더를 가지고 있을 경우에만 "queue5"로 라우팅 됩니다.
/**
* Headers Exchange 구성
*
* @return
*/
@Bean
HeadersExchange headersExchange() {
// headers.exchange 이름의 Headers Exchange
return new HeadersExchange("exchange.headers");
}
@Bean
Queue queue4() {
// queue4 이름의 큐를 구성합니다.
return new Queue("queue4", false);
}
/**
* Headers Exchange 와 Queue4 간의 바인딩을 수행합니다.
* - Headers Exchange 방식으로 Queue4와 Header 값을 조건으로 바인딩 수행
*
* @param headersExchange
* @param queue4
* @return
*/
@Bean
Binding headersBinding(HeadersExchange headersExchange, Queue queue4) {
return BindingBuilder
.bind(queue4)
.to(headersExchange)
.where("x-api-key") // Header 내에 "x-api-key" 라는 값이 존재하는 경우
.exists();
}
💡 사용예시 -2 : ProducerServiceImpl
- 사전에 구성한 서비스 인터페이스의 Headers Exchange의 구현체입니다.
1. 클라이언트에서 전달받은 messageDto라는 객체를 문자열 형태로 직렬화(Object to String)를 수행합니다
2. Headers Exchange를 이용하여 Header 내에 x-api-key가 존재한다면 queue4로 데이터를 전송합니다.
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
private final RabbitTemplate rabbitTemplate;
/**
* Headers 방식을 이용하여 메시지 전송
*
* @param messageDto
*/
@Override
@Transactional(readOnly = true)
public void headerSendMessage(MessageDto messageDto) {
try {
// 1. 전송하려는 객체를 문자열로 변환합니다.
ObjectMapper objectMapper = new ObjectMapper();
String objectToJSON = objectMapper.writeValueAsString(messageDto);
// 2. Headers Exchange를 이용하여 queue4로 데이터를 전송합니다.
rabbitTemplate.convertAndSend("exchange.headers", "", objectToJSON);
} catch (JsonProcessingException jpe) {
System.out.println("파싱 오류 발생");
}
}
}
3. 수행 결과
💡 수행 결과 -1 : 이벤트 생성자
- 이벤트 생성자에서 아래와 같은 결과를 전송하였습니다.
💡 수행 결과 -2: 이벤트 소비자
- Queue4를 수신하고 있던 이벤트 소비자에서 아래와 같은 결과를 수신하였습니다.
6) Exchange 유형: Topic Exchange
💡 Exchange 유형: Topic Exchange
- 라우팅 키의 패턴 매칭에 따라 메시지를 큐로 라우팅 합니다.
- 라우팅 패턴 간의 와일드카드(*) 혹은 해시(#)가 일치해야만 수행합니다.
1. Topic Exchange 처리 과정
💡 Topic Exchange 처리과정
- 이벤트 생성자는 Topic Exchange를 이용하여 바인딩 때, ‘라우팅 패턴’을 확인하여 이를 기반으로 이벤트 소비자에게 전송되는 과정입니다.
1. 서버가 실행될 때, Topic Exchange와 데이터를 담을 Queue를 구성하고, 이 둘을 Binding을 합니다.
- 바인딩 과정에서 라우팅 패턴(*, #)에 만족하는 조건을 포함합니다.
2. 이벤트 생성자(Sender)는 특정 이벤트를 발생시킵니다.
- 해당 이벤트를 통해 RabbitTemplate에서 구성한 Topic Exchange를 지정하여 데이터와 함께 큐로 전송합니다.
3. 이벤트 소비자(Consumer)는 이벤트를 수신합니다.
사전에 생성자가 구성한 큐를 수신하고 있다가 큐에 데이터가 들어오면 데이터를 전달받습니다.
2. 사용 예시
💡 사용예시 -1 : RabbitMqConfig
- 해당 RabbitMqConfig.java 파일 내에서가 최초 서버가 실행되면서 ‘Topic Exchange‘ 구성들이 수행됩니다.
1. topicExchange()
- 이름이 "exchange.topic"인 Topic Exchange를 생성합니다.
2. queue5()
- "queue5"인 Queue를 생성합니다.
3. topicBinding()
- "exchange.topic" Topic Exchange와 "queue5" Queue를 바인딩합니다. 이때 바인딩의 라우팅 패턴은 "order.*"입니다. 이는 "order."로 시작하는 모든 라우팅 키가 해당 Queue로 메시지를 전송하게 됩니다.
/**
* Topic Exchange 구성
*
* @return
*/
@Bean
TopicExchange topicExchange() {
// topic.exchange 이름의 Topic Exchange
return new TopicExchange("exchange.topic");
}
@Bean
Queue queue5() {
// queue5 이름의 큐를 구성합니다.
return new Queue("queue5", false);
}
/**
* Topic Exchange 와 Queue5 간의 바인딩을 수행합니다.
* - Topic Exchange 방식으로 Queue5와 특정 라우팅 패턴(Routing Pattern)을 기반으로 바인딩 수행
*
* @param topicExchange
* @param queue5
* @return
*/
@Bean
Binding topicBinding(TopicExchange topicExchange, Queue queue5) {
return BindingBuilder
.bind(queue5)
.to(topicExchange)
.with("order.*");
}
💡 사용예시 -2 : ProducerServiceImpl
- 사전에 구성한 서비스 인터페이스의 Topic Exchange의 구현체입니다.
1. 클라이언트에서 전달받은 messageDto라는 객체를 문자열 형태로 직렬화(Object to String)를 수행합니다.
2. Topic Exchange를 이용하여 라우팅 패턴(order.*)을 기반으로 이에 해당하는 queue5로 데이터를 전송합니다.
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
private final RabbitTemplate rabbitTemplate;
/**
* Topic 방식을 이용하여 메시지
*
* @param messageDto
*/
@Override
@Transactional(readOnly = true)
public void topicSendMessage(MessageDto messageDto) {
try {
ObjectMapper objectMapper = new ObjectMapper();
String objectToJSON = objectMapper.writeValueAsString(messageDto);
rabbitTemplate.convertAndSend("exchange.topic", "order.*", objectToJSON);
} catch (JsonProcessingException jpe) {
System.out.println("파싱 오류 발생");
}
}
}
3. 수행 결과
💡 수행 결과 -1 : 이벤트 생성자
- 이벤트 생성자에서 아래와 같은 결과를 전송하였습니다.
💡 수행 결과 -2: 이벤트 소비자
- Queue5를 수신하고 있던 이벤트 소비자에서 아래와 같은 결과를 수신하였습니다.
💡 [참고] 해당 작업에 대한 소스코드는 아래의 Git Repository에서 확인하실 수 있습니다.
오늘도 감사합니다. 😀
반응형
'Java > Message Queue' 카테고리의 다른 글
[Java] Spring Boot AMQP RabbitMQ 이해하기 -6 : 메시지 큐 종류, 큐 우선순위 (0) | 2024.06.29 |
---|---|
[Java] Spring Boot AMQP RabbitMQ 이해하기 -5 : TTL 및 데드 레터링 사용예시 (0) | 2024.06.05 |
[Java] Spring Boot AMQP RabbitMQ 이해하기 -3 : Java 구축 및 간단 예제 (0) | 2023.10.21 |
[Java] Spring Boot AMQP RabbitMQ 이해하기 -2 : 로컬 환경 구성 (2) | 2023.10.15 |
[Java] Spring Boot AMQP RabbitMQ 이해하기 -1 : 구조 및 종류 (1) | 2023.10.14 |