Введение
В мире распределенных систем и микросервисов асинхронная коммуникация через события становится стандартом де-факто. Apache Kafka — это не просто очередь сообщений, а распределенная потоковая платформа, способная обрабатывать триллионы событий в день.
Почему Kafka + Spring Boot — идеальный дуэт?
✅ Масштабируемость — горизонтальное масштабирование за счет партиций и потребителей.
✅ Отказоустойчивость — данные реплицируются и сохраняются на диске.
✅ Высокая пропускная способность — обработка сотен тысяч сообщений в секунду.
✅ Гибкость интеграции — Spring Kafka предоставляет элегантную абстракцию над нативным клиентом.
В этом руководстве мы пройдем путь от настройки простого Producer/Consumer до построения надежного event-driven приложения:
-
Базовый обмен сообщениями — отправка и прием строк.
-
Работа с объектами — сериализация/десериализация POJO.
-
Обработка ошибок и надежность — Retry, Dead Letter Topics (DLT).
-
Продвинутые паттерны —
@KafkaListener, ручное управление оффсетами.
Настройка проекта
1.1. Инициализация через Spring Initializr
dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka' // Основная зависимость compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.kafka:spring-kafka-test' }
Базовый Producer и Consumer
2.1. Конфигурация Kafka (application.yml)
spring: kafka: bootstrap-servers: localhost:9092 # Адрес Kafka-кластера producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: my-group # Идентификатор группы потребителей auto-offset-reset: earliest # Чтение с начала, если оффсет не найден
2.2. Простой Producer (отправщик сообщений)
@Service @Slf4j public class KafkaProducerService { private final KafkaTemplate<String, String> kafkaTemplate; public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String topic, String message) { log.info("->> Отправка сообщения в топик '{}': {}", topic, message); // ListenableFuture позволяет добавить коллбэки на успех/ошибку kafkaTemplate.send(topic, message) .addCallback( result -> log.info("Сообщение успешно отправлено, offset: {}", result != null ? result.getRecordMetadata().offset() : "N/A"), ex -> log.error("Ошибка отправки сообщения", ex) ); } }
2.3. Простой Consumer (получатель сообщений)
@Service @Slf4j public class KafkaConsumerService { // @KafkaListener - основная аннотация для подписки на топики @KafkaListener(topics = "my-topic", groupId = "my-group") public void listen(String message) { log.info("<<- Получено сообщение: {}", message); // Здесь бизнес-логика обработки сообщения } }
Работа с объектами (JSON)
3.1. Модель данных
@Data // Lombok @NoArgsConstructor @AllArgsConstructor public class OrderEvent { private String orderId; private BigDecimal amount; private String status; // CREATED, PROCESSED, CANCELLED }
3.2. Конфигурация для JSON
spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer consumer: value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "*" // Для десериализации наших классов
3.3. Producer и Consumer для POJO
@Service @Slf4j public class OrderEventProducer { private final KafkaTemplate<String, OrderEvent> kafkaTemplate; public void sendOrderEvent(String topic, OrderEvent orderEvent) { kafkaTemplate.send(topic, orderEvent.getOrderId(), orderEvent); } } @Service @Slf4j public class OrderEventConsumer { @KafkaListener(topics = "order-events") public void handleOrderEvent(OrderEvent orderEvent) { log.info("Получен заказ: {}, сумма: {}", orderEvent.getOrderId(), orderEvent.getAmount()); // Обработка события заказа... } }
Обработка ошибок и Dead Letter Topic (DLT)
4.1. Конфигурация Retry и DLT
@Configuration public class KafkaConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<String, OrderEvent> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, consumerFactory); // Настройка Retry с фиксированной задержкой (3 попытки) factory.setErrorHandler(new SeekToCurrentErrorHandler( new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(1000L, 2) // Задержка 1 сек, 2 повторные попытки )); return factory; } }
4.2. Consumer для Dead Letter Topic
@Service @Slf4j public class DltOrderEventConsumer { @KafkaListener(topics = "order-events.DLT") // По соглашению .DLT public void handleFailedOrderEvent(OrderEvent orderEvent) { log.error("СОБЫТИЕ В DLT! Не удалось обработать заказ: {}", orderEvent.getOrderId()); // Аварийная логика: уведомление админа, сохранение в БД для ручного разбора } }
Продвинутые сценарии
5.1. Ручное подтверждение (Ack) и управление оффсетами
@Configuration @Slf4j public class ManualAckConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, String> manualAckContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // Ручное подтверждение return factory; } @KafkaListener(topics = "important-topic", containerFactory = "manualAckContainerFactory") public void listenWithAck(String message, Acknowledgment ack) { try { // Критичная бизнес-логика processMessage(message); ack.acknowledge(); // Подтверждаем только после успешной обработки } catch (Exception e) { log.error("Ошибка обработки, сообщение не будет подтверждено и будет перечитано", e); // ack не вызывается -> сообщение будет обработано повторно } } }
5.2. Обработка батчей (пакетов сообщений)
Заключение
Интеграция Spring Boot с Apache Kafka открывает путь к созданию масштабируемых, отказоустойчивых и слабосвязанных систем. Мы прошли ключевые этапы:
✅ От простой отправки строк до работы с комплексными событиями (DTO)
✅ Настроили надежную доставку с Retry и Dead Letter Topics
✅ Рассмотрели продвинутые сценарии: ручное управление и батчи
Для дальнейшего углубления:
-
Kafka Streams — обработка потоков данных прямо в вашем приложении (аналогия с Apache Flink/Spark Streaming, но проще).
-
Транзакции в Kafka — гарантия exactly-once семантики для критичных операций.
-
Schema Registry (Confluent, Apicurio) — управление версиями схемы сообщений (Avro, Protobuf).
-
Мониторинг — интеграция с Micrometer, метрики Spring Boot Actuator (
/actuator/kafka). -
Тестирование — использование
EmbeddedKafkaдля интеграционных тестов.

