Auto-configure listener container factory without consumer factory
Previously, the presence of a `ConsumerFactory` bean would make the auto-configured one to back off, leading to a failure down the line if no available instance matches the generics criterion. This commit improves the auto-configuration to create a `ConsumerFactory<?,?>` behind the scenes if none is available. Closes gh-19221
This commit is contained in:
parent
5e61f0d7b1
commit
f5761bd508
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2019 the original author or authors.
|
* Copyright 2012-2020 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -25,6 +25,7 @@ import org.springframework.kafka.annotation.EnableKafka;
|
||||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
import org.springframework.kafka.config.KafkaListenerConfigUtils;
|
import org.springframework.kafka.config.KafkaListenerConfigUtils;
|
||||||
import org.springframework.kafka.core.ConsumerFactory;
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
||||||
import org.springframework.kafka.listener.ErrorHandler;
|
import org.springframework.kafka.listener.ErrorHandler;
|
||||||
|
|
@ -84,9 +85,10 @@ class KafkaAnnotationDrivenConfiguration {
|
||||||
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
|
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
|
||||||
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
|
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
|
||||||
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
|
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
|
||||||
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
|
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
|
||||||
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
configurer.configure(factory, kafkaConsumerFactory);
|
configurer.configure(factory, kafkaConsumerFactory
|
||||||
|
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2019 the original author or authors.
|
* Copyright 2012-2020 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -48,6 +48,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
import org.springframework.kafka.config.KafkaListenerContainerFactory;
|
import org.springframework.kafka.config.KafkaListenerContainerFactory;
|
||||||
import org.springframework.kafka.config.KafkaStreamsConfiguration;
|
import org.springframework.kafka.config.KafkaStreamsConfiguration;
|
||||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
import org.springframework.kafka.core.KafkaAdmin;
|
import org.springframework.kafka.core.KafkaAdmin;
|
||||||
|
|
@ -476,6 +477,16 @@ public class KafkaAutoConfigurationTests {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConcurrentKafkaListenerContainerFactoryWithCustomConsumerFactory() {
|
||||||
|
this.contextRunner.withUserConfiguration(ConsumerFactoryConfiguration.class).run((context) -> {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
|
||||||
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
||||||
|
assertThat(kafkaListenerContainerFactory.getConsumerFactory())
|
||||||
|
.isNotSameAs(context.getBean(ConsumerFactoryConfiguration.class).consumerFactory);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
protected static class MessageConverterConfiguration {
|
protected static class MessageConverterConfiguration {
|
||||||
|
|
||||||
|
|
@ -520,6 +531,18 @@ public class KafkaAutoConfigurationTests {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
protected static class ConsumerFactoryConfiguration {
|
||||||
|
|
||||||
|
private final ConsumerFactory<String, Object> consumerFactory = mock(ConsumerFactory.class);
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConsumerFactory<String, Object> myConsumerFactory() {
|
||||||
|
return this.consumerFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@EnableKafkaStreams
|
@EnableKafkaStreams
|
||||||
protected static class EnableKafkaStreamsConfiguration {
|
protected static class EnableKafkaStreamsConfiguration {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue