💡 RabbitMQ - 오픈 소스 기반 메시지 브로커 소프트웨어로 분산 시스템에서 ‘메시지나 데이터’를 전달하기 위한 중간 매개체(미들웨어)로 사용됩니다.
- AMQP(Advanced Message Queuing Protocol)를 기반으로 하며 다양한 프로그래밍 언어와 플랫폼에서 사용할 수 있습니다. - 메시지 큐를 사용하여 메시지의 생산자와 소비자 사이에 '비동기적인 통신'함으로써 메시지는 큐에 저장되어 생산자와 소비자가 독립적으로 작동할 수 있습니다. 또한 RabbitMQ를 이용하여 메시지를 안전하게 전달하고 처리하는데 중점을 둡니다.
- 메시지를 생성하고 소비하는 클라이언트 간에 메시지를 전달하는 미들웨어입니다. ex) Apache Kafka, RabbitMQ, Amazon Kinesis
교환기(Exchange)
- 메시지를 수신하고 처리할 대상을 결정하는 구성요소를 의미합니다. - 주로 메시지를 수신하고 라우팅 알고리즘을 통해 특정 큐로 보내는 역할을 수행합니다.
소비자(Consumer)
- 메시지 브로커에서 메시지를 가져와서 ‘수신’하고 ‘처리’를 하는 주체를 의미합니다.
바인딩(Binding)
- 클라이언트 애플리케이션과 메시징 시스템 간의 연결을 하는 과정을 의미합니다.
메시지 큐(Message Queue)
- 메시징 시스템에 저장된 메시지를 의미하며 송신자가 메시지를 전송하면 이를 수신자가 처리할때까지 대기시킵니다
2) RabbitMQ TTL(Time-To-Live)
💡 RabbitMQ TTL(Time-To-Live)
- 메시지나 큐에 설정할 수 있는 '속성'을 의미합니다. 이 속성은 메시지를 얼마 동안 큐에 보관할지에 대한 시간을 설정합니다.
- 속성의 단위는 초 단위로 지정하며, 시간이 지나면 메시지의 소비 관계없이 RabbitMQ 서버는 메시지 큐를 자동으로 삭제합니다. - 메시지가 추가될 때 설정되며 각각 메시지마다 다르게 설정됩니다. - 메시지의 지연 전달, 메시지의 만료, 또는 메시지의 소비를 제어하는데 유용하게 사용될 수 있습니다.
- 메시지에 대해 '개별적'으로 TTL을 설정할 수 있습니다. 이는 메시지가 큐에 들어갈 때 설정하며 이 메시지에 대한 특정 시간 또는 큐에 보관하도록 지정합니다.
- 설정하는 방법은 메시지 속성에 'expiration' 필드를 사용하여 수행할 수 있습니다. 단위는 밀리초입니다.
💡 메시지 TTL 설정 예시
- MessageProperties의 setExpiration 메서드의 값을 지정하여서 메시지의 TTL의 시간을 지정합니다. - 해당 예시에서는 1000(1초)으로 지정하였습니다.
/**
* Direct 방식을 이용하여 메시지 전송
*
* @param messageDto
*/
@Override
@Transactional(readOnly = true)
public void directToDeadLetterSendMessage(MessageDto messageDto) {
ObjectMapper objectMapper = new ObjectMapper();
try {
// [STEP1] DTO -> String 직렬화 수행
String objectToJSON = objectMapper.writeValueAsString(messageDto);
// [STEP2] 메시지 속성 지정
MessageProperties messageProperties = new MessageProperties();
// [STEP3] 메시지의 Expiration 지정(TTL : 1초)
messageProperties.setExpiration("1000");
// [STEP4] 메시지 객체로 구성
Message message = new Message(objectToJSON.getBytes(), messageProperties);
// [STEP5] 라우터를 기반으로 큐에 메시지 전송
rabbitTemplate.convertAndSend("exchange.direct.processing", "processing", message);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
2. 큐 TTL 설정
💡 큐 TTL 설정
- 큐에 있는 '모든 메시지'에 동일하게 적용하는 방식입니다.
- 설정하는 방법은 큐에 매개변수에 'message-ttl' 인수를 사용하여 설정할 수 있습니다. 단위는 밀리초입니다.
💡 큐 TTL 설정 예시
- 큐를 구성할 때 인수인 message-ttl를 추가하여서 큐의 TTL을 지정합니다.
@Bean
public Queue processingQueue() {
return QueueBuilder.durable("processingQueue")
.withArgument("x-message-ttl", 1000) // 큐의 TTL 지정
.build();
}
3. 메시지 TTL과 큐 TTL의 우선순위
💡 메시지 TTL과 큐 TTL의 우선순위
- 메시지 TTL과 큐 TTL의 속성이 모두 지정된 경우, 더 짧은 TTL을 설정한 것이 우선적으로 적용이 됩니다. - 이러한 TTL은 시간이 지나면 큐에서 자동으로 삭제가 됩니다.
3) 메시지 큐 전달 실패 처리 : 데드 레터링, 데드 레터 익스체인지, 데드 레터 큐
1. 데드 레터링(Dead Lettering)
💡 데드 레터링(Dead Lettering)
- 메시지가 정상적으로 처리되지 못하고 RabbitMQ의 큐에서 제거될 때 발생하는 처리 방식을 말합니다. - 메시지가 처리되지 못하면 이런 메시지는 데드 레터 큐라는 특별한 큐로 보내집니다. 데드 레터링이 발생하는 경우는 다음과 같습니다.
1.1. 데드 레터링이 발생하는 경우
💡 데드 레터링이 발생하는 경우 - 해당 경우 큐에 있는 메시지 생성자(Message Producer)가 만든 메시지가 메시지 소비자(Message Consumers)에게 전달되지 못하는 경우입니다.
1. 메시지가 큐에서 거부(Reject)되거나 NACK 상태 혹은 다시 큐(requeue)로 돌아오는 경우
2. 메시지가 큐에 있었던 시간이 메시지의 TTL(Time-To-Live)을 초과하여 만료된 경우
- 메시지를 받았지만 처리하지 못한 상태를 의미하며, 메시지를 받은 수신자가 메시지를 처리하지 못하거나 오류가 발생했을 때를 의미합니다.
2. 데드 레터 익스체인지 (DLX: Dead Letter Exchange)
💡 데드 레터 익스체인지 (DLX: Dead Letter Exchange)
- RabbitMQ에서 제공하는 특별한 익스체인지 유형으로 메시지가 큐에서 제거되는 경우에 사용됩니다.
- Direct, Fanout, Headers, Topic 익스체인지와 별개가 아니며 이러한 익스체인지를 기반으로 구성이 됩니다. - 데드 레터 익스체인지를 통해 메시지 전송 실패가 발생한 메시지 들은 '데드 레터 큐'로 전달하게 됩니다. 이를 통해 메시지가 손실되는 것을 방지하고, 문제를 분석하고 해결하는데 도움을 줍니다. - 또한, 데드 레터 익스체인지를 사용하면 시스템의 안정성을 높이고, 메시지 처리에 대한 더 많은 유연성을 얻을 수 있습니다.
- 데드 레터 익스체인지를 설정하려면, 큐를 선언할 때 'x-dead-letter-exchange'라는 특별한 매개변수를 설정해야 합니다. 이 매개변수의 값은 메시지가 큐에서 제거될 때 메시지를 받을 익스체인지의 이름이어야 합니다.
2.2. 데드 레터 익스체인지 (DLX: Dead Letter Exchange) 구성 예시
💡 데드 레터 익스체인지 (DLX: Dead Letter Exchange) 구성 예시
- 이벤트 생성자(Event PRODUCER)와 이벤트 소비자(Event CONSUMER) 관계에서 데드 레터 익스체인지는 이벤트 생성자(Event PRODUCER) 내에 구성합니다.
- 이벤트 생성자가 이벤트 소비자에게 전달하기 위해 큐를 구성하였을 때, 이 큐 내에서는 속성 값으로 x-dead-letter-exchange, x-dead-letter-routing-key를 지정하여서, 메시지 전송 실패 시 Dead Letter Exchange로 라우팅 되도록 처리를 합니다. - 해당 익스체인지 내에서는 Dead Letter Queue 내에서 실패 메시지가 적재되도록 라우팅 하는 역할을 수행합니다.
@Configuration
public class RabbitMqDeadLetterConfig {
/**
* Queue 구성 : Dead Letter Queue로 이용
* - 성공적으로 처리하지 못한 메시지가 해당 큐에 들어옵니다.
*
* @return
*/
@Bean
public Queue deadLetterQueue() {
return new Queue("deadLetterQueue", true);
}
/**
* Queue 구성 : processingQueue 이름의 큐를 구성
* - 해당 큐에서는 속성 값으로 x-dead-letter-exchange가 발생시 deadLetterExchange로 라우팅 됩니다
* - 해당 큐에서는 속성 값으로 x-dead-letter-routing-key를 통해 Direct Queue의 라우팅 키를 전달하여 라우팅 됩니다.
*
* @return
*/
@Bean
public Queue processingQueue() {
return QueueBuilder.durable("processingQueue")
.withArgument("x-dead-letter-exchange", "exchange.direct.deadLetter")
.withArgument("x-dead-letter-routing-key", "deadLetter")
.build();
}
/**
* Direct Exchange 구성 : Dead Letter Exchange로 라우팅을 하는데 사용
* - 성공적으로 처리하지 못한 메시지를 메시지 큐(deadLetterQueue)로 전달하는 역할을 수행합니다.
*
* @return
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("exchange.direct.deadLetter");
}
/**
* Direct Exchange 구성 : processingQueue를 라우팅 하는데 사용
*
* @return
*/
@Bean
public DirectExchange processingExchange() {
return new DirectExchange("exchange.direct.processing");
}
}
💡 RabbitMQ 관리자에서 생성된 Exchange를 확인할 수 있습니다.
3. 데드 레터 큐(Dead Letter Queue)
💡 데드 레터 큐(Dead Letter Queue)
- 메시지 브로커 시스템에서 메시지가 처리되지 못하고 실패하였을 때, 해당 메시지를 보관하고 분석하기 위한 큐를 의미합니다. - 큐에 저장되는 시점은 메시지가 처리되지 못하고, 전송하려는 큐에서 제거되는 시점에 메시지를 저장합니다.
- Dead Letter Queue로 라우팅 되기 위해 사용되는 매개변수에 대해 알아봅니다.
매개변수
설명
x-dead-letter-exchange
메시지가 큐에서 처리되지 못하고 제거될 경우, 메시지를 전달할 대상인 데드 레터 익스체인지의 이름을 지정합니다.
x-dead-letter-routing-key
데드 레터 익스체인지로 전달되는 메시지를 라우팅하기 위한 키를 지정합니다. 이 키는 데드 레터 큐로 메시지를 정확히 전달하는 데 사용됩니다.
3.2. 데드 레터 큐(Dead Letter Queue) 처리 과정
💡 데드 레터 익스체인지 (DLX: Dead Letter Exchange) 처리 과정
1. 메시지 생성자(Producer)가 생성한 메시지가 Exchange로 전달됩니다.
2. Exchange는 처리방식에 따라 다르게 라우팅 되어 Queue에 메시지가 적재됩니다.
3. Queue를 수신하고 있던 이벤트 소비자(Consumer)에게 전달이 됩니다. (* 해당 부분에서 메시지 전송에 실패를 하게 됩니다.)
4. 메시지 전송에 실패한 경우, ‘데드 레터 익스체인지(DLX)’로 전달되어 '데드 레터 큐'로 라우팅 됩니다.
5. 데드 레터 큐에서는 전송 실패 메시지를 적재하며 이를 관리합니다.
3.3. 데드 레터 큐(Dead Letter Queue) 구성 예시
💡 데드 레터 큐(Dead Letter Queue) 구성 예시
- 이벤트 생성자(Event PRODUCER)와 이벤트 소비자(Event CONSUMER) 관계에서 데드 레터 익스체인지는 이벤트 생성자(Event PRODUCER) 내에 구성합니다. - 이벤트 생성자가 이벤트 소비자에게 전달하기 위해 큐를 구성하였고 메시지 전송 실패가 발생하였을 때, Dead Letter Exchange로 라우팅 되도록 처리가 됩니다. - 라우팅을 통해 Dead Letter Queue와 바인딩되어서 전달이 메시시 전송 실패 메시지가 큐에 적재가 됩니다.
package com.adjh.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ의 메시지 생성자(Message Producer) 설정파일 입니다.
* - 기본적인 큐를 구성하고 오류를 강제로 발생 시켜서 DeadLetter을 테스트
*
* @author : lee
* @fileName : RabbitMqDeadLetterConfig
* @since : 24. 6. 4.
*/
@Configuration
public class RabbitMqDeadLetterConfig {
/**
* Queue 구성 : Dead Letter Queue로 이용
* - 성공적으로 처리하지 못한 메시지가 해당 큐에 들어옵니다.
*
* @return
*/
@Bean
public Queue deadLetterQueue() {
return new Queue("deadLetterQueue", true);
}
/**
* Queue 구성 : processingQueue 이름의 큐를 구성
* - 해당 큐에서는 속성 값으로 x-dead-letter-exchange가 발생시 deadLetterExchange로 라우팅 됩니다
* - 해당 큐에서는 속성 값으로 x-dead-letter-routing-key를 통해 Direct Queue의 라우팅 키를 전달하여 라우팅 됩니다.
*
* @return
*/
@Bean
public Queue processingQueue() {
return QueueBuilder.durable("processingQueue")
.withArgument("x-dead-letter-exchange", "exchange.direct.deadLetter")
.withArgument("x-dead-letter-routing-key", "deadLetter")
.build();
}
/**
* Direct Exchange 구성 : Dead Letter Exchange로 라우팅을 하는데 사용
* - 성공적으로 처리하지 못한 메시지를 메시지 큐(deadLetterQueue)로 전달하는 역할을 수행합니다.
*
* @return
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("exchange.direct.deadLetter");
}
/**
* Direct Exchange 구성 : processingQueue를 라우팅 하는데 사용
*
* @return
*/
@Bean
public DirectExchange processingExchange() {
return new DirectExchange("exchange.direct.processing");
}
/**
* Direct Exchange 와 deadLetterQueue 간의 바인딩을 수행합니다.
* - Direct Exchange 방식으로 deadLetterQueue와 라우팅 키(Routing key)를 기반으로 바인딩 수행.
*
* @param deadLetterQueue 성공적으로 처리하지 못한 메시지를 담는 공간
* @param deadLetterExchange 성공적으로 처리하지 못한 메시지를 라우팅
* @return
*/
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder
.bind(deadLetterQueue)
.to(deadLetterExchange)
.with("deadLetter");
}
/**
* Direct Exchange 와 processingQueue 간의 바인딩을 수행합니다.
*
* @param processingQueue 메시지를 담을 큐
* @param processingExchange 메시지를 담기 위한 라우팅
* @return
*/
@Bean
public Binding processingBinding(Queue processingQueue, DirectExchange processingExchange) {
return BindingBuilder
.bind(processingQueue)
.to(processingExchange)
.with("processing");
}
}
💡 RabbitMQ 관리자에서 생성된 Queue를 확인할 수 있습니다.
4) 전체 구성 확인하기
💡 전체구성 확인하기 - 일반 큐 전송과정에서 데드 레터링이 발생한 경우 이를 처리하는 방법에 대해 알아봅니다.
1. 개발환경
💡 개발환경
- 아래와 같은 개발환경에서 구성되었고, 아래의 Repository를 확인하시면 소스코드를 확인이 가능합니다.
- 해당 환경에서는 이벤트 소비자에게 전송하려는 큐 - 익스체인지 - 바인딩 구성과 이벤트 소비자에게 전송하지 못하는 메시지를 저장하는 큐 - 익스체인지 - 바인딩 형태로 구성이 되어 있습니다.
- 아래의 구성과 같이 “메시지 전송 성공 여부”에 따라서 이벤트 소비자(Event CONSUMER)에게 전송되느냐 아니면 Dead Letter Queue에 적재되느냐에 따르는 처리가 되어 있습니다.
분류
이름
설명
Queue
processingQueue
- 이벤트 소비자에게 전달할 메시지가 적재되는 공간
Exchange
exchange.direct.processing
- Direct Exchange로 processingQueue로 라우팅을 하기 위해 사용
Binding
processing(routingKey)
- processingQueue와 Exchange간의 바인딩을 수행
Queue
deadLetterQueue
- 이벤트 소비자에게 전달되지 못하고 전송 실패한 메시지가 적재되는 공간
Exchange
exchange.direct.deadLetter
- Direct Exchange로 deadLetterQueue로 라우팅을 하기 위해 사용
Binding
deadLetter(routingKey)
- deadLetterQueue와 Exchange간의 바인딩을 수행
💡 소스코드 설명 - 해당 파일은 RabbitMQ의 메시지 생성자(Message Producer) 설정파일입니다. - 기본적인 큐를 구성하고 오류를 강제로 발생시켜서 DeadLetter을 테스트를 하기 위한 목적으로 구성하였습니다.
1. deadLetterQueue()
- 성공적으로 처리하지 못한 메시지가 해당 큐에 들어옵니다.
2. processingQueue()
- Queue 구성 : processingQueue 이름의 큐를 구성 - 해당 큐에서는 속성 값으로 x-dead-letter-exchange가 발생 시 deadLetterExchange로 라우팅 됩니다 - 해당 큐에서는 속성 값으로 x-dead-letter-routing-key를 통해 Direct Queue의 라우팅 키를 전달하여 라우팅 됩니다.
3. deadLetterExchange()
- Direct Exchange 구성 : Dead Letter Exchange로 라우팅을 하는 데 사용 - 성공적으로 처리하지 못한 메시지를 메시지 큐(deadLetterQueue)로 전달하는 역할을 수행합니다.
4. processingExchange()
- Direct Exchange 구성 : processingQueue를 라우팅 하는 데 사용
5. deadLetterBinding()
- Direct Exchange와 deadLetterQueue 간의 바인딩을 수행합니다. - Direct Exchange 방식으로 deadLetterQueue와 라우팅 키(Routing key)를 기반으로 바인딩 수행.
6. processingBinding()
- Direct Exchange와 processingQueue 간의 바인딩을 수행합니다.
package com.adjh.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ의 메시지 생성자(Message Producer) 설정파일 입니다.
* - 기본적인 큐를 구성하고 오류를 강제로 발생 시켜서 DeadLetter을 테스트
*
* @author : lee
* @fileName : RabbitMqDeadLetterConfig
* @since : 24. 6. 4.
*/
@Configuration
public class RabbitMqDeadLetterConfig {
/**
* Queue 구성 : Dead Letter Queue로 이용
* - 성공적으로 처리하지 못한 메시지가 해당 큐에 들어옵니다.
*
* @return
*/
@Bean
public Queue deadLetterQueue() {
return new Queue("deadLetterQueue", true);
}
/**
* Queue 구성 : processingQueue 이름의 큐를 구성
* - 해당 큐에서는 속성 값으로 x-dead-letter-exchange가 발생시 deadLetterExchange로 라우팅 됩니다
* - 해당 큐에서는 속성 값으로 x-dead-letter-routing-key를 통해 Direct Queue의 라우팅 키를 전달하여 라우팅 됩니다.
*
* @return
*/
@Bean
public Queue processingQueue() {
return QueueBuilder.durable("processingQueue")
.withArgument("x-dead-letter-exchange", "exchange.direct.deadLetter")
.withArgument("x-dead-letter-routing-key", "deadLetter")
.build();
}
/**
* Direct Exchange 구성 : Dead Letter Exchange로 라우팅을 하는데 사용
* - 성공적으로 처리하지 못한 메시지를 메시지 큐(deadLetterQueue)로 전달하는 역할을 수행합니다.
*
* @return
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("exchange.direct.deadLetter");
}
/**
* Direct Exchange 구성 : processingQueue를 라우팅 하는데 사용
*
* @return
*/
@Bean
public DirectExchange processingExchange() {
return new DirectExchange("exchange.direct.processing");
}
/**
* Direct Exchange 와 deadLetterQueue 간의 바인딩을 수행합니다.
* - Direct Exchange 방식으로 deadLetterQueue와 라우팅 키(Routing key)를 기반으로 바인딩 수행.
*
* @param deadLetterQueue 성공적으로 처리하지 못한 메시지를 담는 공간
* @param deadLetterExchange 성공적으로 처리하지 못한 메시지를 라우팅
* @return
*/
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder
.bind(deadLetterQueue)
.to(deadLetterExchange)
.with("deadLetter");
}
/**
* Direct Exchange 와 processingQueue 간의 바인딩을 수행합니다.
*
* @param processingQueue 메시지를 담을 큐
* @param processingExchange 메시지를 담기 위한 라우팅
* @return
*/
@Bean
public Binding processingBinding(Queue processingQueue, DirectExchange processingExchange) {
return BindingBuilder
.bind(processingQueue)
.to(processingExchange)
.with("processing");
}
}
2. ProducerService.java
💡 ProducerService
- 해당 인터페이스 내에서는 directToDeadLetterSendMessage() 함수를 이용하여 구현합니다.
package com.adjh.springbootrabbitmq.service;
import com.adjh.springbootrabbitmq.dto.MessageDto;
/**
* 메시지 생성자의 Exchange 별 서비스 처리
*
* @author : jonghoon
* @fileName : ProducerService
* @since : 5/25/24
*/
public interface ProducerService {
void directToDeadLetterSendMessage(MessageDto messageDto); // Direct Exchange 방식을 이용하며 Dead Letter 테스트 하기 위해 이용
}
3. ProducerServiceImpl.java
💡 ProducerServiceImpl.java
- 인터페이스의 구현체를 구성합니다. 해당 부분에서는 사전에 구성한 RabbitTemplate을 이용하여서 convertAndSend() 메서드를 호출하여 메시지를 전송합니다.
- 메시지 전송에서는 익스체인지 이름으로 exchange.direct.processing를 지정하고 Direct Exchange이기에 RoutingKey로 “processing”를 지정해서 데이터를 전송합니다.
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
private final RabbitTemplate rabbitTemplate;
/**
* Direct 방식을 이용하여 메시지 전송
*
* @param messageDto
*/
@Override
@Transactional(readOnly = true)
public void directToDeadLetterSendMessage(MessageDto messageDto) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String objectToJSON = objectMapper.writeValueAsString(messageDto);
rabbitTemplate.convertAndSend("exchange.direct.processing", "processing", objectToJSON);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
6) 소스코드 구성 확인하기 : 이벤트 소비자
1. DeadLetterComponent
1.1. 성공 메시지 테스트
💡 DeadLetterComponent : 성공 메시지 테스트
- processingQueue 큐를 바라보고 있다가 성공적인 메시지를 받는 것부터 확인합니다.
@Component
public class DeadLetterComponent {
@RabbitListener(queues = "processingQueue")
public void processingQueueMessage(String msg) {
System.out.println("Processing Queue 내의 결과 값을 반환 받습니다 " + msg);
}
}
💡 이벤트 생성자는 구성한 서비스를 API로 호출하여서 메시지를 전송하여 큐(processingQueue)에 적재합니다.
💡 이벤트 소비자는 processingQueue 큐를 바라보고 있다가 파라미터로 문자열 형태의 메시지를 성공적으로 전달받았습니다.
1.2. 실패 메시지 테스트
💡 DeadLetterComponent : 실패 메시지 테스트
- 이벤트 소비자 구성을 두 개의 큐를 바라보도록 구성하였습니다. - processingQueue의 경우는 정상적으로 처리되었을 때, 들어오는 메시지 리스너입니다. - deadLetterQueue의 경우는 비정상적으로 처리가 되었을 때, 들어오는 메시지 리스너입니다.
package com.adjh.springbootrabbitmqconsumer.component;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 이벤트 소비자를 구성합니다.
*
* @author : lee
* @fileName : DeadLetterComponent
* @since : 24. 6. 5.
*/
@Component
public class DeadLetterComponent {
@RabbitListener(queues = "processingQueue")
public void processingQueueMessage(String msg) {
System.out.println("Processing Queue 내의 결과 값을 반환 받습니다 " + msg);
}
@RabbitListener(queues = "deadLetterQueue")
public void receiveErrorMessage(String msg) {
System.out.println("오류가 발생하였을때 저장되는 메시지 : " + msg);
}
}
💡 ProducerServiceImpl - 강제로 실패하는 경우를 만들기 위해 ‘TTL’의 시간을 적게 두어서 메시지 유효시간을 줄여 구성하였습니다. - 기존의 서비스 구현체 내에 MessageProperties 클래스로 setExpiration(”1000”)시간을 1초로 두어서 큐에서 메시지가 유효한 시간을 줄여 강제로 실패 메시지를 구성합니다.
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
/**
* Direct 방식을 이용하여 메시지 전송
*
* @param messageDto
*/
@Override
@Transactional(readOnly = true)
public void directToDeadLetterSendMessage(MessageDto messageDto) {
ObjectMapper objectMapper = new ObjectMapper();
try {
// [STEP1] DTO -> String 직렬화 수행
String objectToJSON = objectMapper.writeValueAsString(messageDto);
// [STEP2] 메시지 속성 지정
MessageProperties messageProperties = new MessageProperties();
// [STEP3] 메시지의 Expiration 지정(TTL : 1초)
messageProperties.setExpiration("1000");
// [STEP4] 메시지 객체로 구성
Message message = new Message(objectToJSON.getBytes(), messageProperties);
// [STEP5] 라우터를 기반으로 큐에 메시지 전송
rabbitTemplate.convertAndSend("exchange.direct.processing", "processing", message);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
💡 이벤트 생성자는 구성한 서비스를 API로 호출하여서 메시지를 전송하여 큐(processingQueue)에 적재하려 시도하지만, TTL이 1초여서 해당 메시지는 유효한 상태가 아닙니다.
💡 이벤트 소비자는 메시지의 TTL이 지나서 deadLetterQueue로 메시지가 전달되었습니다. - 정상적인 경우라면 processingQueue 큐로 전달이 됩니다.