diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraDriverHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraDriverHealthIndicator.java index bbb1468c0e7..ed42a0bbda6 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraDriverHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraDriverHealthIndicator.java @@ -16,14 +16,17 @@ package org.springframework.boot.actuate.cassandra; -import com.datastax.oss.driver.api.core.ConsistencyLevel; +import java.util.Collection; +import java.util.Optional; + import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.Row; -import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.boot.actuate.health.Status; import org.springframework.util.Assert; /** @@ -31,13 +34,11 @@ import org.springframework.util.Assert; * Cassandra data stores. * * @author Alexandre Dutra + * @author Tomasz Lelek * @since 2.4.0 */ public class CassandraDriverHealthIndicator extends AbstractHealthIndicator { - private static final SimpleStatement SELECT = SimpleStatement - .newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE); - private final CqlSession session; /** @@ -52,11 +53,10 @@ public class CassandraDriverHealthIndicator extends AbstractHealthIndicator { @Override protected void doHealthCheck(Health.Builder builder) throws Exception { - Row row = this.session.execute(SELECT).one(); - builder.up(); - if (row != null && !row.isNull(0)) { - builder.withDetail("version", row.getString(0)); - } + Collection nodes = this.session.getMetadata().getNodes().values(); + Optional nodeUp = nodes.stream().filter((node) -> node.getState() == NodeState.UP).findAny(); + builder.status(nodeUp.isPresent() ? Status.UP : Status.DOWN); + nodeUp.map(Node::getCassandraVersion).ifPresent((version) -> builder.withDetail("version", version)); } } diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraDriverReactiveHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraDriverReactiveHealthIndicator.java index 40483ca04ef..893faf58556 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraDriverReactiveHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/cassandra/CassandraDriverReactiveHealthIndicator.java @@ -15,14 +15,18 @@ */ package org.springframework.boot.actuate.cassandra; -import com.datastax.oss.driver.api.core.ConsistencyLevel; +import java.util.Collection; +import java.util.Optional; + import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; import reactor.core.publisher.Mono; import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.ReactiveHealthIndicator; +import org.springframework.boot.actuate.health.Status; import org.springframework.util.Assert; /** @@ -30,13 +34,11 @@ import org.springframework.util.Assert; * for Cassandra data stores. * * @author Alexandre Dutra + * @author Tomasz Lelek * @since 2.4.0 */ public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHealthIndicator { - private static final SimpleStatement SELECT = SimpleStatement - .newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE); - private final CqlSession session; /** @@ -51,8 +53,13 @@ public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHeal @Override protected Mono doHealthCheck(Health.Builder builder) { - return Mono.from(this.session.executeReactive(SELECT)) - .map((row) -> builder.up().withDetail("version", row.getString(0)).build()); + return Mono.fromSupplier(() -> { + Collection nodes = this.session.getMetadata().getNodes().values(); + Optional nodeUp = nodes.stream().filter((node) -> node.getState() == NodeState.UP).findAny(); + builder.status(nodeUp.isPresent() ? Status.UP : Status.DOWN); + nodeUp.map(Node::getCassandraVersion).ifPresent((version) -> builder.withDetail("version", version)); + return builder.build(); + }); } } diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraDriverHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraDriverHealthIndicatorTests.java index 875edd64c34..f9d8827c8b8 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraDriverHealthIndicatorTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraDriverHealthIndicatorTests.java @@ -16,11 +16,19 @@ package org.springframework.boot.actuate.cassandra; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DriverTimeoutException; -import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.Row; -import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.Version; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; import org.junit.jupiter.api.Test; import org.springframework.boot.actuate.health.Health; @@ -28,7 +36,6 @@ import org.springframework.boot.actuate.health.Status; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; @@ -36,7 +43,7 @@ import static org.mockito.Mockito.mock; * Tests for {@link CassandraDriverHealthIndicator}. * * @author Alexandre Dutra - * @since 2.4.0 + * @author Stephane Nicoll */ class CassandraDriverHealthIndicatorTests { @@ -46,24 +53,89 @@ class CassandraDriverHealthIndicatorTests { } @Test - void healthWithCassandraUp() { - CqlSession session = mock(CqlSession.class); - ResultSet resultSet = mock(ResultSet.class); - Row row = mock(Row.class); - given(session.execute(any(SimpleStatement.class))).willReturn(resultSet); - given(resultSet.one()).willReturn(row); - given(row.isNull(0)).willReturn(false); - given(row.getString(0)).willReturn("1.0.0"); + void healthWithOneHealthyNodeShouldReturnUp() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP); CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.UP); - assertThat(health.getDetails().get("version")).isEqualTo("1.0.0"); } @Test - void healthWithCassandraDown() { + void healthWithOneUnhealthyNodeShouldReturnDown() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.DOWN); + CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.DOWN); + } + + @Test + void healthWithOneUnknownNodeShouldReturnDown() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UNKNOWN); + CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.DOWN); + } + + @Test + void healthWithOneForcedDownNodeShouldReturnDown() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.FORCED_DOWN); + CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.DOWN); + } + + @Test + void healthWithOneHealthyNodeAndOneUnhealthyNodeShouldReturnUp() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.DOWN); + CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.UP); + } + + @Test + void healthWithOneHealthyNodeAndOneUnknownNodeShouldReturnUp() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.UNKNOWN); + CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.UP); + } + + @Test + void healthWithOneHealthyNodeAndOneForcedDownNodeShouldReturnUp() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.FORCED_DOWN); + CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.UP); + } + + @Test + void healthWithNodeVersionShouldAddVersionDetail() { CqlSession session = mock(CqlSession.class); - given(session.execute(any(SimpleStatement.class))).willThrow(new DriverTimeoutException("Test Exception")); + Metadata metadata = mock(Metadata.class); + given(session.getMetadata()).willReturn(metadata); + Node node = mock(Node.class); + given(node.getState()).willReturn(NodeState.UP); + given(node.getCassandraVersion()).willReturn(Version.V4_0_0); + given(metadata.getNodes()).willReturn(createNodesWithRandomUUID(Collections.singletonList(node))); + CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.UP); + assertThat(health.getDetails().get("version")).isEqualTo(Version.V4_0_0); + } + + @Test + void healthWithoutNodeVersionShouldNotAddVersionDetail() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP); + CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.UP); + assertThat(health.getDetails().get("version")).isNull(); + } + + @Test + void healthWithcassandraDownShouldReturnDown() { + CqlSession session = mock(CqlSession.class); + given(session.getMetadata()).willThrow(new DriverTimeoutException("Test Exception")); CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.DOWN); @@ -71,4 +143,24 @@ class CassandraDriverHealthIndicatorTests { .isEqualTo(DriverTimeoutException.class.getName() + ": Test Exception"); } + private CqlSession mockCqlSessionWithNodeState(NodeState... nodeStates) { + CqlSession session = mock(CqlSession.class); + Metadata metadata = mock(Metadata.class); + List nodes = new ArrayList<>(); + for (NodeState nodeState : nodeStates) { + Node node = mock(Node.class); + given(node.getState()).willReturn(nodeState); + nodes.add(node); + } + given(session.getMetadata()).willReturn(metadata); + given(metadata.getNodes()).willReturn(createNodesWithRandomUUID(nodes)); + return session; + } + + private Map createNodesWithRandomUUID(List nodes) { + Map indexedNodes = new HashMap<>(); + nodes.forEach((node) -> indexedNodes.put(UUID.randomUUID(), node)); + return indexedNodes; + } + } diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraDriverReactiveHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraDriverReactiveHealthIndicatorTests.java index 50cee9f37d0..c35deec14c5 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraDriverReactiveHealthIndicatorTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/cassandra/CassandraDriverReactiveHealthIndicatorTests.java @@ -15,15 +15,20 @@ */ package org.springframework.boot.actuate.cassandra; -import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet; -import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DriverTimeoutException; -import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.Version; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; import org.junit.jupiter.api.Test; -import org.mockito.stubbing.Answer; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -32,16 +37,14 @@ import org.springframework.boot.actuate.health.Status; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; -import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.mock; /** * Tests for {@link CassandraDriverReactiveHealthIndicator}. * * @author Alexandre Dutra - * @since 2.4.0 + * @author Stephane Nicoll */ class CassandraDriverReactiveHealthIndicatorTests { @@ -51,28 +54,101 @@ class CassandraDriverReactiveHealthIndicatorTests { } @Test - void testCassandraIsUp() { + void healthWithOneHealthyNodeShouldReturnUp() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP); + CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session); + Mono health = healthIndicator.health(); + StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP)) + .verifyComplete(); + } + + @Test + void healthWithOneUnhealthyNodeShouldReturnDown() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.DOWN); + CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session); + Mono health = healthIndicator.health(); + StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN)) + .verifyComplete(); + } + + @Test + void healthWithOneUnknownNodeShouldReturnDown() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UNKNOWN); + CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session); + Mono health = healthIndicator.health(); + StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN)) + .verifyComplete(); + } + + @Test + void healthWithOneForcedDownNodeShouldReturnDown() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.FORCED_DOWN); + CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session); + Mono health = healthIndicator.health(); + StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN)) + .verifyComplete(); + } + + @Test + void healthWithOneHealthyNodeAndOneUnhealthyNodeShouldReturnUp() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.DOWN); + CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session); + Mono health = healthIndicator.health(); + StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP)) + .verifyComplete(); + } + + @Test + void healthWithOneHealthyNodeAndOneUnknownNodeShouldReturnUp() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.UNKNOWN); + CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session); + Mono health = healthIndicator.health(); + StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP)) + .verifyComplete(); + } + + @Test + void healthWithOneHealthyNodeAndOneForcedDownNodeShouldReturnUp() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.FORCED_DOWN); + CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session); + Mono health = healthIndicator.health(); + StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP)) + .verifyComplete(); + } + + @Test + void healthWithNodeVersionShouldAddVersionDetail() { CqlSession session = mock(CqlSession.class); - ReactiveResultSet results = mock(ReactiveResultSet.class); - ReactiveRow row = mock(ReactiveRow.class); - given(session.executeReactive(any(SimpleStatement.class))).willReturn(results); - willAnswer(mockReactiveResultSetBehavior(row)).given(results).subscribe(any()); - given(row.getString(0)).willReturn("6.0.0"); - CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator( - session); - Mono health = cassandraReactiveHealthIndicator.health(); + Metadata metadata = mock(Metadata.class); + given(session.getMetadata()).willReturn(metadata); + Node node = mock(Node.class); + given(node.getState()).willReturn(NodeState.UP); + given(node.getCassandraVersion()).willReturn(Version.V4_0_0); + given(metadata.getNodes()).willReturn(createNodesWithRandomUUID(Collections.singletonList(node))); + CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session); + Mono health = healthIndicator.health(); StepVerifier.create(health).consumeNextWith((h) -> { assertThat(h.getStatus()).isEqualTo(Status.UP); assertThat(h.getDetails()).containsOnlyKeys("version"); - assertThat(h.getDetails().get("version")).isEqualTo("6.0.0"); + assertThat(h.getDetails().get("version")).isEqualTo(Version.V4_0_0); }).verifyComplete(); } @Test - void testCassandraIsDown() { + void healthWithoutNodeVersionShouldNotAddVersionDetail() { + CqlSession session = mockCqlSessionWithNodeState(NodeState.UP); + CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session); + Mono health = healthIndicator.health(); + StepVerifier.create(health).consumeNextWith((h) -> { + assertThat(h.getStatus()).isEqualTo(Status.UP); + assertThat(h.getDetails().get("version")).isNull(); + }).verifyComplete(); + } + + @Test + void healthWithCassandraDownShouldReturnDown() { CqlSession session = mock(CqlSession.class); - given(session.executeReactive(any(SimpleStatement.class))) - .willThrow(new DriverTimeoutException("Test Exception")); + given(session.getMetadata()).willThrow(new DriverTimeoutException("Test Exception")); CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator( session); Mono health = cassandraReactiveHealthIndicator.health(); @@ -84,23 +160,24 @@ class CassandraDriverReactiveHealthIndicatorTests { }).verifyComplete(); } - private Answer mockReactiveResultSetBehavior(ReactiveRow row) { - return (invocation) -> { - Subscriber subscriber = invocation.getArgument(0); - Subscription s = new Subscription() { - @Override - public void request(long n) { - subscriber.onNext(row); - subscriber.onComplete(); - } + private CqlSession mockCqlSessionWithNodeState(NodeState... nodeStates) { + CqlSession session = mock(CqlSession.class); + Metadata metadata = mock(Metadata.class); + List nodes = new ArrayList<>(); + for (NodeState nodeState : nodeStates) { + Node node = mock(Node.class); + given(node.getState()).willReturn(nodeState); + nodes.add(node); + } + given(session.getMetadata()).willReturn(metadata); + given(metadata.getNodes()).willReturn(createNodesWithRandomUUID(nodes)); + return session; + } - @Override - public void cancel() { - } - }; - subscriber.onSubscribe(s); - return null; - }; + private Map createNodesWithRandomUUID(List nodes) { + Map indexedNodes = new HashMap<>(); + nodes.forEach((node) -> indexedNodes.put(UUID.randomUUID(), node)); + return indexedNodes; } }