Spring Boot и Apache Kafka: от Producer к Consumer с нуля

Введение

В мире распределенных систем и микросервисов асинхронная коммуникация через события становится стандартом де-факто. Apache Kafka — это не просто очередь сообщений, а распределенная потоковая платформа, способная обрабатывать триллионы событий в день.

Почему Kafka + Spring Boot — идеальный дуэт?

✅ Масштабируемость — горизонтальное масштабирование за счет партиций и потребителей.
✅ Отказоустойчивость — данные реплицируются и сохраняются на диске.
✅ Высокая пропускная способность — обработка сотен тысяч сообщений в секунду.
✅ Гибкость интеграции — Spring Kafka предоставляет элегантную абстракцию над нативным клиентом.

В этом руководстве мы пройдем путь от настройки простого Producer/Consumer до построения надежного event-driven приложения:

  1. Базовый обмен сообщениями — отправка и прием строк.

  2. Работа с объектами — сериализация/десериализация POJO.

  3. Обработка ошибок и надежность — Retry, Dead Letter Topics (DLT).

  4. Продвинутые паттерны — @KafkaListener, ручное управление оффсетами.


Настройка проекта

1.1. Инициализация через Spring Initializr

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka' // Основная зависимость
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

Базовый Producer и Consumer

2.1. Конфигурация Kafka (application.yml)
spring:
  kafka:
    bootstrap-servers: localhost:9092 # Адрес Kafka-кластера
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: my-group # Идентификатор группы потребителей
      auto-offset-reset: earliest # Чтение с начала, если оффсет не найден
2.2. Простой Producer (отправщик сообщений)
@Service
@Slf4j
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        log.info("->> Отправка сообщения в топик '{}': {}", topic, message);
        // ListenableFuture позволяет добавить коллбэки на успех/ошибку
        kafkaTemplate.send(topic, message)
                .addCallback(
                        result -> log.info("Сообщение успешно отправлено, offset: {}", 
                                          result != null ? result.getRecordMetadata().offset() : "N/A"),
                        ex -> log.error("Ошибка отправки сообщения", ex)
                );
    }
}
2.3. Простой Consumer (получатель сообщений)
@Service
@Slf4j
public class KafkaConsumerService {

    // @KafkaListener - основная аннотация для подписки на топики
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        log.info("<<- Получено сообщение: {}", message);
        // Здесь бизнес-логика обработки сообщения
    }
}

Работа с объектами (JSON)

3.1. Модель данных
@Data // Lombok
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private BigDecimal amount;
    private String status; // CREATED, PROCESSED, CANCELLED
}
3.2. Конфигурация для JSON
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*" // Для десериализации наших классов
3.3. Producer и Consumer для POJO
@Service
@Slf4j
public class OrderEventProducer {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void sendOrderEvent(String topic, OrderEvent orderEvent) {
        kafkaTemplate.send(topic, orderEvent.getOrderId(), orderEvent);
    }
}

@Service
@Slf4j
public class OrderEventConsumer {
    @KafkaListener(topics = "order-events")
    public void handleOrderEvent(OrderEvent orderEvent) {
        log.info("Получен заказ: {}, сумма: {}", orderEvent.getOrderId(), orderEvent.getAmount());
        // Обработка события заказа...
    }
}

Обработка ошибок и Dead Letter Topic (DLT)

4.1. Конфигурация Retry и DLT

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<String, OrderEvent> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);

        // Настройка Retry с фиксированной задержкой (3 попытки)
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate), 
                new FixedBackOff(1000L, 2) // Задержка 1 сек, 2 повторные попытки
        ));

        return factory;
    }
}
4.2. Consumer для Dead Letter Topic
@Service
@Slf4j
public class DltOrderEventConsumer {
    @KafkaListener(topics = "order-events.DLT") // По соглашению .DLT
    public void handleFailedOrderEvent(OrderEvent orderEvent) {
        log.error("СОБЫТИЕ В DLT! Не удалось обработать заказ: {}", orderEvent.getOrderId());
        // Аварийная логика: уведомление админа, сохранение в БД для ручного разбора
    }
}

Продвинутые сценарии

5.1. Ручное подтверждение (Ack) и управление оффсетами

@Configuration
@Slf4j
public class ManualAckConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> manualAckContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // Ручное подтверждение
        return factory;
    }

    @KafkaListener(topics = "important-topic", containerFactory = "manualAckContainerFactory")
    public void listenWithAck(String message, Acknowledgment ack) {
        try {
            // Критичная бизнес-логика
            processMessage(message);
            ack.acknowledge(); // Подтверждаем только после успешной обработки
        } catch (Exception e) {
            log.error("Ошибка обработки, сообщение не будет подтверждено и будет перечитано", e);
            // ack не вызывается -> сообщение будет обработано повторно
        }
    }
}

5.2. Обработка батчей (пакетов сообщений)

@Service
@Slf4j
public class BatchConsumerService {

    // Обработка пачки сообщений за один вызов (для повышения производительности)
    @KafkaListener(topics = "logs-topic", containerFactory = "batchFactory")
    public void listenBatch(List<ConsumerRecord<String, String>> records) {
        log.info("Получена пачка из {} сообщений", records.size());
        for (ConsumerRecord<String, String> record : records) {
            // Пакетная обработка
            processLogRecord(record.value());
        }
    }
}

Заключение

Интеграция Spring Boot с Apache Kafka открывает путь к созданию масштабируемых, отказоустойчивых и слабосвязанных систем. Мы прошли ключевые этапы:

✅ От простой отправки строк до работы с комплексными событиями (DTO)
✅ Настроили надежную доставку с Retry и Dead Letter Topics
✅ Рассмотрели продвинутые сценарии: ручное управление и батчи

Для дальнейшего углубления:

  1. Kafka Streams — обработка потоков данных прямо в вашем приложении (аналогия с Apache Flink/Spark Streaming, но проще).

  2. Транзакции в Kafka — гарантия exactly-once семантики для критичных операций.

  3. Schema Registry (Confluent, Apicurio) — управление версиями схемы сообщений (Avro, Protobuf).

  4. Мониторинг — интеграция с Micrometer, метрики Spring Boot Actuator (/actuator/kafka).

  5. Тестирование — использование EmbeddedKafka для интеграционных тестов.