- 실시간으로 스트림 데이터를 수집하고 처리하는 데 최적화된 ‘분산 이벤트 스트리밍 플랫폼(Distributed Data Streaming Platform)’입니다. 이는 실시간으로 발생하는 대량의 데이터를 중앙 허브를 통해 흐르도록 설계되어 있습니다. 이를 통해 데이터의 일관성을 유지하고 시스템 전반의 복잡성을 줄일 수 있습니다.
- 이러한 다량의 데이터는 A 지점에서 B 지점까지만의 데이터가 이동되는 것뿐만 아니라 A 지점에서 Z지점까지의 필요한 모든 곳에 대규모 데이터를 동시에 전달할 수 있습니다. - 현대적인 데이터 파이프라인 구축에 필수적인 도구로 자리잡았으며, Netflix, LinkedIn, Uber 등 많은 기업들이 핵심 인프라로 사용하고 있습니다.
- Apache Kafka를 Spring 프레임워크에서 쉽게 Kafka 기반 메시징 솔루션 개발을 사용할 수 있도록 해주는 프로젝트입니다.
- 메시지를 보내기 위한 상위 수준 추상화로 KafkaTemplate을 제공합니다. 또한, @KafkaListener 어노테이션을 이용하여 리스너 컨테이너가 있는 메시지 기반 POJO에 대한 지원도 제공합니다.
- 다양한 메시징 시스템(JMS, RabbitMQ, Kafka 등)에 대해 일관된 프로그래밍 모델을 제공합니다. 예를 들어서, 메시지 전송을 위한 KafkaTemplate, JmsTemplate, RabbitTemplate를 이용하거나 메시지 수신을 위한 어노테이션으로 @KafkaListener, @JmsListener, @RabbitListener를 사용하면서 다양한 메시징 시스템에 비슷한 패턴과 사용법을 공유하여 다른 시스템으로 전환할 때 개발자가 쉽게 적응할 수 있습니다.
- Kafka를 활용하여 사용자 활동에 기반한 실시간 알림 발송을 하는 시스템으로 구성할 수 있습니다. 이는푸시 알림, 이메일, SMS 등 다양한 채널로의 메시지 전달하는 역할을 수행합니다.
- 예시로는 A, B 사용자가 있을 때, B 사용자가 A 사용자에게 친구요청을 하는 경우, A 사용자에게 실시간 알람을 전송합니다. - 즉, B 사용자의 친구요청 처리가 된 이후 A에게 알림을 주는 메시지를 전달할 때, Kafka 입장에서는 B 사용자는 생성자(Producer) 입장이 되어서 Kafka로 정보를 담아 특정 토픽에 메시지를 전송을 하고 완료됩니다. - 이후 처리는 A 사용자인 소비자(Consumer)가 리스너가 되어서 Kafka의 특정 토픽을 수신하여 메시지가 생기면 이를 수신하여 알림 처리를 수행합니다.
💡 [참고] 우아한 형제들에서 Kafka를 이용한 실시간 알림 시스템을 이용한 프로세스 구성 예시
https://techblog.woowahan.com/17386/
💡 Spring Kafka를 사용하여 실시간 알람을 전송하는 서비스 클래스를 구현한 간단한 예시입니다 1. NotificationService (Producer)
- Kafka를 활용하여 실시간으로 발생하는 대량의 데이터를 수집하고 처리하는 시스템입니다.
- 웹사이트나 앱에서의 사용자 활동을 실시간으로 추적하고 분석하는 사용자 행동 패턴 분석을 수행하거나 다수의 IoT 디바이스에서 발생하는 센서 데이터를 실시간으로 수집 및 모니터링을 하는 데이터 처리를 수행합니다.
- 이를 통해서 실시간 데이터를 기반으로 신속한 비즈니스 의사결정 가능하며, 시스템 이상이나 비정상적인 패턴을 즉시 감지가 가능합니다.
💡 Spring Kafka를 사용하여 실시간 데이터 분석하는 서비스 클래스를 구현한 간단한 예시입니다
1. AnalyticsService (Producer)
- KafkaTemplate을 사용하여 사용자 활동 데이터를 'analytics-topic'으로 전송합니다. - trackUserActivity() 메서드를 통해 UserActivity 객체를 Kafka에 전송합니다.
2. AnalyticsProcessor (Consumer)
- @KafkaListener 어노테이션을 사용하여 'analytics-topic'의 메시지를 구독합니다. - 수신된 사용자 활동 데이터를 분석하고 통계 처리하는 역할을 수행합니다.
@ServicepublicclassAnalyticsService{
privatefinal KafkaTemplate<String, UserActivity> kafkaTemplate;
publicvoidtrackUserActivity(UserActivity activity){
kafkaTemplate.send("analytics-topic", activity);
}
}
@ServicepublicclassAnalyticsProcessor{
@KafkaListener(topics = "analytics-topic")publicvoidprocessActivity(UserActivity activity){
// 데이터 분석 및 통계 처리
}
}
3) Spring Kafka 환경설정 클래스/메서드
💡 Spring Kafka 환경설정 클래스/메서드
- Spring Kafka를 Java Configuartion을 통하여 환경 설정을 하는 경우 사용되는 클래스와 메서드를 알아봅니다.
1. 환경설정 파일 구성 형태 확인
💡 환경설정 파일 구성 형태 확인
- 해당 파일은 Docker로 구성한 docker-compose.yml 파일입니다. 이를 기준으로 연결 및 관리를 위한 환경설정 파일을 구성합니다.
💡 [참고] 아래의 Repository 내에서 Docker-Compose 파일을 확인하실 수 있습니다.
Kafka 서버 접속 주소 (Docker compose의 PLAINTEXT_HOST 포트)
producer.key-serializer
StringSerializer
프로듀서 키 직렬화 방식
producer.value-serializer
StringSerializer
프로듀서 값 직렬화 방식
consumer.group-id
my-group
컨슈머 그룹 식별자
consumer.auto-offset-reset
earliest
초기 오프셋 설정 (가장 처음부터 메시지 소비)
consumer.key-deserializer
StringDeserializer
컨슈머 키 역직렬화 방식
consumer.value-deserializer
StringDeserializer
컨슈머 값 역직렬화 방식
1.2. Java Configuation 방식을 통한 Kafka 연결 확인
💡 Java Configuation 방식을 통한 Kafka 연결 확인
- 위에 yaml 파일을 아래와 같이 Java Configuation 방식을 이용하여서 구성이 가능합니다.
package com.adjh.springbootkafka.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka 연결 설정을 관리하는 관리파일.
*
* @author : jonghoon
* @fileName : KafkaConfig
* @since : 25. 1. 9.
*/@Slf4j@ConfigurationpublicclassKafkaConfig{
// Kafka 연결 서버 주소@Value("${spring.kafka.bootstrap-servers}")
privateString bootstrapServers;
/**
* Producer 설정을 위한 Factory Bean
* - 메시지 생산자의 직렬화 설정 및 서버 연결 설정을 담당
*
* @return ProducerFactory<String, String>
*/@Beanpublic ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
returnnew DefaultKafkaProducerFactory<>(props);
}
/**
* Consumer 설정을 위한 Factory Bean
* 메시지 소비자의 역직렬화 설정 및 그룹 ID, 오프셋 설정을 담당
*
* @return ConsumerFactory<String, String>
*/@Beanpublic ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
returnnew DefaultKafkaConsumerFactory<>(props);
}
/**
* Kafka 메시지를 전송하기 위한 템플릿 Bean
* 실제 애플리케이션에서 메시지 전송시 사용됨
*
* @return KafkaTemplate<String, String>
*/@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {
returnnew KafkaTemplate<>(producerFactory());
}
}
[ 더 알아보기 ] 💡 yaml 파일을 사용하거나 Java Configuration 방식을 사용했을 때 뭐가 더 좋은 걸까?
- yaml 파일을 사용하면 가독성이 좋고 계층 구조가 직관적이라는 장점이 있습니다. 그러나 동적 설정에 대해서는 제한적이거나 타입의 안정성이 상대적으로 떨어진다는 단점이 있습니다. - Java Configuration 방식을 사용하면 타입에 대한 안정성이 보장되고, IDE 내에서 자동완성을 지원한다는 장점이 있습니다. 그러나 상대적으로 많은 코드가 필요하고, 가독성이 낮다는 단점이 있습니다.
- 일반적으로 간단한 설정은 yaml 파일을 사용하고, 복잡한 로직이나 타입 안정성이 중요한 설정은 Java Configuration을 사용하는 것이 권장됩니다.
💡 yaml 파일과 Java Configuration 방식을 둘 다 구성했다면, 어떤 파일이 실행될까?
- 두 설정 방식이 공존할 때는 Java Configuration의 설정이 우선적으로 적용되며, 누락된 설정은 yaml 파일에서 보완하는 방식으로 동작합니다. - Java Configuration(@Configuration)이 yaml/properties 파일보다 우선순위가 높습니다. - 동일한 설정이 있는 경우, Java Configuration의 설정이 yaml 파일의 설정을 덮어씁니다. - Java Configuration에서 정의되지 않은 설정은 yaml 파일의 설정이 사용됩니다.
- 예를 들어서, yaml 파일에서 bootstrap-servers=localhost:29092, group-id=group1로 설정하고 Java Configuration에서 bootstrap-servers=localhost:29093만 설정만 했을 경우
- 최종적으로 bootstrap-servers=localhost:29093(Java Config), group-id=group1(yaml)이 됩니다.
2. ProducerFactory
💡 ProducerFactory
- Kafka 생성자(Producer) 인스턴스를 생성하고 관리하는 팩토리 클래스입니다.
- Producer의 구성 설정을 캡슐화하고, 필요할 때마다 새로운 Producer 인스턴스를 생성합니다.
- KafkaListenerContainerFactory의 구현체로, 메시지 리스너의 동시성을 지원합니다.
- 멀티스레드 환경에서 메시지를 효율적으로 처리할 수 있게 해 줍니다. 또한, 각 리스너 컨테이너에 대한 세부적인 설정이 가능합니다.
💡 KafkaListenerContainerFactory 주요 설정 옵션
- KafkaListenerContainerFactory를 구성하기 위한 구현체로 KafkaListenerContainerFactory를 구성합니다. 그렇기에 KafkaListenerContainerFactory에 대한 주요 메서드에 대해 알아봅니다.
설정
설명
주요 특징
setConsumerFactory
Consumer 팩토리 설정
- Consumer 인스턴스 생성을 위한 팩토리 설정 - 메시지 처리를 위한 기본 구성 제공
setConcurrency
리스너 컨테이너의 동시성 설정
- 각 리스너당 실행할 스레드 수 지정 - 처리량 조절 가능
setAutoStartup
컨테이너 자동 시작 여부
- true: 애플리케이션 시작 시 자동으로 리스너 시작 - false: 수동으로 시작 필요
setBatchListener
배치 처리 모드 설정
- true: 여러 레코드를 한 번에 처리 - false: 레코드 단위 처리
setAckMode
승인 모드 설정
- MANUAL: 수동 커밋 - MANUAL_IMMEDIATE: 즉시 수동 커밋 - RECORD: 레코드 단위 자동 커밋 - BATCH: 배치 단위 자동 커밋
setCommonErrorHandler
공통 에러 핸들러 설정
- 메시지 처리 중 발생하는 예외 처리 - 커스텀 에러 핸들링 로직 구현 가능 - 재시도, 데드 레터 큐 등 구성 가능
💡 KafkaListenerContainerFactory 사용예시
@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory){
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// Consumer 팩토리 설정
factory.setConsumerFactory(consumerFactory);
// 동시성 설정 - 3개의 스레드로 병렬 처리
factory.setConcurrency(3);
// 자동 시작 설정
factory.setAutoStartup(true);
// 배치 처리 모드 설정
factory.setBatchListener(true);
// 승인 모드 설정 - 배치 단위로 커밋
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
// 에러 핸들러 설정
factory.setCommonErrorHandler(new DefaultErrorHandler(
(consumerRecord, exception) -> {
log.error("Error in processing: {}", exception.getMessage());
log.error("Failed record: {}", consumerRecord);
// 추가적인 에러 처리 로직// 예: 데드 레터 큐로 전송, 알림 발송 등
}
));
return factory;
}
4) Spring Kafka 동작 관련 주요 클래스 / 메서드
1. TopicBuilder
💡 TopicBuilder
- Spring Kafka에서 제공하는 토픽 생성을 위한 빌더 클래스입니다. - Configuration 내에 bean 형태로 구성함으로써 서버가 실행될때, 지정한 Topic이 구성되고 생성이 됩니다 - 토픽의 다양한 설정(파티션 수, 복제 팩터, 설정 값 등)을 메서드 체이닝 방식으로 쉽게 구성할 수 있게 해줍니다.
💡 KafkaTemplate을 이용한 Kafka 토픽으로 메시지를 전송하는 다양한 방법입니다.
package com.adjh.springbootkafka.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* kafkaTemplate 활용한 Topic 내에 메시지를 전송하는 다양한 방법
*
* @author : jonghoon
* @fileName : KafkaProducerService
* @since : 25. 1. 10.
*/@Slf4j@ServicepublicclassKafkaProducerService{
privatefinal KafkaTemplate<String, String> kafkaTemplate;
privatefinal String TOPIC_NAME = "example-topic";
publicKafkaProducerService(KafkaTemplate<String, String> kafkaTemplate){
this.kafkaTemplate = kafkaTemplate;
}
/**
* 기본적인 메시지 전송
*
* @param message 전송 메시지
*/publicvoidsendMessage(String message){
kafkaTemplate.send(TOPIC_NAME, message);
}
/**
* 키와 함께 메시지 전송
*
* @param key Topic Key
* @param message 전송 메시지
*/publicvoidsendMessageWithKey(String key, String message){
kafkaTemplate.send(TOPIC_NAME, key, message);
}
/**
* 특정 파티션으로 메시지 전송
*
* @param message 전송 메시지
* @param partition 파티션 명
*/publicvoidsendMessageToPartition(String message, int partition){
kafkaTemplate.send(TOPIC_NAME, partition, null, message);
}
/**
* 비동기 전송 결과 처리
*
* @param message 전송 메시지
*/publicvoidsendMessageWithCallback(String message){
kafkaTemplate.send(TOPIC_NAME, message)
.whenComplete((result, ex) -> {
if (ex == null) {
log.debug("Success: {} ", result.getRecordMetadata());
} else {
log.error("Failed: {}", ex.getMessage());
}
});
}
}
3. @KafkaListener
💡 @KafkaListener
- Spring Kafka에서 메시지를 소비하기 위한 핵심 어노테이션입니다. 이 어노테이션을 메서드에 적용하면 지정된 Kafka 토픽으로부터 자동으로 메시지를 수신하고 처리할 수 있습니다.
- 메서드 레벨에서 선언하여 간단하게 메시지 소비자(Consumer)를 구현할 수 있고, 여러 토픽을 동시에 구독할 수 있으며, 토픽 패턴을 사용할 수도 있다는 특징이 있습니다. - 컨슈머 그룹을 통해 메시지 소비를 분산처리할 수 있으며, 메시지 처리 중 발생하는 예외를 ErrorHandler를 통해 처리할 수 있습니다.
- Spring Kafka에서 제공하는 기본적인 에러 처리 구현체입니다. 메시지 처리 중 발생하는 예외를 처리하고 재시도 정책을 관리합니다.
💡 DefaultErrorHandler 사용 예시
- 메시지 처리 중 발생하는 예외를 처리하고 재시도 정책을 관리하는 Spring Kafka의 기본적인 에러 처리 구현체입니다
- 재시도 정책 설정: FixedBackOff를 사용하여 1초(1000L) 간격으로 최대 2번 재시도하도록 설정합니다 - 최종 실패 처리: 모든 재시도가 실패한 후 실행될 콜백 함수를 정의하며, 실패한 레코드와 예외 정보를 로깅합니다 - 예외 처리 설정: IllegalArgumentException의 경우 재시도하지 않도록 설정합니다
/**
* DefaultErrorHandler 구성
*
* @return
*/@Beanpublic DefaultErrorHandler defaultErrorHandler(){
// 재시도 정책 설정
BackOff backOff = new FixedBackOff(1000L, 2);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
// 최종 실패 시 처리할 로직
log.debug("처리 실패한 레코드: {} exception : {} ", consumerRecord, exception);
}, backOff);
// 특정 예외는 재시도하지 않도록 설정
errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);
return errorHandler;
}
💡 kafkaListenerContainerFactory 예시
- 어노테이션 @KafkaListener 메서드들을 위한 컨테이너 설정을 합니다. - setCommonErrorHandler() 메서드를 통해서 defaultErrorHandler bean을 등록하여서 공통 에러 발생 시 위와 같은 처리를 수행합니다.
- @KafkaListener 어노테이션에서 발생하는 예외를 처리하기 위한 인터페이스입니다. 각각의 리스너 메서드에 대해 개별적인 예외 처리 로직을 정의할 수 있습니다.
💡 KafkaListenerErrorHandler 사용예시
- 아래와 같이 각각의 리스너 별로 다른 에러 처리 로직을 적용할 수 있고, 메시지 처리 실패 시 대체 응답을 반환할 수 있습니다. - 필요한 경우 실패한 메시지를 다른 토픽으로 전달하거나 데이터베이스에 저장할 수 있습니다.
@Bean
public KafkaListenerErrorHandler customErrorHandler(){
return (message, exception) -> {
log.error("메시지 처리 중 오류 발생: {}", exception.getMessage());
log.error("문제가 발생한 메시지: {}", message.getPayload());
// 예외 처리 로직 구현// 필요한 경우 다른 결과 반환 가능return"Error Handled";
};
}
💡 @KafkaListener 사용예시
- 위에 구성한 customErrorHandler bean을 @KafkaListener의 errorHandler 속성을 통해서 등록이 가능합니다.