Polish "Auto-configure Kafka MessageConverter"

Closes gh-10380
This commit is contained in:
Stephane Nicoll 2017-10-02 16:36:04 +02:00
parent d7bc93f278
commit 2537a0a753
5 changed files with 131 additions and 137 deletions

View File

@ -20,7 +20,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
/** /**
* Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults. * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
@ -33,7 +33,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaProperties properties; private KafkaProperties properties;
private MessageConverter messageConverter; private RecordMessageConverter messageConverter;
/** /**
* Set the {@link KafkaProperties} to use. * Set the {@link KafkaProperties} to use.
@ -44,10 +44,10 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
} }
/** /**
* Set the {@link MessageConverter} to use. * Set the {@link RecordMessageConverter} to use.
* @param messageConverter the message converter * @param messageConverter the message converter
*/ */
public void setMessageConverter(MessageConverter messageConverter) { void setMessageConverter(RecordMessageConverter messageConverter) {
this.messageConverter = messageConverter; this.messageConverter = messageConverter;
} }
@ -61,10 +61,10 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
public void configure( public void configure(
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory, ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) { ConsumerFactory<Object, Object> consumerFactory) {
listenerContainerFactory.setConsumerFactory(consumerFactory);
if (this.messageConverter != null) { if (this.messageConverter != null) {
listenerContainerFactory.setMessageConverter(this.messageConverter); listenerContainerFactory.setMessageConverter(this.messageConverter);
} }
listenerContainerFactory.setConsumerFactory(consumerFactory);
Listener container = this.properties.getListener(); Listener container = this.properties.getListener();
ContainerProperties containerProperties = listenerContainerFactory ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties(); .getContainerProperties();

View File

@ -25,7 +25,7 @@ import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
/** /**
* Configuration for Kafka annotation-driven support. * Configuration for Kafka annotation-driven support.
@ -40,12 +40,12 @@ class KafkaAnnotationDrivenConfiguration {
private final KafkaProperties properties; private final KafkaProperties properties;
private final MessageConverter messageConverter; private final RecordMessageConverter messageConverter;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties, KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<MessageConverter> messageConverter) { ObjectProvider<RecordMessageConverter> messageConverter) {
this.properties = properties; this.properties = properties;
this.messageConverter = messageConverter.getIfAvailable(); this.messageConverter = messageConverter.getIfUnique();
} }
@Bean @Bean

View File

@ -60,7 +60,7 @@ public class KafkaAutoConfiguration {
public KafkaAutoConfiguration(KafkaProperties properties, public KafkaAutoConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter) { ObjectProvider<RecordMessageConverter> messageConverter) {
this.properties = properties; this.properties = properties;
this.messageConverter = messageConverter.getIfAvailable(); this.messageConverter = messageConverter.getIfUnique();
} }
@Bean @Bean

View File

@ -45,7 +45,6 @@ import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils;
@ -68,7 +67,8 @@ public class KafkaAutoConfigurationTests {
@Test @Test
public void consumerProperties() { public void consumerProperties() {
this.contextRunner.withUserConfiguration(TestConfiguration.class) this.contextRunner
.withUserConfiguration(TestConfiguration.class)
.withPropertyValues( .withPropertyValues(
"spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.properties.foo=bar", "spring.kafka.properties.foo=bar",
@ -90,57 +90,58 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.group-id=bar",
"spring.kafka.consumer.heartbeat-interval=234", "spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer") "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer"
.run((context) -> { ).run((context) -> {
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context DefaultKafkaConsumerFactory<?, ?> consumerFactory = context
.getBean(DefaultKafkaConsumerFactory.class); .getBean(DefaultKafkaConsumerFactory.class);
Map<String, Object> configs = consumerFactory.getConfigurationProperties(); Map<String, Object> configs = consumerFactory.getConfigurationProperties();
// common // common
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo(Collections.singletonList("foo:1234")); .isEqualTo(Collections.singletonList("foo:1234"));
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1"); assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLoc"); .endsWith(File.separator + "ksLoc");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2"); assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2");
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLoc"); .endsWith(File.separator + "tsLoc");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p3"); .isEqualTo("p3");
// consumer // consumer
assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
.isEqualTo(Boolean.FALSE); .isEqualTo(Boolean.FALSE);
assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)) assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))
.isEqualTo(123); .isEqualTo(123);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
.isEqualTo("earliest"); .isEqualTo("earliest");
assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456); assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789); assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789);
assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar"); assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)) assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG))
.isEqualTo(234); .isEqualTo(234);
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(LongDeserializer.class); .isEqualTo(LongDeserializer.class);
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerDeserializer.class); .isEqualTo(IntegerDeserializer.class);
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42); assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42);
assertThat(configs.get("foo")).isEqualTo("bar"); assertThat(configs.get("foo")).isEqualTo("bar");
assertThat(configs.get("baz")).isEqualTo("qux"); assertThat(configs.get("baz")).isEqualTo("qux");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
}); });
} }
@Test @Test
public void producerProperties() { public void producerProperties() {
this.contextRunner.withUserConfiguration(TestConfiguration.class) this.contextRunner
.withUserConfiguration(TestConfiguration.class)
.withPropertyValues( .withPropertyValues(
"spring.kafka.clientId=cid", "spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.producer.acks=all", "spring.kafka.producer.acks=all",
"spring.kafka.producer.batch-size=20", "spring.kafka.producer.batch-size=20",
"spring.kafka.producer.bootstrap-servers=bar:1234", // test "spring.kafka.producer.bootstrap-servers=bar:1234", // test
// override // override
"spring.kafka.producer.buffer-memory=12345", "spring.kafka.producer.buffer-memory=12345",
"spring.kafka.producer.compression-type=gzip", "spring.kafka.producer.compression-type=gzip",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
@ -151,38 +152,38 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.producer.ssl.keystore-password=p5", "spring.kafka.producer.ssl.keystore-password=p5",
"spring.kafka.producer.ssl.truststore-location=classpath:tsLocP", "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.producer.ssl.truststore-password=p6", "spring.kafka.producer.ssl.truststore-password=p6",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer") "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer"
.run((context) -> { ).run((context) -> {
DefaultKafkaProducerFactory<?, ?> producerFactory = context DefaultKafkaProducerFactory<?, ?> producerFactory = context
.getBean(DefaultKafkaProducerFactory.class); .getBean(DefaultKafkaProducerFactory.class);
Map<String, Object> configs = producerFactory.getConfigurationProperties(); Map<String, Object> configs = producerFactory.getConfigurationProperties();
// common // common
assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
// producer // producer
assertThat(configs.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all"); assertThat(configs.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
assertThat(configs.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20); assertThat(configs.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
assertThat(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) assertThat(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo(Collections.singletonList("bar:1234")); // override .isEqualTo(Collections.singletonList("bar:1234")); // override
assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L); assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L);
assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip"); assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
.isEqualTo(LongSerializer.class); .isEqualTo(LongSerializer.class);
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLocP"); .endsWith(File.separator + "ksLocP");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP"); .endsWith(File.separator + "tsLocP");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p6"); .isEqualTo("p6");
assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerSerializer.class); .isEqualTo(IntegerSerializer.class);
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty(); .isEmpty();
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
}); });
} }
@Test @Test
@ -210,7 +211,7 @@ public class KafkaAutoConfigurationTests {
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p6"); .isEqualTo("p6");
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty(); .isEmpty();
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
assertThat(KafkaTestUtils.getPropertyValue(admin, "fatalIfBrokerNotAvailable", assertThat(KafkaTestUtils.getPropertyValue(admin, "fatalIfBrokerNotAvailable",
@ -232,56 +233,56 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.jaas.enabled=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.control-flag=REQUISITE",
"spring.kafka.jaas.options.useKeyTab=true") "spring.kafka.jaas.options.useKeyTab=true"
.run((context) -> { ).run((context) -> {
DefaultKafkaProducerFactory<?, ?> producerFactory = context DefaultKafkaProducerFactory<?, ?> producerFactory = context
.getBean(DefaultKafkaProducerFactory.class); .getBean(DefaultKafkaProducerFactory.class);
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context DefaultKafkaConsumerFactory<?, ?> consumerFactory = context
.getBean(DefaultKafkaConsumerFactory.class); .getBean(DefaultKafkaConsumerFactory.class);
KafkaTemplate<?, ?> kafkaTemplate = context.getBean(KafkaTemplate.class); KafkaTemplate<?, ?> kafkaTemplate = context.getBean(KafkaTemplate.class);
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = context KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = context
.getBean(KafkaListenerContainerFactory.class); .getBean(KafkaListenerContainerFactory.class);
assertThat(kafkaTemplate.getMessageConverter()).isInstanceOf( assertThat(kafkaTemplate.getMessageConverter()).isInstanceOf(
MessagingMessageConverter.class); MessagingMessageConverter.class);
assertThat(new DirectFieldAccessor(kafkaTemplate) assertThat(new DirectFieldAccessor(kafkaTemplate)
.getPropertyValue("producerFactory")).isEqualTo(producerFactory); .getPropertyValue("producerFactory")).isEqualTo(producerFactory);
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic"); assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory); DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory);
assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory); assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory);
assertThat(dfa.getPropertyValue("containerProperties.ackMode")) assertThat(dfa.getPropertyValue("containerProperties.ackMode"))
.isEqualTo(AckMode.MANUAL); .isEqualTo(AckMode.MANUAL);
assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123); assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123);
assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L); assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L);
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
.isEqualTo(2000L); .isEqualTo(2000L);
assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true); assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true);
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.hasSize(1); .hasSize(1);
KafkaJaasLoginModuleInitializer jaas = context KafkaJaasLoginModuleInitializer jaas = context
.getBean(KafkaJaasLoginModuleInitializer.class); .getBean(KafkaJaasLoginModuleInitializer.class);
dfa = new DirectFieldAccessor(jaas); dfa = new DirectFieldAccessor(jaas);
assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo");
assertThat(dfa.getPropertyValue("controlFlag")) assertThat(dfa.getPropertyValue("controlFlag"))
.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
assertThat(((Map<String, String>) dfa.getPropertyValue("options"))) assertThat(((Map<String, String>) dfa.getPropertyValue("options")))
.containsExactly(entry("useKeyTab", "true")); .containsExactly(entry("useKeyTab", "true"));
}); });
} }
@Test @Test
public void testKafkaTemplateRecordMessageConverters() { public void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(RecordMessageConvertersConfiguration.class) this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.run((context) -> { .run((context) -> {
KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class); KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class);
assertThat(kafkaTemplate.getMessageConverter()) assertThat(kafkaTemplate.getMessageConverter())
.isSameAs(context.getBean("myRecordMessageConverter")); .isSameAs(context.getBean("myMessageConverter"));
}); });
} }
@Test @Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() { public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class) this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.run((context) -> { .run((context) -> {
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class); .getBean(ConcurrentKafkaListenerContainerFactory.class);
@ -298,23 +299,13 @@ public class KafkaAutoConfigurationTests {
} }
@Configuration @Configuration
protected static class RecordMessageConvertersConfiguration { protected static class MessageConverterConfiguration {
@Bean @Bean
public RecordMessageConverter myRecordMessageConverter() { public RecordMessageConverter myMessageConverter() {
return mock(RecordMessageConverter.class); return mock(RecordMessageConverter.class);
} }
} }
@Configuration
protected static class MessageConvertersConfiguration {
@Bean
public MessageConverter myMessageConverter() {
return mock(MessageConverter.class);
}
}
} }

View File

@ -4846,6 +4846,8 @@ public class MyBean {
} }
---- ----
NOTE: If a `RecordMessageConverter` bean is defined, it is associated automatically to the
auto-configured `KafkaTemplate`.
[[boot-features-kafka-receiving-a-message]] [[boot-features-kafka-receiving-a-message]]
@ -4853,7 +4855,8 @@ public class MyBean {
When the Apache Kafka infrastructure is present, any bean can be annotated with When the Apache Kafka infrastructure is present, any bean can be annotated with
`@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory` `@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory`
has been defined, a default one is configured automatically with keys defined in has been defined, a default one is configured automatically with keys defined in
`spring.kafka.listener.*`. `spring.kafka.listener.*`. Also, if a `RecordMessageConverter` bean is defined, it is
associated automatically to the default factory.
The following component creates a listener endpoint on the `someTopic` topic: The following component creates a listener endpoint on the `someTopic` topic: