diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 60d0199dbc8..2eac0ac960c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -24,11 +24,9 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; -import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.support.converter.MessageConverter; @@ -55,10 +53,6 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private ConsumerAwareRebalanceListener rebalanceListener; - private ErrorHandler errorHandler; - - private BatchErrorHandler batchErrorHandler; - private CommonErrorHandler commonErrorHandler; private AfterRollbackProcessor afterRollbackProcessor; @@ -114,22 +108,6 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.rebalanceListener = rebalanceListener; } - /** - * Set the {@link ErrorHandler} to use. - * @param errorHandler the error handler - */ - void setErrorHandler(ErrorHandler errorHandler) { - this.errorHandler = errorHandler; - } - - /** - * Set the {@link BatchErrorHandler} to use. - * @param batchErrorHandler the error handler - */ - void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) { - this.batchErrorHandler = batchErrorHandler; - } - /** * Set the {@link CommonErrorHandler} to use. * @param commonErrorHandler the error handler. @@ -178,10 +156,6 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(this.replyTemplate).to(factory::setReplyTemplate); if (properties.getType().equals(Listener.Type.BATCH)) { factory.setBatchListener(true); - factory.setBatchErrorHandler(this.batchErrorHandler); - } - else { - factory.setErrorHandler(this.errorHandler); } map.from(this.commonErrorHandler).to(factory::setCommonErrorHandler); map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index ad6217e952b..5a6fb0e5231 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,10 +29,8 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; -import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; -import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.support.converter.BatchMessageConverter; @@ -65,10 +63,6 @@ class KafkaAnnotationDrivenConfiguration { private final ConsumerAwareRebalanceListener rebalanceListener; - private final ErrorHandler errorHandler; - - private final BatchErrorHandler batchErrorHandler; - private final CommonErrorHandler commonErrorHandler; private final AfterRollbackProcessor afterRollbackProcessor; @@ -81,8 +75,8 @@ class KafkaAnnotationDrivenConfiguration { ObjectProvider batchMessageConverter, ObjectProvider> kafkaTemplate, ObjectProvider> kafkaTransactionManager, - ObjectProvider rebalanceListener, ObjectProvider errorHandler, - ObjectProvider batchErrorHandler, ObjectProvider commonErrorHandler, + ObjectProvider rebalanceListener, + ObjectProvider commonErrorHandler, ObjectProvider> afterRollbackProcessor, ObjectProvider> recordInterceptor) { this.properties = properties; @@ -93,8 +87,6 @@ class KafkaAnnotationDrivenConfiguration { this.kafkaTemplate = kafkaTemplate.getIfUnique(); this.transactionManager = kafkaTransactionManager.getIfUnique(); this.rebalanceListener = rebalanceListener.getIfUnique(); - this.errorHandler = errorHandler.getIfUnique(); - this.batchErrorHandler = batchErrorHandler.getIfUnique(); this.commonErrorHandler = commonErrorHandler.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); this.recordInterceptor = recordInterceptor.getIfUnique(); @@ -112,8 +104,6 @@ class KafkaAnnotationDrivenConfiguration { configurer.setReplyTemplate(this.kafkaTemplate); configurer.setTransactionManager(this.transactionManager); configurer.setRebalanceListener(this.rebalanceListener); - configurer.setErrorHandler(this.errorHandler); - configurer.setBatchErrorHandler(this.batchErrorHandler); configurer.setCommonErrorHandler(this.commonErrorHandler); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); configurer.setRecordInterceptor(this.recordInterceptor); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 3e3a7ee970c..888db74518b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -57,12 +57,10 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; -import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties.AckMode; -import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; @@ -511,46 +509,6 @@ class KafkaAutoConfigurationTests { }); } - @Test - void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() { - this.contextRunner.withBean("errorHandler", ErrorHandler.class, () -> mock(ErrorHandler.class)) - .run((context) -> { - ConcurrentKafkaListenerContainerFactory factory = context - .getBean(ConcurrentKafkaListenerContainerFactory.class); - assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", context.getBean("errorHandler")); - }); - } - - @Test - void concurrentKafkaListenerContainerFactoryInBatchModeShouldUseBatchErrorHandler() { - this.contextRunner.withBean("batchErrorHandler", BatchErrorHandler.class, () -> mock(BatchErrorHandler.class)) - .withPropertyValues("spring.kafka.listener.type=batch").run((context) -> { - ConcurrentKafkaListenerContainerFactory factory = context - .getBean(ConcurrentKafkaListenerContainerFactory.class); - assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", - context.getBean("batchErrorHandler")); - }); - } - - @Test - void concurrentKafkaListenerContainerFactoryInBatchModeWhenBatchErrorHandlerNotAvailableShouldBeNull() { - this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> { - ConcurrentKafkaListenerContainerFactory factory = context - .getBean(ConcurrentKafkaListenerContainerFactory.class); - assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", null); - }); - } - - @Test - void concurrentKafkaListenerContainerFactoryInBatchModeAndSimpleErrorHandlerShouldBeNull() { - this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch") - .withBean("errorHandler", ErrorHandler.class, () -> mock(ErrorHandler.class)).run((context) -> { - ConcurrentKafkaListenerContainerFactory factory = context - .getBean(ConcurrentKafkaListenerContainerFactory.class); - assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", null); - }); - } - @Test void testConcurrentKafkaListenerContainerFactoryWithCustomCommonErrorHandler() { this.contextRunner.withBean("errorHandler", CommonErrorHandler.class, () -> mock(CommonErrorHandler.class)) @@ -710,7 +668,7 @@ class KafkaAutoConfigurationTests { @Bean RecordInterceptor recordInterceptor() { - return (record) -> record; + return (record, consumer) -> record; } } diff --git a/spring-boot-project/spring-boot-dependencies/build.gradle b/spring-boot-project/spring-boot-dependencies/build.gradle index a645f85ab94..a13abfc6e63 100644 --- a/spring-boot-project/spring-boot-dependencies/build.gradle +++ b/spring-boot-project/spring-boot-dependencies/build.gradle @@ -719,7 +719,7 @@ bom { ] } } - library("Kafka", "3.0.0") { + library("Kafka", "3.1.0") { group("org.apache.kafka") { modules = [ "connect", @@ -1355,7 +1355,7 @@ bom { ] } } - library("Spring Kafka", "3.0.0-M1") { + library("Spring Kafka", "3.0.0-SNAPSHOT") { group("org.springframework.kafka") { modules = [ "spring-kafka", diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/kafka.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/kafka.adoc index cb8e2cd553e..cb4c29f0fe5 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/kafka.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/kafka.adoc @@ -42,7 +42,7 @@ The following component creates a listener endpoint on the `someTopic` topic: include::code:MyBean[] If a `KafkaTransactionManager` bean is defined, it is automatically associated to the container factory. -Similarly, if a `RecordFilterStrategy`, `ErrorHandler`, `CommonErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory. +Similarly, if a `RecordFilterStrategy`, `CommonErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory. Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean is associated to the default factory. If only a `RecordMessageConverter` bean is present for a batch listener, it is wrapped in a `BatchMessageConverter`.