Java/Message Queue

[Java] Spring Boot Kafka 이해하고 활용하기 -1 : 주요 특징, 활용 사례, 클래스/메서드

adjh54 2025. 1. 11. 20:00
728x170
해당 글에서는 Spring Kafka에 대해 이해하고 주요 특징 및 활용되는 사례와 주요 클래스와 메서드에 대해 이해를 돕기 위해 작성한 글입니다.



 
 
 

 💡 [참고] Docker 기반의 초기 Apache Kafka 구성이나 혹은 Apache Kafka 이론에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
분류 설명 링크
Docker Docker 기반 Apache Kafka 환경 구축하기 https://adjh54.tistory.com/637
이해하기 Apache Kafka 이해하기 -1: 주요 모델 및 구성요소 https://adjh54.tistory.com/636
이해하기 Apache Kafka 이해하기 -2 : Zookeeper, KRaft(Kafka Raft), 비교 https://adjh54.tistory.com/639
Java 활용 Spring Boot Kafka 이해하고 활용하기 -1 : 주요 특징, 활용 사례, 클래스/메서드 https://adjh54.tistory.com/640
Java 활용 Spring Boot Kafka 이해하고 활용하기 -2 : 환경 구성 및 사용 예시 https://adjh54.tistory.com/641
     
Github Docker Compose를 이용한 KRaft 구성 예시 Repository https://github.com/adjh54ir/multiflex-docker/tree/main/simple-apache-kafka
Github Spring Boot Kafka를 이용한 예시 Repository https://github.com/adjh54ir/blog-codes/tree/main/spring-boot-kafka

 
 
 

1) Apache Kafka


💡 Apache Kafka

- 실시간으로 스트림 데이터를 수집하고 처리하는 데 최적화된 ‘분산 이벤트 스트리밍 플랫폼(Distributed Data Streaming Platform)’입니다. 이는 실시간으로 발생하는 대량의 데이터를 중앙 허브를 통해 흐르도록 설계되어 있습니다. 이를 통해 데이터의 일관성을 유지하고 시스템 전반의 복잡성을 줄일 수 있습니다.

- 이러한 다량의 데이터는 A 지점에서 B 지점까지만의 데이터가 이동되는 것뿐만 아니라 A 지점에서 Z지점까지의 필요한 모든 곳에 대규모 데이터를 동시에 전달할 수 있습니다.
- 현대적인 데이터 파이프라인 구축에 필수적인 도구로 자리잡았으며, Netflix, LinkedIn, Uber 등 많은 기업들이 핵심 인프라로 사용하고 있습니다.

https://docs.confluent.io/kafka/introduction.html

 

 

Kafka란? - Apache Kafka 설명 - AWS

Kafka는 실시간 스트리밍 데이터 파이프라인과 실시간 스트리밍 애플리케이션을 구축하는 데 사용됩니다. 데이터 파이프라인은 데이터를 안정적으로 처리하고 한 시스템에서 다른 시스템으로

aws.amazon.com

 
 

2) Spring Kafka(Spring for Apache Kafka)


💡 Spring Boot Kafka

- Apache Kafka를 Spring 프레임워크에서 쉽게 Kafka 기반 메시징 솔루션 개발을 사용할 수 있도록 해주는 프로젝트입니다.

- 메시지를 보내기 위한 상위 수준 추상화로 KafkaTemplate을 제공합니다. 또한, @KafkaListener 어노테이션을 이용하여 리스너 컨테이너가 있는 메시지 기반 POJO에 대한 지원도 제공합니다.

- 다양한 메시징 시스템(JMS, RabbitMQ, Kafka 등)에 대해 일관된 프로그래밍 모델을 제공합니다. 예를 들어서, 메시지 전송을 위한 KafkaTemplate, JmsTemplate, RabbitTemplate를 이용하거나 메시지 수신을 위한 어노테이션으로 @KafkaListener, @JmsListener, @RabbitListener를 사용하면서 다양한 메시징 시스템에 비슷한 패턴과 사용법을 공유하여 다른 시스템으로 전환할 때 개발자가 쉽게 적응할 수 있습니다.
 

Spring for Apache Kafka

The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions. It provides a "template" as a high-level abstraction for sending messages. It also provides support for Message-driven PO

spring.io

 

1. 주요 특징


💡 주요 특징

- 대규모 메시지 처리, 실시간 스트리밍 데이터 처리, 이벤트 기반 아키텍처 구현 등에 적합한 솔루션을 제공합니다.

- 주요한 특징으로는 복잡한 설정에 대해서 application.properties/yaml을 통해서 간단하게 환경설정을 하며, Spring에서 설정해 주는 추상화 계층을 통해서 단순화하여 메시지를 전송/수신할 수 있다는 점이 있습니다.
특징 설명
스프링의 추상화 계층 Kafka의 복잡한 설정과 구현을 스프링의 추상화 계층으로 단순화
템플릿 기반 접근 KafkaTemplate을 통한 간편한 메시지 발행 기능 제공
리스너 컨테이너 @KafkaListener 어노테이션을 통한 선언적 메시지 소비 지원
스프링 부트 자동 설정 application.properties/yaml을 통한 간편한 설정 관리

 

[ 더 알아보기 ]

💡 추상화(Abstraction)

- 복잡한 내부 구현을 숨기고 더 단순하고 이해하기 쉬운 인터페이스를 제공하는 것을 의미합니다.

- Spring Kafka에서는 Kafka의 복잡한 설정과 동작을 간단한 어노테이션(@KafkaListener)이나 메서드(KafkaTemplate.send())로 추상화하여 개발자가 쉽게 사용할 수 있도록 합니다.

 
 

2. Spring Kafka vs Kafka Client


💡 Spring Kafka vs Kafka Client

- Spring 환경에서 Apache Kafka를 이용하기 위해서 Spring Kafka와 Kafka Client 라이브러리를 활용할 수 있습니다. 이에 대한 차이를 이해하고 각각에 대해서 이해해봅니다.

 

특징 Spring Kafka Kafka Client
추상화 수준 높은 수준의 추상화 제공 낮은 수준의 직접적인 제어
설정 방식 Spring Boot 자동 설정, properties/yaml 기반 상세한 수동 설정 필요
통합성 Spring 생태계와 원활한 통합 독립적인 사용
개발 편의성 어노테이션 기반 (@KafkaListener 등) 명시적인 코드 구현 필요
유연성 제한적인 커스터마이징 완전한 커스터마이징 가능
학습 곡선 Spring 사용자에게 친숙 Kafka 자체에 대한 깊은 이해 필요

 

[ 더 알아보기]

💡 추상화 수준이 높거나 낮다는 의미는 무슨 말일까?

- 추상화 수준이 높을수록 사용하기는 쉽지만 유연성이 떨어지고, 추상화 수준이 낮을수록 더 많은 통제가 가능하지만 구현이 복잡할 수 있다는 의미입니다.

 
 

2.1. Spring Kafka(Spring for Apache Kafka)


💡 Spring Kafka(Spring for Apache Kafka)

- Spring에서 제공하는 공식 Kafka 클라이언트로, 스프링의 추상화 계층을 제공하며 스프링 생태계와 잘 통합됩니다.

 
Maven Repository: org.springframework.kafka » spring-kafka

// <https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka>
implementation 'org.springframework.kafka:spring-kafka:3.3.1'

 
 
 

2.2. Kafka Client


💡 Kafka Client

- Apache Kafka에서 제공하는 순수 Java 클라이언트로, 더 낮은 수준의 제어가 필요할 때 사용됩니다.

 
Maven Repository: org.apache.kafka » kafka

// <https://mvnrepository.com/artifact/org.apache.kafka/kafka>
implementation 'org.apache.kafka:kafka_2.13:3.9.0'

 
 

3) Spring Kafka 활용 사례


💡 Spring Kafka 활용 사례

- Spring Kafka는 다양한 실제 비즈니스 시나리오에서 활용되며, 특히 실시간 데이터 처리와 마이크로서비스 아키텍처에서 중요한 역할을 합니다.

 
 

💡 [참고] LinkedIn에서 Apache Kafka를 적용한 아키텍처

- 아래와 같이 LinkedIn에서는 Log Search, Monitoring, Security & Fraud, Real-time Analytics 등의 기능으로 사용되고 있습니다.

https://www.confluent.io/blog/event-streaming-platform-1/

 

1. 실시간 로그 수집 시스템


💡 실시간 로그 수집 시스템

- Kafka를 활용하여 여러 서비스의 로그를 실시간으로 수집하여 중앙 집중화된 로깅 시스템으로 전송하는 형태의 시스템을 구성할 수 있습니다.
- 실시간 로그 수집 시스템을 통하여 애플리케이션 성능 모니터링 및 문제 추적에 활용을 합니다.

 
 

💡 ELK Stack(Elasticsearch, Logstash, Kibana)과의 통합하여 마이크로서비스 아키텍처에서 중요한 역할을 합니다.

- Kafka가 로그 데이터를 수집하고 버퍼링 하는 중간 계층 역할 수행
 - Logstash가 Kafka로부터 로그를 소비하여 Elasticsearch로 전달
- Elasticsearch에서 로그 데이터 인덱싱 및 저장
- Kibana를 통해 시각화 및 분석 수행

https://seculo.tistory.com/18

 

💡 Spring Kafka를 사용하여 로그를 수집하는 서비스 클래스를 구현한 간단한 예시입니다

- LoggingService 클래스는 KafkaTemplate을 사용하여 로그 메시지를 Kafka로 전송합니다.
- sendLogToKafka() 메서드는 로그 메시지를 받아서 'elk-log-topic'이라는 Kafka 토픽으로 전송하며, 이 토픽은 ELK Stack과 연동되어 로그를 처리합니다.
@Service
public class LoggingService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public LoggingService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendLogToKafka(String logMessage) {
        // ELK Stack으로 전달될 로그를 Kafka topic으로 전송
        kafkaTemplate.send("elk-log-topic", logMessage);
    }
}

 
 

2. 실시간 알림 시스템


💡 실시간 알림 시스템

- Kafka를 활용하여 사용자 활동에 기반한 실시간 알림 발송을 하는 시스템으로 구성할 수 있습니다. 이는 푸시 알림, 이메일, SMS 등 다양한 채널로의 메시지 전달하는 역할을 수행합니다.

- 예시로는 A, B 사용자가 있을 때, B 사용자가 A 사용자에게 친구요청을 하는 경우, A 사용자에게 실시간 알람을 전송합니다.
- 즉, B 사용자의 친구요청 처리가 된 이후 A에게 알림을 주는 메시지를 전달할 때, Kafka 입장에서는 B 사용자는 생성자(Producer) 입장이 되어서 Kafka로 정보를 담아 특정 토픽에 메시지를 전송을 하고 완료됩니다.
- 이후 처리는 A 사용자인 소비자(Consumer)가 리스너가 되어서 Kafka의 특정 토픽을 수신하여 메시지가 생기면 이를 수신하여 알림 처리를 수행합니다.

 
 

💡 [참고] 우아한 형제들에서 Kafka를 이용한 실시간 알림 시스템을 이용한 프로세스 구성 예시

https://techblog.woowahan.com/17386/

 

💡 Spring Kafka를 사용하여 실시간 알람을 전송하는 서비스 클래스를 구현한 간단한 예시입니다

1. NotificationService (Producer)

- KafkaTemplate을 사용하여 알림 이벤트를 Kafka에 전송합니다
- 'notification-topic'이라는 토픽으로 NotificationEvent를 발행합니다

2. NotificationConsumer (Consumer)

- @KafkaListener 어노테이션을 사용하여 'notification-topic'의 메시지를 구독합니다
- 수신된 알림 이벤트를 처리하여 실제 알림을 발송하는 로직을 구현합니다
@Service
public class NotificationService {
    private final KafkaTemplate<String, NotificationEvent> kafkaTemplate;

    public void sendNotification(NotificationEvent event) {
        kafkaTemplate.send("notification-topic", event);
    }
}

@Service
public class NotificationConsumer {
    @KafkaListener(topics = "notification-topic")
    public void handleNotification(NotificationEvent event) {
        // 알림 처리 및 발송 로직
    }
}

 
 

3. 마이크로서비스 간 통신


💡 마이크로서비스 간 통신

- Kafka를 활용하여 서비스 간 비동기 통신을 위한 이벤트 기반 아키텍처(EDA: Event-Driven Architecture) 구현을 위해서 사용이 됩니다.

- 주문 처리, 재고 관리 등 비즈니스 프로세스 연동을 하면서 사용자 활동 로깅 및 분석을 하며, 결제 처리 및 알림 발송까지 통신 내에 구성이 가능합니다.

 
 

💡 Kafka를 이용한 마이크로서비스 간 통신을 이용한 프로세스 구성 예시

https://upcurvewave.tistory.com/369

 
 

💡 Spring Kafka를 기반으로 마이크로서비스 간의 통신을 구현한 간단한 예시입니다.

- 아래의 프로세스는 각각 서버가 분리된 마이크로서비스라는 가정하에 주문, 재고, 배송 서비스가 각각 존재합니다. 그리고 이는 주문 서비스 → 재고 서비스 → 배송 서비스 연계되는 처리입니다.

- 주문이 들어오면, 재고를 확인하고, 배송이 되는 각각 서비스 간의 Kafka Topic 내에 메시지로 전달이 되어서 비동기적으로 처리가 되는 예시입니다.

1. OrderService (주문 서비스): KafkaTemplate을 사용하여 주문이 생성될 때 'order-created-topic'으로 주문 이벤트를 발송합니다. 

2. InventoryService (재고 서비스): 'order-created-topic'을 구독하여 주문 이벤트를 수신하고, 재고를 확인한 후 'inventory-processed-topic'으로 재고 처리 결과를 발송합니다.

3.  DeliveryService (배송 서비스): 'inventory-processed-topic'을 구독하여 재고 처리가 완료된 주문에 대해 배송 처리를 시작합니다

- 이러한 서비스 간의 독립성을 보장하며, 장애 격리, 확장성과 유연성을 향상할 수 있다는 장점이 있습니다.
// 주문 서비스
@Service
public class OrderService {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void createOrder(Order order) {
        // 주문 처리 로직
        OrderEvent orderEvent = new OrderEvent(order);
        kafkaTemplate.send("order-created-topic", orderEvent);
    }
}

// 재고 서비스
@Service
public class InventoryService {
    private final KafkaTemplate<String, InventoryEvent> kafkaTemplate;

    @KafkaListener(topics = "order-created-topic")
    public void handleOrderCreated(OrderEvent orderEvent) {
        // 재고 확인 및 처리
        InventoryEvent inventoryEvent = processInventory(orderEvent);
        kafkaTemplate.send("inventory-processed-topic", inventoryEvent);
    }
}

// 배송 서비스
@Service
public class DeliveryService {
    @KafkaListener(topics = "inventory-processed-topic")
    public void handleInventoryProcessed(InventoryEvent inventoryEvent) {
        // 배송 처리 시작
        initiateDelivery(inventoryEvent);
    }
}

 
 

💡 [참고] 이벤트 기반 아키텍처에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
 

[공통/아키텍처] 이벤트 기반 아키텍처(EDA: Event-Driven Architecture) 이해하기 -1 : 정의 및 특징, 주요

해당 글에서는 이벤트 기반 아키텍처(EDA: Event-Driven Architecture)에 대해 이해를 돕기 위해 작성한 글입니다.    💡 이벤트 기반 아키텍처(EDA: Event-Driven Architecture)를 이해하기 이전에 마이크로

adjh54.tistory.com

 
 

4. 실시간 데이터 분석


💡 실시간 데이터 분석

- Kafka를 활용하여 실시간으로 발생하는 대량의 데이터를 수집하고 처리하는 시스템입니다.

- 웹사이트나 앱에서의 사용자 활동을 실시간으로 추적하고 분석하는 사용자 행동 패턴 분석을 수행하거나 다수의 IoT 디바이스에서 발생하는 센서 데이터를 실시간으로 수집 및 모니터링을 하는 데이터 처리를 수행합니다.

- 이를 통해서 실시간 데이터를 기반으로 신속한 비즈니스 의사결정 가능하며, 시스템 이상이나 비정상적인 패턴을 즉시 감지가 가능합니다.

 

💡 Spring Kafka를 사용하여 실시간 데이터 분석하는 서비스 클래스를 구현한 간단한 예시입니다

1. AnalyticsService (Producer)

- KafkaTemplate을 사용하여 사용자 활동 데이터를 'analytics-topic'으로 전송합니다.
- trackUserActivity() 메서드를 통해 UserActivity 객체를 Kafka에 전송합니다.

2. AnalyticsProcessor (Consumer)

- @KafkaListener 어노테이션을 사용하여 'analytics-topic'의 메시지를 구독합니다.
- 수신된 사용자 활동 데이터를 분석하고 통계 처리하는 역할을 수행합니다.
@Service
public class AnalyticsService {
    private final KafkaTemplate<String, UserActivity> kafkaTemplate;

    public void trackUserActivity(UserActivity activity) {
        kafkaTemplate.send("analytics-topic", activity);
    }
}

@Service
public class AnalyticsProcessor {
    @KafkaListener(topics = "analytics-topic")
    public void processActivity(UserActivity activity) {
        // 데이터 분석 및 통계 처리
    }
}

 
 
 

3) Spring Kafka 환경설정 클래스/메서드


💡 Spring Kafka 환경설정 클래스/메서드

- Spring Kafka를 Java Configuartion을 통하여 환경 설정을 하는 경우 사용되는 클래스와 메서드를 알아봅니다.

 

1. 환경설정 파일 구성 형태 확인


💡 환경설정 파일 구성 형태 확인

- 해당 파일은 Docker로 구성한 docker-compose.yml 파일입니다. 이를 기준으로 연결 및 관리를 위한 환경설정 파일을 구성합니다.

 

💡 [참고] 아래의 Repository 내에서 Docker-Compose 파일을 확인하실 수 있습니다.
 

multiflex-docker/simple-apache-kafka at main · adjh54ir/multiflex-docker

Docker의 Dockerfile, Docker-Compose 구성 Repository입니다. - adjh54ir/multiflex-docker

github.com

 
 

1.1. yaml 파일을 통한 Kafka 연결 확인


💡 yaml 파일을 통한 Kafka 연결 확인

- yaml 파일을 활용하면 아래와 같이 Kafka에 연결하여서 메시지를 전송하고 수신할 수 있습니다.
spring:
  kafka:
    bootstrap-servers: localhost:29092   
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

구성 설정값 설명
bootstrap-servers localhost:29092 Kafka 서버 접속 주소 (Docker compose의 PLAINTEXT_HOST 포트)
producer.key-serializer StringSerializer 프로듀서 키 직렬화 방식
producer.value-serializer StringSerializer 프로듀서 값 직렬화 방식
consumer.group-id my-group 컨슈머 그룹 식별자
consumer.auto-offset-reset earliest 초기 오프셋 설정 (가장 처음부터 메시지 소비)
consumer.key-deserializer StringDeserializer 컨슈머 키 역직렬화 방식
consumer.value-deserializer StringDeserializer 컨슈머 값 역직렬화 방식

 
 
 

1.2. Java Configuation 방식을 통한 Kafka 연결 확인


💡 Java Configuation 방식을 통한 Kafka 연결 확인

- 위에 yaml 파일을 아래와 같이 Java Configuation 방식을 이용하여서 구성이 가능합니다.
package com.adjh.springbootkafka.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka 연결 설정을 관리하는 관리파일.
 *
 * @author : jonghoon
 * @fileName : KafkaConfig
 * @since : 25. 1. 9.
 */
@Slf4j
@Configuration
public class KafkaConfig {

    // Kafka 연결 서버 주소
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer 설정을 위한 Factory Bean
     * - 메시지 생산자의 직렬화 설정 및 서버 연결 설정을 담당
     *
     * @return ProducerFactory<String, String>
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Consumer 설정을 위한 Factory Bean
     * 메시지 소비자의 역직렬화 설정 및 그룹 ID, 오프셋 설정을 담당
     *
     * @return ConsumerFactory<String, String>
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Kafka 메시지를 전송하기 위한 템플릿 Bean
     * 실제 애플리케이션에서 메시지 전송시 사용됨
     *
     * @return KafkaTemplate<String, String>
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 
 

[ 더 알아보기 ]

💡 yaml 파일을 사용하거나 Java Configuration 방식을 사용했을 때 뭐가 더 좋은 걸까?

- yaml 파일을 사용하면 가독성이 좋고 계층 구조가 직관적이라는 장점이 있습니다. 그러나 동적 설정에 대해서는 제한적이거나 타입의 안정성이 상대적으로 떨어진다는 단점이 있습니다.
- Java Configuration 방식을 사용하면 타입에 대한 안정성이 보장되고, IDE 내에서 자동완성을 지원한다는 장점이 있습니다. 그러나 상대적으로 많은 코드가 필요하고, 가독성이 낮다는 단점이 있습니다.

- 일반적으로 간단한 설정은 yaml 파일을 사용하고, 복잡한 로직이나 타입 안정성이 중요한 설정은 Java Configuration을 사용하는 것이 권장됩니다.


💡 yaml 파일과 Java Configuration 방식을 둘 다 구성했다면, 어떤 파일이 실행될까?

- 두 설정 방식이 공존할 때는 Java Configuration의 설정이 우선적으로 적용되며, 누락된 설정은 yaml 파일에서 보완하는 방식으로 동작합니다.
- Java Configuration(@Configuration)이 yaml/properties 파일보다 우선순위가 높습니다.
- 동일한 설정이 있는 경우, Java Configuration의 설정이 yaml 파일의 설정을 덮어씁니다.
- Java Configuration에서 정의되지 않은 설정은 yaml 파일의 설정이 사용됩니다.

- 예를 들어서, yaml 파일에서 bootstrap-servers=localhost:29092, group-id=group1로 설정하고 Java Configuration에서 bootstrap-servers=localhost:29093만 설정만 했을 경우

- 최종적으로 bootstrap-servers=localhost:29093(Java Config), group-id=group1(yaml)이 됩니다.

 
 

2. ProducerFactory


💡 ProducerFactory

- Kafka 생성자(Producer) 인스턴스를 생성하고 관리하는 팩토리 클래스입니다.

- Producer의 구성 설정을 캡슐화하고, 필요할 때마다 새로운 Producer 인스턴스를 생성합니다.

 

💡 ProducerFactory 인터페이스의 주요 속성
메서드 리턴 타입 설명
assignReplicas(int partition, List<Integer> replicaList) TopicBuilder 개별 레플리카 할당을 추가합니다.
build() org.apache.kafka.clients.admin.NewTopic NewTopic 객체를 생성합니다.
compact() TopicBuilder 토픽의 정리 정책을 압축으로 설정합니다.
config(String configName, String configValue) TopicBuilder 설정 옵션을 지정합니다.
configs(Map<String,String> configProps) TopicBuilder 여러 설정을 한번에 지정합니다.
name(String name) TopicBuilder 지정된 이름으로 TopicBuilder를 생성합니다.
partitions(int partitionCount) TopicBuilder 파티션 수를 설정합니다.
replicas(int replicaCount) TopicBuilder 레플리카 수를 설정합니다.
replicasAssignments(Map<Integer,List<Integer>> replicaAssignments) TopicBuilder 레플리카 할당을 설정합니다.

 

 

ProducerFactory (Spring for Apache Kafka 3.3.1 API)

getPostProcessors Get the current list of post processors. Returns: the post processors. Since: 2.5.3

docs.spring.io

 

💡 ProducerFactory 사용예시
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 추가 설정
    configProps.put(ProducerConfig.ACKS_CONFIG, "all");
    configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
    
    return new DefaultKafkaProducerFactory<>(configProps);
}

 
 

3. ConsumerFactory


💡 ConsumerFactory

- Kafka 소비자(Consumer) 인스턴스를 생성하고 관리하는 팩토리 클래스입니다.

- Consumer의 구성 설정을 캡슐화하고, 필요할 때마다 새로운 Consumer 인스턴스를 생성합니다.
 💡 ConsumerFactory 인터페이스의 주요 속성
설정 옵션 설명 주요 특징
BOOTSTRAP_SERVERS_CONFIG Kafka 브로커의 호스트:포트 목록 - 클러스터 연결을 위한 초기 호스트 및 포트 정보
- 쉼표로 구분된 목록 형태
GROUP_ID_CONFIG Consumer 그룹 식별자 - 동일한 그룹의 Consumer들은 파티션을 분배하여 처리
- 로드 밸런싱과 장애 복구에 활용
KEY_DESERIALIZER_CLASS_CONFIG 메시지 키의 역직렬화 방식 - StringDeserializer, IntegerDeserializer 등 기본 제공
- 커스텀 역직렬화기 구현 가능
VALUE_DESERIALIZER_CLASS_CONFIG 메시지 값의 역직렬화 방식 - JSON, Avro 등 다양한 형식 지원
- 복잡한 객체는 JsonDeserializer 활용
AUTO_OFFSET_RESET_CONFIG 초기 오프셋 설정 - earliest: 가장 처음부터 메시지 소비
- latest: 가장 최근 메시지부터 소비
- none: 이전 오프셋이 없으면 예외 발생

 

 

ConsumerFactory (Spring for Apache Kafka 3.3.1 API)

getPostProcessors Get the current list of post processors. Returns: the post processor. Since: 2.5.3

docs.spring.io

 

💡 ConsumerFactory 사용예시
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
    return new DefaultKafkaConsumerFactory<>(props);
}

 
 
 

4. ConcurrentKafkaListenerContainerFactory


💡 ConcurrentKafkaListenerContainerFactory

- KafkaListenerContainerFactory의 구현체로, 메시지 리스너의 동시성을 지원합니다.

- 멀티스레드 환경에서 메시지를 효율적으로 처리할 수 있게 해 줍니다. 또한, 각 리스너 컨테이너에 대한 세부적인 설정이 가능합니다.

 

💡 KafkaListenerContainerFactory 주요 설정 옵션

- KafkaListenerContainerFactory를 구성하기 위한 구현체로 KafkaListenerContainerFactory를 구성합니다. 그렇기에 KafkaListenerContainerFactory에 대한 주요 메서드에 대해 알아봅니다.
설정 설명 주요 특징
setConsumerFactory Consumer 팩토리 설정 - Consumer 인스턴스 생성을 위한 팩토리 설정
- 메시지 처리를 위한 기본 구성 제공
setConcurrency 리스너 컨테이너의 동시성 설정 - 각 리스너당 실행할 스레드 수 지정
- 처리량 조절 가능
setAutoStartup 컨테이너 자동 시작 여부 - true: 애플리케이션 시작 시 자동으로 리스너 시작
- false: 수동으로 시작 필요
setBatchListener 배치 처리 모드 설정 - true: 여러 레코드를 한 번에 처리
- false: 레코드 단위 처리
setAckMode 승인 모드 설정 - MANUAL: 수동 커밋
- MANUAL_IMMEDIATE: 즉시 수동 커밋
- RECORD: 레코드 단위 자동 커밋
- BATCH: 배치 단위 자동 커밋
setCommonErrorHandler 공통 에러 핸들러 설정 - 메시지 처리 중 발생하는 예외 처리
- 커스텀 에러 핸들링 로직 구현 가능
- 재시도, 데드 레터 큐 등 구성 가능

 

💡 KafkaListenerContainerFactory 사용예시
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    
    // Consumer 팩토리 설정
    factory.setConsumerFactory(consumerFactory);
    
    // 동시성 설정 - 3개의 스레드로 병렬 처리
    factory.setConcurrency(3);
    
    // 자동 시작 설정
    factory.setAutoStartup(true);
    
    // 배치 처리 모드 설정
    factory.setBatchListener(true);
    
    // 승인 모드 설정 - 배치 단위로 커밋
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    
    // 에러 핸들러 설정
    factory.setCommonErrorHandler(new DefaultErrorHandler(
        (consumerRecord, exception) -> {
            log.error("Error in processing: {}", exception.getMessage());
            log.error("Failed record: {}", consumerRecord);
            
            // 추가적인 에러 처리 로직
            // 예: 데드 레터 큐로 전송, 알림 발송 등
        }
    ));
    
    return factory;
}

 
 

4) Spring Kafka 동작 관련 주요 클래스 / 메서드


 

1. TopicBuilder


 💡 TopicBuilder

- Spring Kafka에서 제공하는 토픽 생성을 위한 빌더 클래스입니다.

- Configuration 내에 bean 형태로 구성함으로써 서버가 실행될때, 지정한 Topic이 구성되고 생성이 됩니다
- 토픽의 다양한 설정(파티션 수, 복제 팩터, 설정 값 등)을 메서드 체이닝 방식으로 쉽게 구성할 수 있게 해줍니다.

💡 TopicBuilder 주요 메서드
메서드 리턴 타입 설명
assignReplicas(int partition, List<Integer> replicaList) TopicBuilder 개별 레플리카 할당을 추가합니다.
build() org.apache.kafka.clients.admin.NewTopic NewTopic 객체를 생성합니다.
compact() TopicBuilder 토픽의 정리 정책을 압축으로 설정합니다.
config(String configName, String configValue) TopicBuilder 설정 옵션을 지정합니다.
configs(Map<String,String> configProps) TopicBuilder 여러 설정을 한번에 지정합니다.
name(String name) TopicBuilder 지정된 이름으로 TopicBuilder를 생성합니다.
partitions(int partitionCount) TopicBuilder 파티션 수를 설정합니다.
replicas(int replicaCount) TopicBuilder 레플리카 수를 설정합니다.
replicasAssignments(Map<Integer,List<Integer>> replicaAssignments) TopicBuilder 레플리카 할당을 설정합니다.
 

TopicBuilder (Spring for Apache Kafka 3.3.1 API)

Set the TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_COMPACT.

docs.spring.io

 

💡 TopicBuilder 사용예시

- TopicBuilder의 build()를 통해서 최종적으로 NewTopic()이 생성이 됩니다.
- @Bean으로 등록되어 Spring 컨테이너에서 관리되며, TopicBuilder를 사용하여 메서드 체이닝 방식으로 토픽의 설정을 구성하고 있습니다.
@Configuration
public class KafkaTopicConfig {
    
    @Bean
    public NewTopic exampleTopic() {
        return TopicBuilder.name("example-topic")
                .partitions(3)                     // 파티션 수 설정
                .replicas(2)                       // 복제 팩터 설정
                .config(                           // 추가 설정
                    TopicConfig.RETENTION_MS_CONFIG,
                    String.valueOf(7 * 24 * 60 * 60 * 1000L)  // 7일
                )
                .build();
    }
    
    @Bean
    public NewTopic compactTopic() {
        return TopicBuilder.name("compact-topic")
                .partitions(1)
                .replicas(1)
                .compact()                         // 압축 정책 설정
                .build();
    }
}

 

 💡 [참고] 메서드 체이닝에 대해 궁금하시면 아래의 글을 참고하시면 도움이 됩니다.
 

[Java/디자인 패턴] 메서드 체이닝(Method Chaining), 플루언트 인터페이스(Fluent Interface), 빌더 패턴(Buil

해당 글에서는 메서드 체이닝(Method Chaining), 플루언트 인터페이스(Fluent Interface), 빌더 패턴(Builder Pattern)에 대해서 알아봅니다.1) 메서드 체이닝(Method Chaining)💡 메서드 체이닝(Method Chaining)- 여러

adjh54.tistory.com

 


2. KafkaTemplate


💡 KafkaTemplate

- Apache Kafka와의 통신을 단순화하는 Spring의 핵심 클래스로, 생산자가 Kafka 토픽으로 쉽게 메시지를 전송할 수 있게 해줍니다.

 

💡 KafkaTemplate 주요 메서드
메서드  리턴 타입 설명
send(String topic, Integer partition, Long timestamp, K key, V data) CompletableFuture<SendResult<K,V>> 특정 토픽, 파티션, 타임스탬프, 키, 데이터를 지정하여 메시지 전송
send(String topic, Integer partition, K key, V data) CompletableFuture<SendResult<K,V>> 특정 토픽, 파티션, 키, 데이터를 지정하여 메시지 전송
send(String topic, K key, V data) CompletableFuture<SendResult<K,V>> 특정 토픽에 키와 데이터를 지정하여 메시지 전송
send(String topic, V data) CompletableFuture<SendResult<K,V>> 특정 토픽에 데이터만 지정하여 메시지 전송
send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record) CompletableFuture<SendResult<K,V>> ProducerRecord 객체를 사용하여 메시지 전송
send(Message<?> message) CompletableFuture<SendResult<K,V>> Spring 메시징의 Message 객체를 사용하여 메시지 전송
     
sendDefault(Integer partition, Long timestamp, K key, V data) CompletableFuture<SendResult<K,V>> 기본 토픽으로 파티션, 타임스탬프, 키, 데이터를 지정하여 메시지 전송
sendDefault(Integer partition, K key, V data) CompletableFuture<SendResult<K,V>> 기본 토픽으로 파티션, 키, 데이터를 지정하여 메시지 전송
sendDefault(K key, V data) CompletableFuture<SendResult<K,V>> 기본 토픽으로 키와 데이터를 지정하여 메시지 전송
sendDefault(V data) CompletableFuture<SendResult<K,V>> 기본 토픽으로 데이터만 지정하여 메시지 전송
sendDefault(Integer partition, Long timestamp, K key, V data) CompletableFuture<SendResult<K,V>> 지정된 파티션, 타임스탬프, 키와 함께 기본 토픽으로 데이터 전송
sendDefault(Integer partition, K key, V data) CompletableFuture<SendResult<K,V>> 지정된 파티션, 키와 함께 기본 토픽으로 데이터 전송
sendDefault(K key, V data) CompletableFuture<SendResult<K,V>> 지정된 키와 함께 기본 토픽으로 데이터 전송 (파티션 미지정)
sendDefault(V data) CompletableFuture<SendResult<K,V>> 키나 파티션 없이 기본 토픽으로 데이터만 전송

 

 

KafkaTemplate (Spring for Apache Kafka 3.3.1 API)

Create an instance using the supplied producer factory and autoFlush setting. Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block u

docs.spring.io

 

Sending Messages :: Spring Kafka

When the @KafkaListener returns a Message , with versions before 2.5, it was necessary to populate the reply topic and correlation id headers. In this example, we use the reply topic header from the request: @KafkaListener(id = "requestor", topics = "reque

docs.spring.io

 

 💡 KafkaTemplate을 이용한 Kafka 토픽으로 메시지를 전송하는 다양한 방법입니다.
package com.adjh.springbootkafka.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * kafkaTemplate 활용한 Topic 내에 메시지를 전송하는 다양한 방법
 *
 * @author : jonghoon
 * @fileName : KafkaProducerService
 * @since : 25. 1. 10.
 */
@Slf4j
@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String TOPIC_NAME = "example-topic";

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * 기본적인 메시지 전송
     *
     * @param message 전송 메시지
     */
    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC_NAME, message);
    }

    /**
     * 키와 함께 메시지 전송
     *
     * @param key     Topic Key
     * @param message 전송 메시지
     */
    public void sendMessageWithKey(String key, String message) {
        kafkaTemplate.send(TOPIC_NAME, key, message);
    }

    /**
     * 특정 파티션으로 메시지 전송
     *
     * @param message   전송 메시지
     * @param partition 파티션 명
     */
    public void sendMessageToPartition(String message, int partition) {
        kafkaTemplate.send(TOPIC_NAME, partition, null, message);
    }

    /**
     * 비동기 전송 결과 처리
     *
     * @param message 전송 메시지
     */
    public void sendMessageWithCallback(String message) {
        kafkaTemplate.send(TOPIC_NAME, message)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        log.debug("Success: {} ", result.getRecordMetadata());
                    } else {
                        log.error("Failed: {}", ex.getMessage());
                    }
                });
    }
}

 
 

3. @KafkaListener


💡 @KafkaListener

- Spring Kafka에서 메시지를 소비하기 위한 핵심 어노테이션입니다. 이 어노테이션을 메서드에 적용하면 지정된 Kafka 토픽으로부터 자동으로 메시지를 수신하고 처리할 수 있습니다.

- 메서드 레벨에서 선언하여 간단하게 메시지 소비자(Consumer)를 구현할 수 있고, 여러 토픽을 동시에 구독할 수 있으며, 토픽 패턴을 사용할 수도 있다는 특징이 있습니다.
- 컨슈머 그룹을 통해 메시지 소비를 분산처리할 수 있으며, 메시지 처리 중 발생하는 예외를 ErrorHandler를 통해 처리할 수 있습니다.

 

💡 @KafkaListener 속성
속성 타입 설명
autoStartup String 리스너의 자동 시작 여부를 설정
batch String 배치 처리 여부를 설정
beanRef String 리스너를 포함하는 빈의 참조명
clientIdPrefix String Kafka 클라이언트 ID의 접두사 설정
concurrency String 리스너의 동시 실행 스레드 수 설정
containerFactory String 리스너 컨테이너 팩토리 빈의 이름 지정
containerGroup String 리스너 컨테이너의 그룹 지정
errorHandler String 에러 처리를 위한 ErrorHandler 빈 지정
filter String 메시지 필터링을 위한 설정
groupId String 컨슈머 그룹 ID 설정
id String 리스너의 고유 식별자
properties String[] 추가적인 Kafka 속성 설정
topicPartitions TopicPartition[] 특정 토픽의 파티션 지정
topicPattern String 정규식을 이용한 토픽 패턴 지정
topics String[] 구독할 토픽 목록 지정

 

 

KafkaListener (Spring for Apache Kafka 3.3.1 API)

The bean name of the KafkaListenerContainerFactory to use to create the message listener container responsible to serve this endpoint. If not specified, the default container factory is used, if any. If a SpEL expression is provided (#{...}), the expressio

docs.spring.io

 

 

@KafkaListener Annotation :: Spring Kafka

When using manual AckMode, you can also provide the listener with the Acknowledgment. To activate the manual AckMode, you need to set the ack-mode in ContainerProperties to the appropriate manual mode. The following example also shows how to use a differen

docs.spring.io

 

💡 @KafkaListener을 이용한 Kafka 토픽에서 메시지를 수신하는 예시입니다.
package com.adjh.springbootkafka.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Kafka Topic을 구독하여 메시지를 수신하는 서비스
 *
 * @author : jonghoon
 * @fileName : KafkaConsumerService
 * @since : 25. 1. 10.
 */
@Slf4j
@Service
public class KafkaConsumerService {

    /**
     * 기본적인 토픽 리스닝
     *
     * @param message 수신 메시지
     */
    @KafkaListener(topics = "example-topic", groupId = "group-1")
    public void listen(String message) {
        log.debug("기본 메시지 수신: {}", message);
    }

    /**
     * 동시성과 자동시작 설정
     *
     * @param message 수신 메시지
     */
    @KafkaListener(
            topics = "high-volume-topic",
            groupId = "group-2",
            concurrency = "3",
            autoStartup = "true"
    )
    public void listenWithConcurrency(String message) {
        log.debug("동시처리 메시지 수신: {}", message);
    }

    /**
     * 배치 처리와 에러 핸들러 설정
     *
     * @param messages 수신 메시지
     */
    @KafkaListener(
            topics = "batch-topic",
            groupId = "group-3",
            batch = "true",
            errorHandler = "customErrorHandler"
    )
    public void listenBatch(List<String> messages) {
        log.debug("배치 메시지 수신: {}", messages.size() + "개");
    }

    /**
     * 토픽 패턴 사용
     *
     * @param message 수신 메시지
     */
    @KafkaListener(
            topicPattern = "test.*",
            groupId = "group-4",
            clientIdPrefix = "pattern-client"
    )
    public void listenToPattern(String message) {
        log.debug("패턴 토픽 메시지 수신: {}", message);
    }
}

 
 

4. ErrorHandler


💡 ErrorHandler

- Kafka 소비자(Consumer)에서 메시지 처리 중 발생하는 예외를 처리하기 위한 인터페이스입니다.
- Spring Kafka는 다양한 ErrorHandler 구현체를 제공하며, 사용자 정의 ErrorHandler도 구현할 수 있습니다.
ErrorHandler 종류 기능
DefaultErrorHandler - 기본적인 에러 처리 구현체
- 재시도 정책과 백오프 전략 설정 가능
- 특정 예외에 대한 재시도 제외 설정 가능
SeekToCurrentErrorHandler - 메시지 처리 실패 시 현재 오프셋으로 되돌아가서 재처리를 시도하는 핸들러
ContainerStoppingErrorHandler - 에러 발생 시 리스너 컨테이너를 중지시키는 핸들러
KafkaListenerErrorHandler - @KafkaListener 어노테이션에서 직접 사용 가능한 에러 핸들러
CommonErrorHandler - 배치 처리와 레코드 처리 모두에 사용 가능한 공통 에러 처리 인터페이스
 

Handling Exceptions :: Spring Kafka

When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back. By default, any unprocessed records (including the failed record) are re-fetched on the next poll. This is

docs.spring.io

 

4.1. DefaultErrorHandler


💡 DefaultErrorHandler

- Spring Kafka에서 제공하는 기본적인 에러 처리 구현체입니다. 메시지 처리 중 발생하는 예외를 처리하고 재시도 정책을 관리합니다.

 

💡 DefaultErrorHandler 사용 예시

- 메시지 처리 중 발생하는 예외를 처리하고 재시도 정책을 관리하는 Spring Kafka의 기본적인 에러 처리 구현체입니다

- 재시도 정책 설정: FixedBackOff를 사용하여 1초(1000L) 간격으로 최대 2번 재시도하도록 설정합니다
- 최종 실패 처리: 모든 재시도가 실패한 후 실행될 콜백 함수를 정의하며, 실패한 레코드와 예외 정보를 로깅합니다
- 예외 처리 설정: IllegalArgumentException의 경우 재시도하지 않도록 설정합니다
 /**
   * DefaultErrorHandler 구성
   *
   * @return
   */
  @Bean
  public DefaultErrorHandler defaultErrorHandler() {
      // 재시도 정책 설정
      BackOff backOff = new FixedBackOff(1000L, 2);

      DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
          // 최종 실패 시 처리할 로직
          log.debug("처리 실패한 레코드: {} exception : {} ", consumerRecord, exception);
      }, backOff);

      // 특정 예외는 재시도하지 않도록 설정
      errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);
      return errorHandler;
  }

 

 💡 kafkaListenerContainerFactory 예시

- 어노테이션 @KafkaListener 메서드들을 위한 컨테이너 설정을 합니다.
- setCommonErrorHandler() 메서드를 통해서 defaultErrorHandler bean을 등록하여서 공통 에러 발생 시 위와 같은 처리를 수행합니다.
/**
   * Kafka Listener 컨테이너 Factory Bean
   * 어노테이션 @KafkaListener 메서드들을 위한 컨테이너 설정
   *
   * @return ConcurrentKafkaListenerContainerFactory<String, String>
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      factory.setCommonErrorHandler(defaultErrorHandler());
      return factory;
  }

 
 
 

4.2. KafkaListenerErrorHandler


💡 KafkaListenerErrorHandler

- @KafkaListener 어노테이션에서 발생하는 예외를 처리하기 위한 인터페이스입니다. 각각의 리스너 메서드에 대해 개별적인 예외 처리 로직을 정의할 수 있습니다.

 
 

💡 KafkaListenerErrorHandler 사용예시

- 아래와 같이 각각의 리스너 별로 다른 에러 처리 로직을 적용할 수 있고, 메시지 처리 실패 시 대체 응답을 반환할 수 있습니다.
- 필요한 경우 실패한 메시지를 다른 토픽으로 전달하거나 데이터베이스에 저장할 수 있습니다.
@Bean
public KafkaListenerErrorHandler customErrorHandler() {
    return (message, exception) -> {
        log.error("메시지 처리 중 오류 발생: {}", exception.getMessage());
        log.error("문제가 발생한 메시지: {}", message.getPayload());
        // 예외 처리 로직 구현
        // 필요한 경우 다른 결과 반환 가능
        return "Error Handled";
    };
}

 
 

💡 @KafkaListener 사용예시

- 위에 구성한 customErrorHandler bean을 @KafkaListener의 errorHandler 속성을 통해서 등록이 가능합니다.
@KafkaListener(
    topics = "example-topic",
    groupId = "group-1",
    errorHandler = "customErrorHandler"
)
public void listen(String message) {
    // 메시지 처리 로직
    log.debug("메시지 수신: {}", message);
}

 
 
 
 
 
오늘도 감사합니다 😀

그리드형