Polish "Add configuration property for Spring Kafka's missingTopicsFatal"

Closes gh-16740
This commit is contained in:
Stephane Nicoll 2019-05-20 14:45:59 +02:00
parent 1583ce8d26
commit 3c46b9e83d
3 changed files with 20 additions and 5 deletions

View File

@ -158,8 +158,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::getMonitorInterval).as(Duration::getSeconds)
.as(Number::intValue).to(container::setMonitorInterval);
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
map.from(this.transactionManager).to(container::setTransactionManager);
map.from(properties::getMissingTopicsFatal).to(container::setMissingTopicsFatal);
}
}

View File

@ -874,9 +874,10 @@ public class KafkaProperties {
private Boolean logContainerConfig;
/**
* Set to false to disable checking that topic(s) exist.
* Whether the container should fail to start if at least one of the configured
* topics are not present on the broker.
*/
private Boolean missingTopicsFatal;
private boolean missingTopicsFatal = true;
public Type getType() {
return this.type;
@ -966,11 +967,11 @@ public class KafkaProperties {
this.logContainerConfig = logContainerConfig;
}
public Boolean getMissingTopicsFatal() {
public boolean isMissingTopicsFatal() {
return this.missingTopicsFatal;
}
public void setMissingTopicsFatal(Boolean missingTopicsFatal) {
public void setMissingTopicsFatal(boolean missingTopicsFatal) {
this.missingTopicsFatal = missingTopicsFatal;
}

View File

@ -38,6 +38,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -511,6 +512,19 @@ public class KafkaAutoConfigurationTests {
});
}
@Test
public void listenerPropertiesMatchDefaults() {
this.contextRunner.run((context) -> {
Listener listenerProperties = new KafkaProperties().getListener();
AbstractKafkaListenerContainerFactory<?, ?, ?> kafkaListenerContainerFactory = (AbstractKafkaListenerContainerFactory<?, ?, ?>) context
.getBean(KafkaListenerContainerFactory.class);
ContainerProperties containerProperties = kafkaListenerContainerFactory
.getContainerProperties();
assertThat(containerProperties.isMissingTopicsFatal())
.isEqualTo(listenerProperties.isMissingTopicsFatal());
});
}
@Test
public void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)