Add config property for KafkaAdmin modifyTopicConfigs
See gh-31679
This commit is contained in:
parent
8e3aaf1cf4
commit
677c05a5b1
|
|
@ -142,6 +142,7 @@ public class KafkaAutoConfiguration {
|
||||||
public KafkaAdmin kafkaAdmin() {
|
public KafkaAdmin kafkaAdmin() {
|
||||||
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
|
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
|
||||||
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
|
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
|
||||||
|
kafkaAdmin.setModifyTopicConfigs(this.properties.getAdmin().isModifyTopicConfigs());
|
||||||
return kafkaAdmin;
|
return kafkaAdmin;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -647,6 +647,11 @@ public class KafkaProperties {
|
||||||
*/
|
*/
|
||||||
private boolean failFast;
|
private boolean failFast;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether to enable modification of existing topic configuration.
|
||||||
|
*/
|
||||||
|
private boolean modifyTopicConfigs;
|
||||||
|
|
||||||
public Ssl getSsl() {
|
public Ssl getSsl() {
|
||||||
return this.ssl;
|
return this.ssl;
|
||||||
}
|
}
|
||||||
|
|
@ -671,6 +676,14 @@ public class KafkaProperties {
|
||||||
this.failFast = failFast;
|
this.failFast = failFast;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isModifyTopicConfigs() {
|
||||||
|
return this.modifyTopicConfigs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setModifyTopicConfigs(boolean modifyTopicConfigs) {
|
||||||
|
this.modifyTopicConfigs = modifyTopicConfigs;
|
||||||
|
}
|
||||||
|
|
||||||
public Map<String, String> getProperties() {
|
public Map<String, String> getProperties() {
|
||||||
return this.properties;
|
return this.properties;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -209,15 +209,14 @@ class KafkaAutoConfigurationTests {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void adminProperties() {
|
void adminProperties() {
|
||||||
this.contextRunner
|
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
|
||||||
.withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
|
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.admin.fail-fast=true",
|
||||||
"spring.kafka.admin.fail-fast=true", "spring.kafka.admin.properties.fiz.buz=fix.fox",
|
"spring.kafka.admin.properties.fiz.buz=fix.fox", "spring.kafka.admin.security.protocol=SSL",
|
||||||
"spring.kafka.admin.security.protocol=SSL", "spring.kafka.admin.ssl.key-password=p4",
|
"spring.kafka.admin.ssl.key-password=p4", "spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
|
||||||
"spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
|
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
|
||||||
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
|
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
|
||||||
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
|
"spring.kafka.admin.ssl.trust-store-password=p6", "spring.kafka.admin.ssl.trust-store-type=PKCS12",
|
||||||
"spring.kafka.admin.ssl.trust-store-password=p6",
|
"spring.kafka.admin.ssl.protocol=TLSv1.2", "spring.kafka.admin.modify-topic-configs=true")
|
||||||
"spring.kafka.admin.ssl.trust-store-type=PKCS12", "spring.kafka.admin.ssl.protocol=TLSv1.2")
|
|
||||||
.run((context) -> {
|
.run((context) -> {
|
||||||
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
|
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
|
||||||
Map<String, Object> configs = admin.getConfigurationProperties();
|
Map<String, Object> configs = admin.getConfigurationProperties();
|
||||||
|
|
@ -239,6 +238,7 @@ class KafkaAutoConfigurationTests {
|
||||||
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
|
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
|
||||||
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
|
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
|
||||||
assertThat(admin).hasFieldOrPropertyWithValue("fatalIfBrokerNotAvailable", true);
|
assertThat(admin).hasFieldOrPropertyWithValue("fatalIfBrokerNotAvailable", true);
|
||||||
|
assertThat(admin).hasFieldOrPropertyWithValue("modifyTopicConfigs", true);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue