diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 0a3a4665e98..167dd98f5c9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.ErrorHandler; @@ -84,9 +85,10 @@ class KafkaAnnotationDrivenConfiguration { @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, - ConsumerFactory kafkaConsumerFactory) { + ObjectProvider> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - configurer.configure(factory, kafkaConsumerFactory); + configurer.configure(factory, kafkaConsumerFactory + .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()))); return factory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index b367f989968..2acd944e35b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; @@ -476,6 +477,16 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void testConcurrentKafkaListenerContainerFactoryWithCustomConsumerFactory() { + this.contextRunner.withUserConfiguration(ConsumerFactoryConfiguration.class).run((context) -> { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(kafkaListenerContainerFactory.getConsumerFactory()) + .isNotSameAs(context.getBean(ConsumerFactoryConfiguration.class).consumerFactory); + }); + } + @Configuration protected static class MessageConverterConfiguration { @@ -520,6 +531,18 @@ public class KafkaAutoConfigurationTests { } + @Configuration + protected static class ConsumerFactoryConfiguration { + + private final ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + + @Bean + public ConsumerFactory myConsumerFactory() { + return this.consumerFactory; + } + + } + @Configuration @EnableKafkaStreams protected static class EnableKafkaStreamsConfiguration {