parent
0dbd9429cc
commit
7cd19822c6
|
@ -193,11 +193,6 @@
|
|||
<artifactId>elasticsearch</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.flywaydb</groupId>
|
||||
<artifactId>flyway-core</artifactId>
|
||||
|
@ -311,6 +306,11 @@
|
|||
<artifactId>spring-integration-core</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.security</groupId>
|
||||
<artifactId>spring-security-config</artifactId>
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
|||
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
|
@ -41,40 +42,35 @@ import org.springframework.kafka.core.KafkaAdmin;
|
|||
* @author Juan Rada
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(KafkaAdmin.class)
|
||||
@ConditionalOnBean(KafkaAdmin.class)
|
||||
@ConditionalOnEnabledHealthIndicator("kafka")
|
||||
@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class)
|
||||
@AutoConfigureAfter(KafkaAutoConfiguration.class)
|
||||
public class KafkaHealthIndicatorAutoConfiguration {
|
||||
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
|
||||
public class KafkaHealthIndicatorAutoConfiguration extends
|
||||
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnBean(KafkaAdmin.class)
|
||||
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
|
||||
static class KafkaClientHealthIndicatorConfiguration extends
|
||||
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
|
||||
private final Map<String, KafkaAdmin> admins;
|
||||
|
||||
private final Map<String, KafkaAdmin> admins;
|
||||
private final KafkaHealthIndicatorProperties properties;
|
||||
|
||||
private final KafkaHealthIndicatorProperties properties;
|
||||
KafkaHealthIndicatorAutoConfiguration(Map<String, KafkaAdmin> admins,
|
||||
KafkaHealthIndicatorProperties properties) {
|
||||
this.admins = admins;
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
KafkaClientHealthIndicatorConfiguration(Map<String, KafkaAdmin> admins,
|
||||
KafkaHealthIndicatorProperties properties) {
|
||||
this.admins = admins;
|
||||
this.properties = properties;
|
||||
}
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = "kafkaHealthIndicator")
|
||||
public HealthIndicator kafkaHealthIndicator() {
|
||||
return createHealthIndicator(this.admins);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = "kafkaHealthIndicator")
|
||||
public HealthIndicator kafkaHealthIndicator() {
|
||||
return createHealthIndicator(this.admins);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
|
||||
Duration responseTimeout = this.properties.getResponseTimeout();
|
||||
|
||||
return new KafkaHealthIndicator(source,
|
||||
responseTimeout == null ? 100L : responseTimeout.toMillis());
|
||||
}
|
||||
@Override
|
||||
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
|
||||
Duration responseTimeout = this.properties.getResponseTimeout();
|
||||
return new KafkaHealthIndicator(source, responseTimeout.toMillis());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
|||
* Configuration properties for {@link KafkaHealthIndicator}.
|
||||
*
|
||||
* @author Juan Rada
|
||||
* @since 2.0.0
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false)
|
||||
public class KafkaHealthIndicatorProperties {
|
||||
|
@ -32,7 +33,7 @@ public class KafkaHealthIndicatorProperties {
|
|||
/**
|
||||
* Time to wait for a response from the cluster description operation.
|
||||
*/
|
||||
private Duration responseTimeout = Duration.ofMillis(100);
|
||||
private Duration responseTimeout = Duration.ofMillis(1000);
|
||||
|
||||
public Duration getResponseTimeout() {
|
||||
return this.responseTimeout;
|
||||
|
|
|
@ -15,6 +15,6 @@
|
|||
*/
|
||||
|
||||
/**
|
||||
* Auto-configuration for actuator kafka support.
|
||||
* Auto-configuration for actuator Apache Kafka support.
|
||||
*/
|
||||
package org.springframework.boot.actuate.autoconfigure.kafka;
|
||||
|
|
|
@ -103,6 +103,12 @@
|
|||
"description": "Whether to enable JMS health check.",
|
||||
"defaultValue": true
|
||||
},
|
||||
{
|
||||
"name": "management.health.kafka.enabled",
|
||||
"type": "java.lang.Boolean",
|
||||
"description": "Whether to enable Kafka health check.",
|
||||
"defaultValue": true
|
||||
},
|
||||
{
|
||||
"name": "management.health.ldap.enabled",
|
||||
"type": "java.lang.Boolean",
|
||||
|
@ -145,12 +151,6 @@
|
|||
"description": "Whether to enable Neo4j health check.",
|
||||
"defaultValue": true
|
||||
},
|
||||
{
|
||||
"name": "management.health.kafka.enabled",
|
||||
"type": "java.lang.Boolean",
|
||||
"description": "Whether to enable kafka health check.",
|
||||
"defaultValue": true
|
||||
},
|
||||
{
|
||||
"name": "management.info.build.enabled",
|
||||
"type": "java.lang.Boolean",
|
||||
|
|
|
@ -11,7 +11,6 @@ org.springframework.boot.actuate.autoconfigure.context.properties.ConfigurationP
|
|||
org.springframework.boot.actuate.autoconfigure.context.ShutdownEndpointAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseHealthIndicatorAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticsearchHealthIndicatorAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.endpoint.jmx.JmxEndpointAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointAutoConfiguration,\
|
||||
|
@ -25,6 +24,7 @@ org.springframework.boot.actuate.autoconfigure.info.InfoEndpointAutoConfiguratio
|
|||
org.springframework.boot.actuate.autoconfigure.jdbc.DataSourceHealthIndicatorAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.jms.JmsHealthIndicatorAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.jolokia.JolokiaEndpointAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.ldap.LdapHealthIndicatorAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.liquibase.LiquibaseEndpointAutoConfiguration,\
|
||||
org.springframework.boot.actuate.autoconfigure.logging.LogFileWebEndpointAutoConfiguration,\
|
||||
|
|
|
@ -172,11 +172,6 @@
|
|||
<artifactId>spring-rabbit</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-cassandra</artifactId>
|
||||
|
@ -230,6 +225,11 @@
|
|||
<artifactId>spring-integration-core</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.security</groupId>
|
||||
<artifactId>spring-security-core</artifactId>
|
||||
|
@ -263,13 +263,13 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-autoconfigure</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-autoconfigure</artifactId>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.kafka.common.config.ConfigResource.Type;
|
|||
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
|
||||
import org.springframework.boot.actuate.health.Health.Builder;
|
||||
import org.springframework.boot.actuate.health.HealthIndicator;
|
||||
import org.springframework.boot.actuate.health.Status;
|
||||
import org.springframework.kafka.core.KafkaAdmin;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
|
@ -43,37 +44,35 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator {
|
|||
static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";
|
||||
|
||||
private final KafkaAdmin kafkaAdmin;
|
||||
|
||||
private final DescribeClusterOptions describeOptions;
|
||||
|
||||
/**
|
||||
* Create a new {@link KafkaHealthIndicator} instance.
|
||||
*
|
||||
* @param kafkaAdmin the kafka admin
|
||||
* @param responseTimeout the describe cluster request timeout in milliseconds
|
||||
* @param requestTimeout the request timeout in milliseconds
|
||||
*/
|
||||
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) {
|
||||
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long requestTimeout) {
|
||||
Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null");
|
||||
this.kafkaAdmin = kafkaAdmin;
|
||||
this.describeOptions = new DescribeClusterOptions()
|
||||
.timeoutMs((int) responseTimeout);
|
||||
.timeoutMs((int) requestTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doHealthCheck(Builder builder) throws Exception {
|
||||
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
|
||||
DescribeClusterResult result = adminClient.describeCluster(this.describeOptions);
|
||||
DescribeClusterResult result = adminClient.describeCluster(
|
||||
this.describeOptions);
|
||||
String brokerId = result.controller().get().idString();
|
||||
int replicationFactor = getReplicationFactor(brokerId, adminClient);
|
||||
int nodes = result.nodes().get().size();
|
||||
if (nodes >= replicationFactor) {
|
||||
builder.up();
|
||||
}
|
||||
else {
|
||||
builder.down();
|
||||
}
|
||||
builder.withDetail("clusterId", result.clusterId().get());
|
||||
builder.withDetail("brokerId", brokerId);
|
||||
builder.withDetail("nodes", nodes);
|
||||
Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
|
||||
builder.status(status)
|
||||
.withDetail("clusterId", result.clusterId().get())
|
||||
.withDetail("brokerId", brokerId)
|
||||
.withDetail("nodes", nodes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,5 +84,6 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator {
|
|||
Config brokerConfig = kafkaConfig.get(configResource);
|
||||
return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,6 @@
|
|||
*/
|
||||
|
||||
/**
|
||||
* Actuator support for Kafka.
|
||||
* Actuator support for Apache Kafka.
|
||||
*/
|
||||
package org.springframework.boot.actuate.kafka;
|
||||
|
|
|
@ -20,27 +20,73 @@ import java.util.Collections;
|
|||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.boot.actuate.health.Status;
|
||||
import org.springframework.kafka.core.KafkaAdmin;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.util.SocketUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Test for {@link KafkaHealthIndicator}
|
||||
* Tests for {@link KafkaHealthIndicator}.
|
||||
*
|
||||
* @author Juan Rada
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
public class KafkaHealthIndicatorTests {
|
||||
|
||||
private static final Long RESPONSE_TIME = 1000L;
|
||||
|
||||
private KafkaEmbedded kafkaEmbedded;
|
||||
|
||||
private KafkaAdmin kafkaAdmin;
|
||||
|
||||
@After
|
||||
public void shutdownKafka() throws Exception {
|
||||
if (this.kafkaEmbedded != null) {
|
||||
this.kafkaEmbedded.destroy();
|
||||
}
|
||||
}
|
||||
@Test
|
||||
public void kafkaIsUp() throws Exception {
|
||||
startKafka(1);
|
||||
KafkaHealthIndicator healthIndicator =
|
||||
new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
assertDetails(health.getDetails());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void kafkaIsDown() {
|
||||
int freePort = SocketUtils.findAvailableTcpPort();
|
||||
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
|
||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + freePort));
|
||||
KafkaHealthIndicator healthIndicator =
|
||||
new KafkaHealthIndicator(this.kafkaAdmin, 1L);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
assertThat((String) health.getDetails().get("error")).isNotEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void notEnoughNodesForReplicationFactor() throws Exception {
|
||||
startKafka(2);
|
||||
KafkaHealthIndicator healthIndicator =
|
||||
new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
assertDetails(health.getDetails());
|
||||
}
|
||||
|
||||
private void assertDetails(Map<String, Object> details) {
|
||||
assertThat(details).containsEntry("brokerId", "0");
|
||||
assertThat(details).containsKey("clusterId");
|
||||
assertThat(details).containsEntry("nodes", 1);
|
||||
}
|
||||
|
||||
private void startKafka(int replicationFactor) throws Exception {
|
||||
this.kafkaEmbedded = new KafkaEmbedded(1, true);
|
||||
this.kafkaEmbedded.brokerProperties(Collections.singletonMap(
|
||||
|
@ -52,46 +98,4 @@ public class KafkaHealthIndicatorTests {
|
|||
this.kafkaEmbedded.getBrokersAsString()));
|
||||
}
|
||||
|
||||
private void shutdownKafka() throws Exception {
|
||||
this.kafkaEmbedded.destroy();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void kafkaIsUp() throws Exception {
|
||||
startKafka(1);
|
||||
KafkaHealthIndicator healthIndicator =
|
||||
new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
assertDetails(health.getDetails());
|
||||
shutdownKafka();
|
||||
}
|
||||
|
||||
private void assertDetails(Map<String, Object> details) {
|
||||
assertThat(details).containsEntry("brokerId", "0");
|
||||
assertThat(details).containsKey("clusterId");
|
||||
assertThat(details).containsEntry("nodes", 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void notEnoughNodesForReplicationFactor() throws Exception {
|
||||
startKafka(2);
|
||||
KafkaHealthIndicator healthIndicator =
|
||||
new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
assertDetails(health.getDetails());
|
||||
shutdownKafka();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void kafkaIsDown() throws Exception {
|
||||
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
|
||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:34987"));
|
||||
KafkaHealthIndicator healthIndicator =
|
||||
new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
assertThat((String) health.getDetails().get("error")).isNotEmpty();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1271,6 +1271,8 @@ content into your application. Rather, pick only the properties that you need.
|
|||
management.health.elasticsearch.response-timeout=100ms # The time to wait for a response from the cluster.
|
||||
management.health.influxdb.enabled=true # Whether to enable InfluxDB health check.
|
||||
management.health.jms.enabled=true # Whether to enable JMS health check.
|
||||
management.health.kafka.enabled=true # Whether to enable Kafka health check.
|
||||
management.health.kafka.response-timeout=1000ms # Time to wait for a response from the cluster description operation.
|
||||
management.health.ldap.enabled=true # Whether to enable LDAP health check.
|
||||
management.health.mail.enabled=true # Whether to enable Mail health check.
|
||||
management.health.mongo.enabled=true # Whether to enable MongoDB health check.
|
||||
|
|
|
@ -572,6 +572,9 @@ The following `HealthIndicators` are auto-configured by Spring Boot when appropr
|
|||
|{sc-spring-boot-actuator}/jms/JmsHealthIndicator.{sc-ext}[`JmsHealthIndicator`]
|
||||
|Checks that a JMS broker is up.
|
||||
|
||||
|{sc-spring-boot-actuator}/kafka/KafkaHealthIndicator.{sc-ext}[`KafkaHealthIndicator`]
|
||||
|Checks that a Kafka server is up.
|
||||
|
||||
|{sc-spring-boot-actuator}/mail/MailHealthIndicator.{sc-ext}[`MailHealthIndicator`]
|
||||
|Checks that a mail server is up.
|
||||
|
||||
|
|
Loading…
Reference in New Issue