반응형
해당 글에서는 메시지 큐의 종류에 대해 알아보고 우선순위를 부여한 메시지 큐 처리와 각각 처리 방법에 대해서 알아봅니다.
💡 [참고] Spring Boot AMQL RabbitMQ에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
분류 | 링크 |
Spring Boot AMQP RabbitMQ -1 : 구조 및 종류 이해하기 | https://adjh54.tistory.com/284 |
Spring Boot AMQP RabbitMQ -2 : 로컬 환경 구성하기 | https://adjh54.tistory.com/285 |
Spring Boot AMQP RabbitMQ -3 : Java 환경 구축 및 간단 예시 | https://adjh54.tistory.com/292 |
Spring Boot AMQP RabbitMQ -4 : Exchange 종류 별 이해 및 사용예시 | https://adjh54.tistory.com/497 |
Spring Boot AMQP RabbitMQ -5 : TTL 및 데드 레터링 사용예시 | https://adjh54.tistory.com/501 |
Spring Boot AMQP RabbitMQ -6 : 메시지 큐 종류, 큐 우선순위 | https://adjh54.tistory.com/518 |
Docker : Docker를 이용하여 RabbitMQ 구축하기 | https://adjh54.tistory.com/496 |
Docker : Docker Compose를 이용하여 RabbitMQ Node Cluster 구축하기 | https://adjh54.tistory.com/517 |
API Document : QueueBuilder API Document | https://adjh54.tistory.com/505 |
API Document : ExchangeBuilder API Document | https://adjh54.tistory.com/506 |
API Document : MessageProperties, MessagePropertiesBuilder, MessageBuilderSupport | https://adjh54.tistory.com/508 |
Spring Boot AMQP RabbitMQ Github : Event Producer | https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-rabbitmq |
Spring Boot AMQP RabbitMQ Github : Event Consumer | https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-rabbitmq-consumer |
1) RabbitMQ
💡 RabbitMQ
- 오픈 소스 기반 메시지 브로커 소프트웨어로 분산 시스템에서 ‘메시지나 데이터’를 전달하기 위한 중간 매개체(미들웨어)로 사용됩니다.
- AMQP(Advanced Message Queuing Protocol)를 기반으로 하며 다양한 프로그래밍 언어와 플랫폼에서 사용할 수 있습니다.
- 메시지 큐를 사용하여 메시지의 생산자와 소비자 사이에 '비동기적인 통신'함으로써 메시지는 큐에 저장되어 생산자와 소비자가 독립적으로 작동할 수 있습니다. 또한 RabbitMQ를 이용하여 메시지를 안전하게 전달하고 처리하는데 중점을 둡니다.
1. RabbitMQ의 주요 용어
용어 | 설명 |
생산자(Producer) | - 메시지를 ‘생성’하고 메시지 브로커에 ‘전송’하는 구성요소를 의미합니다. |
메시지(Message) | - 송신자와 수신자간에 전송이 되는 ‘데이터’를 의미합니다. |
메시지 브로커(Message Broker) | - 메시지를 생성하고 소비하는 클라이언트 간에 메시지를 전달하는 미들웨어입니다.ex) Apache Kafka, RabbitMQ, Amazon Kinesis |
교환기(Exchange) | - 메시지를 수신하고 처리할 대상을 결정하는 구성요소를 의미합니다.- 주로 메시지를 수신하고 라우팅 알고리즘을 통해 특정 큐로 보내는 역할을 수행합니다. |
소비자(Consumer) | - 메시지 브로커에서 메시지를 가져와서 ‘수신’하고 ‘처리’를 하는 주체를 의미합니다. |
바인딩(Binding) | - 클라이언트 애플리케이션과 메시징 시스템 간의 연결을 하는 과정을 의미합니다. |
메시지 큐(Message Queue) | - 메시징 시스템에 저장되는 공간이자 생성자가 메시지를 전송하면 수신자가 이를 수신하는 전달 과정을 관리하는 역할을 수행합니다.. |
2. 메시지 큐(Message Queue)
💡 메시지 큐(Message Queue)
- 메시지를 임시로 저장하는 공간이자, 메시지 생성자(Message Producer)가 생성한 메시지를 수신하는 메시지 소비자(Consumer)로 메시지가 전달 과정을 관리하고 메시지가 안전하게 전달될 수 있도록 돕습니다.
- 메시지는 임시로 저장하는 버퍼 역할을 하며 큐에 저장된 메시지는 메시지 소비자(Consumer)에 의해 처리될 때까지 대기하게 됩니다. 이를 통해 시스템 간의 통신을 안정화하고 부하를 관리하며 서로 다른 시스템 간에 메시지를 안전하게 전달할 수 있습니다.
- 높은 처리량, 유연한 라우팅, 클러스터링, 메시지 지속성 등 다양한 기능을 제공합니다. 또한, RabbitMQ는 다양한 메시징 패턴을 지원하며, 플러그인 아키텍처를 통해 기능을 확장할 수 있습니다.
3. 메시지 큐 종류
💡 메시지 큐 종류
- RabbitMQ Manangement로 확인을 해보면 ‘Classic’, ‘Quorum’, ‘Stream’이 있음을 확인하였습니다.
종류 | 설명 | 특징 |
Classic Queue | 일반적인 큐를 의미하며, 큐에 도착하는 메시지가 먼저 나오는 순서를 가지는 FIFO(First In, First Out) 형태를 가지는 메시지 큐를 의미합니다. | 높은 처리량을 지원하며, 메시지를 안정적으로 전달하는 것을 중점을 두어 수행합니다. |
Quorum Queue | 메시지 브로커에서 메시지 손실을 방지하게 개발된 기능으로 메시지를 복제하여 여러 노드에 분산 저장하는 방식을 의미합니다. | 데이터 안정성을 확보하는 방식이며 클러스터 내의 노드 일부가 실패하더라도 메시지가 손실되지 않도록 보장합니다. |
Stream Queue | Stream 큐는 메시지를 저장하는 데 사용되는 방식이 다르며, 높은 처리량과 메시지 순서를 유지할 수 있는 기능을 제공합니다. | Stream 큐는 메시지를 여러 개의 청크로 나누어 저장하므로, 메시지를 빠르게 저장하고 검색할 수 있습니다. |
2) 메시지 큐의 종류-1 : 클래식 큐(Classic Queues)
💡 클래식 큐(Classic Queues)
- 일반적인 큐를 의미하며, 큐에 도착하는 메시지가 먼저 나오는 순서를 가지는 FIFO(First In, First Out) 형태를 가지는 메시지 큐를 의미합니다.
- 높은 처리량을 지원하며, 메시지를 안정적으로 전달하는 것을 중점을 두어 수행합니다.
1. 클래식 큐 주요 특징
💡 클래식 큐 주요 특징
1. 대기열(Queue) 독점성
- 선입 선출(FIFO) 구조에서 단 하나의 연결에서만 사용되며 해당 연결이 닫히면 대기열이 삭제됩니다.
- 관련 링크 : https://www.rabbitmq.com/docs/queues
2. 데드 레터링(Dead Lettering) 지원
- 큐에 도착한 메시지의 전송이 실패한 경우, 데드 레터 익스체인지(DLX)를 통해 라우팅 되어 데드 레터 큐 내에 저장이 되어 문제를 분석하고 해결하는데 도움을 줍니다.
- 관련 링크 : https://www.rabbitmq.com/docs/quorum-queues#dead-lettering
3. TTL(Time-To-Live) 설정 지원
- 메시지 혹은 큐에 유효시간을 지정하여 메시지의 유효성을 보장합니다.
- 관련 링크 : https://www.rabbitmq.com/docs/ttl
4. 대기열(Queue)의 길이 제한 설정 지원
- 길이를 제한함으로써 대기열의 제일 앞쪽(전송되지 못하고 가장 오래된 메시지)에 위치한 메시지를 삭제하거나 데드 레터 큐로 전송합니다.
- 대기열의 길이는 RabbitMQ 3.12 버전 이상에서는 최대 2048개의 메시지를 메모리에 보관할 수 있습니다.
5. 내구성(durable) 설정 지원
- 메시지를 디스크에 저장하고 서버 재시작 후에도 메시지가 유실되지 않게 하는 내구성(durable) 설정을 지원합니다.
- 그러나 해당 설정은 큐가 유지되기에 성능에 부담을 줄 수 있습니다.
6. 메시지 우선순위 설정 지원
- 우선순위가 높은 메시지가 먼저 처리되도록 도와줍니다. 이를 위해 별도의 우선순위 큐를 만들어 우선순위가 높은 메시지를 별도 큐에 저장하고 해당 큐가 다른 큐보다 먼저 처리되도록 하는 것을 지원합니다.
- 관련 링크 : https://www.rabbitmq.com/docs/priority
2. 클래식 큐 구성 예시
💡 클래식 큐 구성 예시
- 아래와 같이 두 개의 ClassicQueue를 구성하였습니다. 사용자는 특정 서비스에 메시지를 전달하면 Fanout Exchange를 통해서 각각 바인딩된 ClassicQueue1, ClassicQueue2에 순서대로 적재되는 과정입니다.
- Fanout Exchange를 통해서 Routing Key를 지정하지 않고 Queue의 이름만을 매핑하여서 일괄 전송되도록 구성되어 있습니다.
package com.adjh.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 클래식 큐(Classic Queue)를 구성하기 위한 설정 클래스입니다.
*
* @author : lee
* @fileName : RabbitMqClassicQueueConfig
* @since : 24. 6. 10.
*/
@Configuration
public class RabbitMqClassicQueueConfig {
/**
* Queue 구성 : 일반적인 클래식 큐로 구성
*
* @return
*/
@Bean
public Queue classicQueue1() {
return QueueBuilder
// 1. 큐 이름 지정
.durable("classicQueue1")
// 2. 데드 레터 익스체인지 지정
.deadLetterExchange("deadLetterExchange")
// 3. 데드 레터 라우터 지정
.deadLetterRoutingKey("deadLetter")
.build();
}
/**
* Queue 구성 : 일반적인 클래식 큐로 구성
*
* @return
*/
@Bean
public Queue classicQueue2() {
return QueueBuilder
// 1. 큐 이름 지정
.durable("classicQueue2")
// 2. 데드 레터 익스체인지 지정
.deadLetterExchange("deadLetterExchange")
// 3. 데드 레터 라우터 지정
.deadLetterRoutingKey("deadLetter")
.build();
}
/**
* Fanout Exchange 구성
*
* @return
*/
@Bean
public FanoutExchange classicQueueFanoutExchange() {
return ExchangeBuilder.fanoutExchange("exchange.fanout.classicQueue").build();
}
/**
* Fanout fanoutExchange - classicQueue1 간의 바인딩을 수행합니다.
*
* @param classicQueueFanoutExchange
* @param classicQueue1
* @returnㅈ
*/
@Bean
Binding classQueueBind1(FanoutExchange classicQueueFanoutExchange, Queue classicQueue1) {
return BindingBuilder
.bind(classicQueue1)
.to(classicQueueFanoutExchange);
}
/**
* TFanout fanoutExchange - classicQueue2 간의 바인딩을 수행합니다.
*
* @param classicQueueFanoutExchange
* @param classicQueue2
* @returnㅈ
*/
@Bean
Binding classQueueBind2(FanoutExchange classicQueueFanoutExchange, Queue classicQueue2) {
return BindingBuilder
.bind(classicQueue2)
.to(classicQueueFanoutExchange);
}
}
3. 결과 확인
💡 결과 확인
- Exchange에서는 ClassicQueue1, ClassicQueue2에 각각 순차적으로 적재가 되었음을 확인하였습니다.
3) 메시지 큐의 종류-2 : 클래식 큐(큐 우선순위 & 메시지 우선순위)
💡 큐의 우선순위
- 클래식 큐(Classic Queue) 내에서는 ‘우선순위’ 기능을 지원합니다. 이러한 기능은 클래식 큐 내의 메시지를 전송할 때 메시지 속성으로 ‘우선순위’를 두어서 순서에 따라 처리가 됩니다.
- 이러한 우선순위를 지정하지 않으면 0의 값을 가지며 순차적으로 큐에 적재됩니다. 이를 지정할 때는 1 ~ 255까지의 값을 지원하지만 1 ~ 5 사이의 값을 사용하는 것을 권장합니다.
- 예를 들면, 숫자가 높을수록 우선순위가 높기에, 메시지의 큐 우선순위가 5인 경우는 메시지의 큐 우선순위 1보다 우선순위가 높습니다.
- 1부터 주어진 큐 우선순위는 각 우선순위에 대한 하위 대기열을 내부적으로 유지해야 하기에 ‘우선순위 값이 높을수록 더 많은 CPU 및 메모리 리소스’가 필요합니다.
💡 메시지 우선순위
- 큐로 전송되는 각 메시지에 대해 ‘우선순위’를 지정하는 기능을 지원합니다. 이를 통해서 메시지의 처리 순서를 제어하는 데 사용이 됩니다.
- 이러한 우선순위를 지정하지 않으면 0의 값을 가지며 순차적으로 큐에 적재됩니다. 이를 지정할 때는 1 ~ 255까지의 값을 지원하지만 1 ~ 5 사이의 값을 사용하는 것을 권장합니다.
1. 우선순위 사용 목적
💡 우선순위 사용 목적
- 특정 메시지가 더 중요하거나 빠르게 처리가 되어야 할 경우, 그 메시지를 먼저 처리하도록 하기 위함입니다.
- 예를 들어서, 시스템에서 발생하는 중요 경고나 알림 메시지가 일반 메시지보다 먼저 처리되어야 하는 경우가 있습니다. 이때 메시지를 높은 우선순위로 설정하여 먼저 처리되도록 할 수 있습니다.
2. 큐 & 메시지 우선순위 속성 지정
💡 메시지 우선순위 속성 지정
- 우선순위의 속성은 ‘큐로 전송되는 메시지 별’로 속성이 지정이 됩니다. 이를 위해서는 큐 내에 maxPriority() 속성을 지정하고, Message 내에 우선순위를 지정하는 setPriority()를 함께 지정해야 합니다.
💡 큐 우선순위 속성 지정 예시
- 큐 내에서는 maxPriority() 메서드를 통해서 최대 우선순위를 지정할 수 있습니다. 이를 지정해야 메시지 내의 우선순위 처리가 수행이 됩니다.
/**
* 클래식 큐 내에 최대 우선순위를 지정하여, 메시지의 우선순위에 따라 처리 합니다.
*
* @return
*/
@Bean
public Queue classicPriorityQueue() {
return QueueBuilder
.durable("classicPriorityQueue")
.maxPriority(255)
.build();
}
💡 메시지 우선순위 속성 지정 예시
- 전송하려는 메시지의 인스턴스를 구성할 때 setPriority() 메서드를 통해서 메시지의 우선순위를 지정하거나 header 속성으로 “x-max-priority”의 값을 두어서 우선순위를 지정합니다.
// [STEP1] MessageProperties 인스턴스 구성
MessageProperties mpb = MessagePropertiesBuilder.newInstance()
.setPriority(5)
.build();
// or
// [STEP1] MessageProperties 인스턴스 구성
MessageProperties mpb = MessagePropertiesBuilder.newInstance()
.setHeader("x-max-priority", 5)
.build();
3. 메시지 우선순위 사용예시
💡 메시지 우선순위 사용예시
- 해당 예시에서는 생성자(Producer)가 두 개의 서비스를 호출하여, 우선순위 대로 메시지가 전달되는지 확인하기 위한 사용예제입니다.
3.1. 사용예시 구성도
💡 사용예시 구성도
- 우선순위가 1인 메시지와 우선순위 5인 메시지를 번갈아가면서 호출하여 Queue에 적재를 하는 경우에, 우선순위 5인 메시지가 일괄적으로 먼저 수행되고 추후 우선순위 1인 메시지가 처리되는지를 확인합니다.
3.2. RabbitMQ 설정 파일 구성
💡 RabbitMQ 설정 파일 구성
- RabbitMqClassicPriorityQueueConfig 내에서는 classicPriorityQueue라는 이름의 큐를 구성하고 최대 우선순위를 255로 지정하였습니다.
- 또한, 이를 큐에게 전달하기 위한 Exchange로 “exchange.direct.priorityQueue” 이름의 Direct Exchange를 이용하였고, 이를 이으는 Binding과 Routing Key를 구성하였습니다.
package com.adjh.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 클래식 큐 중 우선순위를 두어서 처리 예시
*
* @author : lee
* @fileName : RabbitMqClassicPriorityQueue
* @since : 24. 6. 12.
*/
@Configuration
public class RabbitMqClassicPriorityQueueConfig {
/**
* 클래식 큐 내에 최대 우선순위를 지정하여, 메시지의 우선순위에 따라 처리 합니다.
*
* @return
*/
@Bean
public Queue classicPriorityQueue() {
return QueueBuilder
.durable("classicPriorityQueue")
.maxPriority(255)
.build();
}
/**
* Direct Exchange
*
* @return
*/
@Bean
public DirectExchange classicPriorityExchange() {
return ExchangeBuilder.directExchange("exchange.direct.priorityQueue").build();
}
/**
* classicPriorityQueue와 classicPriorityExchange를 바인딩합니다.
*
* @param classicPriorityExchange
* @param classicPriorityQueue
* @return
*/
@Bean
Binding classicPriorityBind(DirectExchange classicPriorityExchange, Queue classicPriorityQueue) {
return BindingBuilder
.bind(classicPriorityQueue)
.to(classicPriorityExchange)
.with("classicPriorityQueue"); // 라우팅 키 (Routing key)
}
}
3.4. RabbitMQ 호출 서비스 구성
💡 RabbitMQ 호출 서비스 구성
- 해당 서비스에서는 실제 메시지를 큐로 전달하는 서비스 구현체입니다.
- 각각 서비스는 메시지 인스턴스를 구성할 때 setPriority() 메서드를 통해서 1, 5에 대한 값으로 지정하여서 메시지를 전송하게끔 구성하였습니다.
package com.adjh.springbootrabbitmq.service.impl;
import com.adjh.springbootrabbitmq.dto.MessageDto;
import com.adjh.springbootrabbitmq.service.ProducerQueueService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 다양한 큐를 테스트 해보기 위한 서비스 구현체입니다.
*
* @author : lee
* @fileName : ProducerQueueServiceImpl
* @since : 24. 6. 10.
*/
@Service
public class ProducerQueueServiceImpl implements ProducerQueueService {
private final RabbitTemplate rabbitTemplate;
public ProducerQueueServiceImpl(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 우선순위가 5인 메시지 전송
*
* @param messageDto
*/
@Override
@Transactional(readOnly = true)
public void sendPriority5Queue(MessageDto messageDto) {
// [STEP1] MessageProperties 인스턴스 구성
MessageProperties mpb = MessagePropertiesBuilder.newInstance()
.setPriority(5)
.build();
ObjectMapper objectMapper = new ObjectMapper();
try {
// [STEP2] 메시지 객체 직렬화 수행
String objectToJSON = objectMapper.writeValueAsString(messageDto);
// [STEP3] 직렬화 객체와 메시지 정보로 메시지 객체 구성
Message message = new Message(objectToJSON.getBytes(), mpb);
// [STEP4] Direct Exchange를 이용하여 Routing Key와 함께 메시지 전달
rabbitTemplate.convertAndSend("exchange.direct.priorityQueue", "classicPriorityQueue", message);
} catch (JsonProcessingException jpe) {
System.out.println("파싱 오류 발생");
}
}
/**
* 우선순위가 1인 메시지 전송
*
* @param messageDto
*/
@Override
public void sendPriority1Queue(MessageDto messageDto) {
// [STEP1] MessageProperties 인스턴스 구성
MessageProperties mpb = MessagePropertiesBuilder.newInstance()
.setPriority(1)
.build();
ObjectMapper objectMapper = new ObjectMapper();
try {
// [STEP2] 메시지 객체 직렬화 수행
String objectToJSON = objectMapper.writeValueAsString(messageDto);
// [STEP3] 직렬화 객체와 메시지 정보로 메시지 객체 구성
Message message = new Message(objectToJSON.getBytes(), mpb);
// [STEP4] Direct Exchange를 이용하여 Routing Key와 함께 메시지 전달
rabbitTemplate.convertAndSend("exchange.direct.priorityQueue", "classicPriorityQueue", message);
} catch (JsonProcessingException jpe) {
System.out.println("파싱 오류 발생");
}
}
}
3.5. API 호출
💡 API 호출 -1
- 우선순위가 5인 메시지를 전송하는 API에 아래와 같이 구성하였고 아래의 우선순위 1인 API와 번갈아가며 호출하였습니다.
💡 API 호출 -2
- 우선순위가 1인 메시지를 전송하는 API에 아래와 같이 구성하였고 아래의 우선순위 5인 API와 번갈아가며 호출하였습니다.
4. 결과 확인
4.1. 큐 구성 확인
💡 큐 구성 확인
- 현재는 API 호출을 하지 않아서 빈 Queue를 확인하였습니다.
4.2. 우선순위 큐 적재 확인
💡 우선순위 큐 적재 확인
- 번갈아가면서 총 6번의 API 호출을 수행하였고, 해당 Queue 내에 적재됨을 확인하였습니다.
4.3. 소비자 서버 실행
💡 소비자 서버 실행
- 큐에 적재된 데이터를 받는 소비자를 구성하고 해당 데이터를 확인합니다.
@RabbitListener(queues = "classicPriorityQueue")
public void receiveClassicPriorityQueue(String msg) {
System.out.println("classicPriorityQueue 내의 결과값을 받습니다 : " + msg);
}
4.4. 최종 결과 확인
💡 최종 결과 확인
- 아래와 같이 번갈아가며 API를 호출했으나 Queue에서 나오는 형태는 우선순위가 높은(5) 경우가 가장 먼저 출력이 됨이 확인되었습니다.
4) 메시지 큐의 종류-3: 쿼럼 큐(Quorum Queue)
💡 쿼럼 큐(Quorum Queue)
- 라프트 컨센서스 알고리즘(Raft Consensus Algorithm)을 기반으로 메시지 브로커에서 메시지 손실을 방지하게 개발된 기능으로 메시지를 복제하여 여러 노드에 분산 저장하는 방식을 의미합니다.
- 예를 들어서 3개의 노드로 구성된 RabbitMQ 클러스터에서 쿼럼 큐를 사용하면, 데이터 일관성을 유지하면서 안전하게 처리할 수 있습니다. 하나의 노드가 다운되더라도 다른 노드들이 메시지 동기화를 유지하므로 서비스 중단 없이 동작할 수 있습니다.
- 이를 통해서 데이터 안정성을 확보하는 방식이며 클러스터 내의 노드 일부가 실패하더라도 메시지가 손실되지 않도록 보장합니다.
- 이는 메시지 전달의 안정성을 높이지만 각 메시지를 여러 노드에 복제해야 하므로 성능에 영향을 미칠 수 있습니다. 그렇기에 사용 여부는 특정 애플리케이션의 요구사항과 트레이드오프를 고려하여 결정해야 합니다.
[더 알아보기]
💡 라프트 컨센서스 알고리즘(The Raft Consensus Algorithm)
- 분산 시스템에서 노드 간에 합의를 이루기 위한 알고리즘을 의미합니다. 이를 통해 시스템의 모든 노드가 동일한 순서로 로그 항목을 복제하도록 보장하여, 시스템 전체가 동일한 상태를 유지하도록 합니다.
💡 쿼럼(Quorum)
- 분산 컴퓨팅에서 다수의 노드에 대한 동의나 합의를 의미합니다. RabbitMQ의 쿼럼 큐는 메시지를 안전하게 저장하기 위해 클러스터의 절반 이상의 노드가 동의하거나 메시지를 저장하도록 요구하는 방식입니다.
💡 트레이드오프(Trade-Off)
- 한 가지 선택을 함으로써 포기해야 하는 것을 의미합니다. 일반적으로 한 가지 상황에서 이익을 얻기 위해 다른 것을 희생해야 하는 상황을 의미합니다.
- 예를 들어, 품질을 높이려면 비용이 더 들 수 있고, 비용을 줄이려면 품질이 떨어질 수 있습니다. 이와 같이 서로 상반되는 두 요소 중에서 하나를 선택해야 하는 상황을 트레이드오프라고 합니다.
1. 쿼럼 큐 주요 특징
💡 쿼럼 큐 주요 특징
1. 데이터 복제와 복원
- 데이터를 여러 노드에 복제하여 저장합니다. 이로 인해 노드 일부가 실패하더라도 데이터 손실을 방지할 수 있습니다. 또한, 클러스터가 다운되거나 노드가 실패한 후에도 데이터를 복원할 수 있습니다.
2. 메시지 순서 보장
- 메시지를 수신한 순서대로 처리합니다. 이는 동일한 큐에서 메시지를 소비하는 여러 소비자가 있는 경우에도 메시지 순서가 유지됩니다.
3. 쿼럼 선거
- 클러스터 내에서 노드가 실패하거나 네트워크 분할이 발생할 경우, 쿼럼 큐는 쿼럼 선거를 통해 클러스터의 리더를 선출합니다. 이는 데이터의 일관성을 유지하고 클러스터가 계속 작동할 수 있도록 합니다.
4. 메시지 더블링 방지
- 쿼럼 큐는 메시지를 여러 노드에 복제할 때 메시지 더블링이 발생하지 않도록 합니다. 이는 각 메시지가 고유한 ID를 가지고 있어, 같은 메시지가 여러 번 전달되는 것을 방지합니다.
5. 높은 내구성
- 쿼럼 큐는 메시지를 디스크에 저장하므로 시스템 재시작 후에도 메시지 유실이 없습니다. 이는 메시지의 내구성을 보장합니다.
6. 대기열 길이 제한
- 쿼럼 큐는 큐의 길이를 제한할 수 있습니다. 이를 통해 메시지가 무한정 쌓이는 것을 방지하고, 시스템의 성능을 유지할 수 있습니다.
7. 메시지 TTL 지원
- 쿼럼 큐는 메시지에 TTL(Time-To-Live)을 설정할 수 있습니다. 이는 메시지가 일정 시간이 지나면 자동으로 삭제되도록 하는 기능입니다.
2. 쿼럼 큐 사용예시
💡 쿼럼 큐 사용예시
- 해당 예시에서는 쿼럼 큐를 이용하여 데이터를 여러 노드에 복제하여 저장하는 것입니다. 이는 노드 일부가 실패하더라도 데이터 손실을 방지할 수 있다는 장점이 있습니다.
- 이를 위해서 RabbitMQ의 클러스터링 과정이 필요합니다. 이는 하나의 노드에서 가지고 있는 큐에 대한 메시지를 다른 클러스터링 된 노드들과 공유하여 데이터 손실을 방지할 수 있다는 점입니다.
💡 [참고] Docker Compose를 이용하여 RabbitMQ의 클러스터링을 구성합니다.
2.1. 사용예시 구성도
💡 사용예시 구성도
- Service에서는 요청에 따라서 메시지가 전송됩니다. 해당 메시지는 Direct Exchange를 타고 바인딩되어서 쿼럼 큐로 전송이 됩니다.
- 쿼럼 큐에서는 리더와 멤버로 구성이 되어 있어서 리더를 기준으로 수행되며, 리더로 지정된 노드가 수행을 멈춘 경우 멤버로 존재하는 노드가 이를 대신하여 수행합니다.
[ 더 알아보기 ]
💡 쿼럼 큐 내에서 ‘리더’와 ‘멤버’
- '리더'는 쿼럼 큐에서 가장 핵심적인 역할을 수행하며, 모든 쓰기 작업은 '리더'를 통해 이루어집니다. 이는 데이터의 일관성을 유지하기 위한 중요한 점입니다.
- 또한, '리더'는 '멤버'들에게 데이터를 복제하여, 노드가 실패하더라도 데이터 손실을 방지하는 역할을 합니다.
- '멤버'는 '리더'로부터 데이터를 복제받아 저장하는 역할을 합니다. '멤버'들은 '리더'가 실패할 경우, 새로운 '리더'를 선출하여 클러스터가 계속 작동할 수 있도록 보장합니다.
2.2. RabbitMQ 설정 파일 구성
package com.adjh.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 쿼럼 큐(Quorum Queue)를 구성하기 위한 설정 클래스입니다.
*
* @author : lee
* @fileName : RabbitMqQuorumQueueConfig
* @since : 24. 6. 11.
*/
@Configuration
public class RabbitMqQuorumQueueConfig {
/**
* Quorum Queue 구성 : 쿼럼 큐 형태로 구성
*
* @return
*/
@Bean
public Queue quorumQueue() {
return QueueBuilder
// 1. 큐 유지 여부 설정
.durable("quorumQueue")
.quorum()
// 2. 데드 레터 익스체인지 지정
.deadLetterExchange("deadLetterExchange")
// 3. 데드 레터 라우터 지정
.deadLetterRoutingKey("deadLetter")
.build();
}
/**
* Direct Exchange 구성
*
* @return
*/
@Bean
DirectExchange quorumQueueDirectExchange() {
// direct.exchange 이름의 Direct Exchange 구성
return new DirectExchange("exchange.direct.quorumQueue");
}
/**
* Direct Exchange 와 Queue1 간의 바인딩을 수행합니다.
* - Direct Exchange 방식으로 Queue1와 라우팅 키(Routing key)를 기반으로 바인딩 수행.
*
* @param quorumQueueDirectExchange
* @param quorumQueue
* @return
*/
@Bean
Binding classQueueBind(DirectExchange quorumQueueDirectExchange, Queue quorumQueue) {
return BindingBuilder
.bind(quorumQueue)
.to(quorumQueueDirectExchange)
.with("quorumQueue"); // 라우팅 키 (Routing key)
}
}
2.3. RabitMQ 호출 서비스 구성
package com.adjh.springbootrabbitmq.service.impl;
import com.adjh.springbootrabbitmq.dto.MessageDto;
import com.adjh.springbootrabbitmq.service.ProducerQueueService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 다양한 큐를 테스트 해보기 위한 서비스 구현체입니다.
*
* @author : lee
* @fileName : ProducerQueueServiceImpl
* @since : 24. 6. 10.
*/
@Service
public class ProducerQueueServiceImpl implements ProducerQueueService {
private final RabbitTemplate rabbitTemplate;
public ProducerQueueServiceImpl(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 쿼럼 큐로 메시지를 전송합니다.
*
* @param messageDto
*/
@Override
@Transactional(readOnly = true)
public void sendQuorumQueue(MessageDto messageDto) {
// [STEP1] MessageProperties 인스턴스 구성
MessageProperties mpb = MessagePropertiesBuilder.newInstance()
.build();
ObjectMapper objectMapper = new ObjectMapper();
try {
// [STEP2] 메시지 객체 직렬화 수행
String objectToJSON = objectMapper.writeValueAsString(messageDto);
// [STEP3] 직렬화 객체와 메시지 정보로 메시지 객체 구성
Message message = new Message(objectToJSON.getBytes(), mpb);
// [STEP4] Direct Exchange를 이용하여 Routing Key와 함께 메시지 전달
rabbitTemplate.convertAndSend("exchange.direct.quorumQueue", "quorumQueue", message);
} catch (JsonProcessingException jpe) {
System.out.println("파싱 오류 발생");
}
}
}
2.4. API 호출
💡 API 호출
- 구성한 서비스로 메시지를 전송하여 큐에 적재합니다.
3. 결과 확인
💡 결과 확인
- 아래와 같이 리더로 rabbitmq-1이 존재하며, 멤버로는 이외의 노드 간에 데이터가 복제가 되어 손실을 방지합니다.
오늘도 감사합니다😀
반응형
'Java > Message Queue' 카테고리의 다른 글
[Java] Spring Boot AMQP RabbitMQ 이해하기 -5 : TTL 및 데드 레터링 사용예시 (0) | 2024.06.05 |
---|---|
[Java] Spring Boot AMQP RabbitMQ 이해하기 -4 : RabbitMQ Exchange 종류 별 이해 및 사용예시 (0) | 2024.05.30 |
[Java] Spring Boot AMQP RabbitMQ 이해하기 -3 : Java 구축 및 간단 예제 (0) | 2023.10.21 |
[Java] Spring Boot AMQP RabbitMQ 이해하기 -2 : 로컬 환경 구성 (2) | 2023.10.15 |
[Java] Spring Boot AMQP RabbitMQ 이해하기 -1 : 구조 및 종류 (1) | 2023.10.14 |