- spring.kafka.bootstrap-servers : Kafka 서버 접속 주소를 지정하였습니다. - spring.kafka.producer.key-serializer : 생성자의 메시지 키 값을 ‘문자열 형태로 직렬화(StringSerializer)’ 합니다. - spring.kafka.producer.value-serializer : 생성자의 메시지 값을 ‘문자열 형태로 직렬화(StringSerializer)’ 합니다.
3. Spring Kafka Logging
- 프로젝트 및 Kafka에 대한 로깅을 수행하기 위해 로깅 레벨을 지정합니다.
# Server Config
server:
port: 8000
# Spring Config
spring:
kafka:
bootstrap-servers: localhost:29092 # Kafka 접속 정보 지정
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 문자열 형태로 메시지 키 값 직렬화
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 문자열 형태로 메시지 값 직렬화# Spring Kafka Logging
logging:
level:
com.adjh.springbootkafka: DEBUG
org:
apache.kafka: INFO
springframework.kafka: INFO
[ 더 알아보기 ] 💡 producer.key-serializer, producer.value-serializer는 왜 직렬화를 수행할까?
- Kafka는 네트워크를 통해 메시지를 전송하는데, Java 객체를 있는 그대로 네트워크로 전송할 수 없습니다. 따라서 바이트 스트림으로 변환(직렬화)이 필요합니다. - 또한, 서로 다른 시스템이나 언어 간에 데이터를 주고받을 때, 직렬화된 형태를 통해 호환성을 보장할 수 있습니다.
1.1. 직렬화(Serializer) 종류 및 특징
💡 직렬화(Serializer) 종류 및 특징
- 메시지를 구성하고 전송할 때 이용하는 직렬화는 메시지 형태를 바이트 스트림 형태로 변환하는 과정을 의미합니다. - 메시지를 수신하는 소비자(Consumer) 내에서도 동일하게 역직렬화를 위해서 동일한 형태로 구성해야 합니다.
2.1. spring.kafka.bootstrap-servers : Kafka 서버 접속 주소를 지정하였습니다.
2.2. spring.kafka.consumer.group-id : my-group이라는 Consumer들을 논리적으로 그룹화하였습니다. - 예를 들어서, 아래와 같이 시스템 별로 Group Id를 사용하면, 동일한 메시지를 각 시스템에서 독립으로 처리할 수 있습니다. - 주문 처리 시스템: order-processing-group - 알림 시스템: notification-group - 데이터 분석 시스템: analytics-group
2.3. spring.kafka.consumer.auto-offset-reset - Consumer가 저장된 offset을 찾지 못했을 때 어떻게 동작할지를 결정합니다.
2.3. spring.kafka.consumer.key-serializer : 생성자의 메시지 키 값을 ‘문자열 형태로 직렬화(StringSerializer)’ 합니다.
2.4. spring.kafka.consumer.value-serializer : 생성자의 메시지 값을 ‘문자열 형태로 직렬화(StringSerializer)’ 합니다.
3. Spring Kafka Logging
- 프로젝트 및 Kafka에 대한 로깅을 수행하기 위해 로깅 레벨을 지정합니다.
# Server Config
server:
port: 8001
# Spring Config
spring:
kafka:
bootstrap-servers: localhost:29092 # Kafka 접속 정보 지정
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 문자열 형태로 메시지 키 값 역직렬화
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 문자열 형태로 메시지 값 역직렬화
# Spring Kafka Logging
logging:
level:
com.adjh.springbootkafka: DEBUG
org:
apache.kafka: INFO
springframework.kafka: INFO
💡 spring.kafka.consumer.auto-offset-reset 속성의 설정 값
설정값
설명
사용 사례
earliest
가장 처음의 offset부터 메시지를 소비
데이터 손실을 허용하지 않고 모든 메시지를 처리해야 하는 경우
latest
가장 최근의 offset부터 메시지를 소비 (기본값)
실시간 데이터 처리나 최신 메시지만 필요한 경우
none
저장된 offset을 찾지 못하면 예외 발생
엄격한 offset 관리가 필요한 경우
💡 [참고] Consumer Group
- 하나의 토픽에 대해 협력하여 메시지를 처리하는 소비자(Consumer)들의 집합을 의미합니다. 즉, 동일한 토픽을 구독하는 소비자들의 논리적인 그룹입니다.
- Spring Boot 시작을 하는 부분에 @EnableKafka 어노테이션을 선언해 줍니다.
💡 @EnableKafka
- Spring for Apache Kafka의 핵심 어노테이션입니다. Kafka 관련 기능을 활성화하고 @KafkaListener 어노테이션을 사용할 수 있게 해주는 설정 어노테이션입니다. - @EnableKafka가 없다면 @KafkaListener 어노테이션이 동작하지 않아 Kafka로부터 메시지를 수신할 수 없게 됩니다.
💡 주요 기능
1. Kafka 리스너 컨테이너를 자동으로 생성 2. Kafka 메시지 리스닝에 필요한 인프라 구성 자동화 3. @KafkaListener 어노테이션 활성화 4. Kafka 관련 컴포넌트들의 자동 구성 지원
- Kafka Listener를 통해서 전달받은 메시지를 비즈니스 로직 처리 합니다. - processMessage() 메서드에서는 간단하게 전달 받은 메시지를 대문자로 변환합니다.
package com.adjh.springbootkafkaconsumer.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* Kafka Listener 를 통해서 전달 받은 메시지를 비즈니스 로직 처리
*
* @author : jonghoon
* @fileName : KafkaConsumerService
* @since : 25. 1. 10.
*/@Slf4j@ServicepublicclassKafkaConsumerService {
/**
* 간단한 비즈니스 로직 처리
*
* @param message 리스너로 들어온 메시지
* @return
*/public String processMessage(String message) {
try {
// 비즈니스 로직 처리 : 메시지를 대문자로 변환하여 이를 반환합니다.return message.toUpperCase();
} catch (Exception e) {
log.error("메시지 처리 중 오류 발생: {}", e.getMessage(), e);
thrownewRuntimeException("메시지 처리 실패", e);
}
}
}
💡 yaml 파일과 Java Configuration 방식을 둘 다 구성한 경우 어떤 파일이 실행될까?
- 두 설정 방식이 공존할 때는 “Java Configuration의 설정”이 우선적으로 적용되며, 누락된 설정은 yaml 파일에서 보완하는 방식으로 동작합니다.
- 동일한 설정이 있는 경우, Java Configuration의 설정이 yaml 파일의 설정을 덮어씁니다. - 예를 들어서, yaml 파일에서 bootstrap-servers=localhost:29092, group-id=group1로 설정하고 Java Configuration에서 bootstrap-servers=localhost:29093만 설정만 했을 경우 - 최종적으로 bootstrap-servers=localhost:29093(Java Config), group-id=group1(yaml)이 됩니다.
- 기존에 yaml 파일 형태로 Kafka 연결 및 생성 데이터에 대한 환경 설정을 yaml 파일 형태로 구성하였습니다.
# Spring Config
spring:
kafka:
bootstrap-servers: localhost:29092 # Kafka 접속 정보 지정
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 문자열 형태로 메시지 키 값 직렬화
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 문자열 형태로 메시지 값 직렬화
💡 기존의 yaml 파일을 Java Configuration 클래스를 구성하였습니다.
1. @Value - yaml 파일에서 정의된 Kafka 서버 주소를 주입받습니다
2. producerFactory() 메서드
- Producer 설정을 위한 Factory Bean을 생성합니다 - 메시지 생산자의 직렬화 설정과 서버 연결 설정을 담당합니다. - bootstrap 서버, key serializer, value serializer 등 기본 설정을 Map 형태로 구성합니다
3. kafkaTemplate() 메서드
- 실제 애플리케이션에서 메시지를 전송할 때 사용되는 템플릿 Bean을 생성합니다 - producerFactory를 이용하여 KafkaTemplate 인스턴스를 생성합니다
package com.adjh.springbootkafka.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka 연결 설정을 관리하는 관리파일.
*
* @author : jonghoon
* @fileName : KafkaConfig
* @since : 25. 1. 9.
*/@Slf4j@Configurationpublic class. KafkaConfig {
// Kafka 연결 서버 주소@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;
/**
* Producer 설정을 위한 Factory Bean
* - 메시지 생산자의 직렬화 설정 및 서버 연결 설정을 담당
*
* @return ProducerFactory<String, String>
*/@Beanpublic ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = newHashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
returnnewDefaultKafkaProducerFactory<>(props);
}
/**
* Kafka 메시지를 전송하기 위한 템플릿 Bean
* 실제 애플리케이션에서 메시지 전송시 사용됨
*
* @return KafkaTemplate<String, String>
*/@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {
returnnewKafkaTemplate<>(producerFactory());
}
}
- 기존에 yaml 파일 형태로 Kafka 연결 및 그룹 지정 및 데이터 수신 데이터 역직렬화에 대해 yaml 파일 형태로 구성하였습니다.
# Spring Config
spring:
kafka:
bootstrap-servers: localhost:29092 # Kafka 접속 정보 지정
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 문자열 형태로 메시지 키 값 역직렬화
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 문자열 형태로 메시지 값 역직렬화
💡 기존의 yaml 파일을 Java Configuration 클래스를 구성하였습니다. 1. @Value
- yaml 파일에서 정의된 Kafka 서버 주소를 주입받습니다
2. consumerFactory() 메서드
- Consumer 설정을 위한 Factory Bean을 생성하는 메서드입니다 - ConsumerConfig를 통한 키 갑을 통해서 서버, 그룹 아이디, Offset Reset 설정, 메시지 키 역직렬화, 메시지 키 값 역직렬화 설정을 지정하였습니다.
3. kafkaListenerContainerFactory() 메서드
- @KafkaListener 어노테이션이 붙은 메서드들을 위한 컨테이너 설정을 담당합니다 - consumerFactory를 사용하여 리스너 컨테이너를 생성합니다 - Kafka 메시지를 수신하고 처리하는데 필요한 기본 설정을 제공합니다
package com.adjh.springbootkafkaconsumer.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka 연결 설정을 관리하는 관리파일.
*
* @author : jonghoon
* @fileName : KafkaConfig
* @since : 25. 1. 9.
*/@Slf4j@ConfigurationpublicclassKafkaConfig {
// Kafka 연결 서버 주소@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;
/**
* Consumer 설정을 위한 Factory Bean
* 메시지 소비자의 역직렬화 설정 및 그룹 ID, 오프셋 설정을 담당
*
* @return ConsumerFactory<String, String>
*/@Beanpublic ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = newHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
returnnewDefaultKafkaConsumerFactory<>(props);
}
/**
* Kafka Listener 컨테이너 Factory Bean
* 어노테이션 @KafkaListener 메서드들을 위한 컨테이너 설정
*
* @return ConcurrentKafkaListenerContainerFactory<String, String>
*/@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}