Support setting more properties of KafkaAdmin
See gh-33288
This commit is contained in:
parent
db80fc1734
commit
6e064ed5e0
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
* Copyright 2012-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -141,8 +141,16 @@ public class KafkaAutoConfiguration {
|
|||
@ConditionalOnMissingBean
|
||||
public KafkaAdmin kafkaAdmin() {
|
||||
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
|
||||
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
|
||||
kafkaAdmin.setModifyTopicConfigs(this.properties.getAdmin().isModifyTopicConfigs());
|
||||
KafkaProperties.Admin admin = this.properties.getAdmin();
|
||||
if (admin.getCloseTimeout() != null) {
|
||||
kafkaAdmin.setCloseTimeout((int) admin.getCloseTimeout().getSeconds());
|
||||
}
|
||||
if (admin.getOperationTimeout() != null) {
|
||||
kafkaAdmin.setOperationTimeout((int) admin.getOperationTimeout().getSeconds());
|
||||
}
|
||||
kafkaAdmin.setFatalIfBrokerNotAvailable(admin.isFailFast());
|
||||
kafkaAdmin.setModifyTopicConfigs(admin.isModifyTopicConfigs());
|
||||
kafkaAdmin.setAutoCreate(admin.isAutoCreate());
|
||||
return kafkaAdmin;
|
||||
}
|
||||
|
||||
|
|
|
@ -642,6 +642,16 @@ public class KafkaProperties {
|
|||
*/
|
||||
private final Map<String, String> properties = new HashMap<>();
|
||||
|
||||
/**
|
||||
* The close timeout.
|
||||
*/
|
||||
private Duration closeTimeout;
|
||||
|
||||
/**
|
||||
* The operation timeout.
|
||||
*/
|
||||
private Duration operationTimeout;
|
||||
|
||||
/**
|
||||
* Whether to fail fast if the broker is not available on startup.
|
||||
*/
|
||||
|
@ -652,6 +662,12 @@ public class KafkaProperties {
|
|||
*/
|
||||
private boolean modifyTopicConfigs;
|
||||
|
||||
/**
|
||||
* Whether to automatically create topics during context initialization. When set
|
||||
* to false, disables automatic topic creation during context initialization.
|
||||
*/
|
||||
private boolean autoCreate = true;
|
||||
|
||||
public Ssl getSsl() {
|
||||
return this.ssl;
|
||||
}
|
||||
|
@ -668,6 +684,22 @@ public class KafkaProperties {
|
|||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
public Duration getCloseTimeout() {
|
||||
return this.closeTimeout;
|
||||
}
|
||||
|
||||
public void setCloseTimeout(Duration closeTimeout) {
|
||||
this.closeTimeout = closeTimeout;
|
||||
}
|
||||
|
||||
public Duration getOperationTimeout() {
|
||||
return this.operationTimeout;
|
||||
}
|
||||
|
||||
public void setOperationTimeout(Duration operationTimeout) {
|
||||
this.operationTimeout = operationTimeout;
|
||||
}
|
||||
|
||||
public boolean isFailFast() {
|
||||
return this.failFast;
|
||||
}
|
||||
|
@ -684,6 +716,14 @@ public class KafkaProperties {
|
|||
this.modifyTopicConfigs = modifyTopicConfigs;
|
||||
}
|
||||
|
||||
public boolean isAutoCreate() {
|
||||
return this.autoCreate;
|
||||
}
|
||||
|
||||
public void setAutoCreate(boolean autoCreate) {
|
||||
this.autoCreate = autoCreate;
|
||||
}
|
||||
|
||||
public Map<String, String> getProperties() {
|
||||
return this.properties;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
||||
import java.io.File;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -209,14 +210,17 @@ class KafkaAutoConfigurationTests {
|
|||
|
||||
@Test
|
||||
void adminProperties() {
|
||||
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
|
||||
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.admin.fail-fast=true",
|
||||
"spring.kafka.admin.properties.fiz.buz=fix.fox", "spring.kafka.admin.security.protocol=SSL",
|
||||
"spring.kafka.admin.ssl.key-password=p4", "spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
|
||||
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
|
||||
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
|
||||
"spring.kafka.admin.ssl.trust-store-password=p6", "spring.kafka.admin.ssl.trust-store-type=PKCS12",
|
||||
"spring.kafka.admin.ssl.protocol=TLSv1.2", "spring.kafka.admin.modify-topic-configs=true")
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
|
||||
"spring.kafka.admin.fail-fast=true", "spring.kafka.admin.properties.fiz.buz=fix.fox",
|
||||
"spring.kafka.admin.security.protocol=SSL", "spring.kafka.admin.ssl.key-password=p4",
|
||||
"spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
|
||||
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
|
||||
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
|
||||
"spring.kafka.admin.ssl.trust-store-password=p6",
|
||||
"spring.kafka.admin.ssl.trust-store-type=PKCS12", "spring.kafka.admin.ssl.protocol=TLSv1.2",
|
||||
"spring.kafka.admin.close-timeout=35s", "spring.kafka.admin.operation-timeout=60s",
|
||||
"spring.kafka.admin.modify-topic-configs=true", "spring.kafka.admin.auto-create=false")
|
||||
.run((context) -> {
|
||||
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
|
||||
Map<String, Object> configs = admin.getConfigurationProperties();
|
||||
|
@ -237,8 +241,11 @@ class KafkaAutoConfigurationTests {
|
|||
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).isEmpty();
|
||||
assertThat(configs).containsEntry("foo.bar.baz", "qux.fiz.buz");
|
||||
assertThat(configs).containsEntry("fiz.buz", "fix.fox");
|
||||
assertThat(admin).hasFieldOrPropertyWithValue("closeTimeout", Duration.ofSeconds(35));
|
||||
assertThat(admin).hasFieldOrPropertyWithValue("operationTimeout", 60);
|
||||
assertThat(admin).hasFieldOrPropertyWithValue("fatalIfBrokerNotAvailable", true);
|
||||
assertThat(admin).hasFieldOrPropertyWithValue("modifyTopicConfigs", true);
|
||||
assertThat(admin).hasFieldOrPropertyWithValue("autoCreate", false);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue