Merge pull request #23041 from tomekl007
* pr/23041: Polish "Improve Cassandra health indicator with more robust mechanism" Improve Cassandra health indicator with more robust mechanism Closes gh-23041
This commit is contained in:
commit
48c2f4d9cc
|
|
@ -16,14 +16,17 @@
|
|||
|
||||
package org.springframework.boot.actuate.cassandra;
|
||||
|
||||
import com.datastax.oss.driver.api.core.ConsistencyLevel;
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.Row;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import com.datastax.oss.driver.api.core.metadata.NodeState;
|
||||
|
||||
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.boot.actuate.health.HealthIndicator;
|
||||
import org.springframework.boot.actuate.health.Status;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
|
|
@ -31,13 +34,11 @@ import org.springframework.util.Assert;
|
|||
* Cassandra data stores.
|
||||
*
|
||||
* @author Alexandre Dutra
|
||||
* @author Tomasz Lelek
|
||||
* @since 2.4.0
|
||||
*/
|
||||
public class CassandraDriverHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
private static final SimpleStatement SELECT = SimpleStatement
|
||||
.newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
|
||||
|
||||
private final CqlSession session;
|
||||
|
||||
/**
|
||||
|
|
@ -52,11 +53,10 @@ public class CassandraDriverHealthIndicator extends AbstractHealthIndicator {
|
|||
|
||||
@Override
|
||||
protected void doHealthCheck(Health.Builder builder) throws Exception {
|
||||
Row row = this.session.execute(SELECT).one();
|
||||
builder.up();
|
||||
if (row != null && !row.isNull(0)) {
|
||||
builder.withDetail("version", row.getString(0));
|
||||
}
|
||||
Collection<Node> nodes = this.session.getMetadata().getNodes().values();
|
||||
Optional<Node> nodeUp = nodes.stream().filter((node) -> node.getState() == NodeState.UP).findAny();
|
||||
builder.status(nodeUp.isPresent() ? Status.UP : Status.DOWN);
|
||||
nodeUp.map(Node::getCassandraVersion).ifPresent((version) -> builder.withDetail("version", version));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,14 +15,18 @@
|
|||
*/
|
||||
package org.springframework.boot.actuate.cassandra;
|
||||
|
||||
import com.datastax.oss.driver.api.core.ConsistencyLevel;
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import com.datastax.oss.driver.api.core.metadata.NodeState;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
|
||||
import org.springframework.boot.actuate.health.Status;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
|
|
@ -30,13 +34,11 @@ import org.springframework.util.Assert;
|
|||
* for Cassandra data stores.
|
||||
*
|
||||
* @author Alexandre Dutra
|
||||
* @author Tomasz Lelek
|
||||
* @since 2.4.0
|
||||
*/
|
||||
public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
|
||||
|
||||
private static final SimpleStatement SELECT = SimpleStatement
|
||||
.newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
|
||||
|
||||
private final CqlSession session;
|
||||
|
||||
/**
|
||||
|
|
@ -51,8 +53,13 @@ public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHeal
|
|||
|
||||
@Override
|
||||
protected Mono<Health> doHealthCheck(Health.Builder builder) {
|
||||
return Mono.from(this.session.executeReactive(SELECT))
|
||||
.map((row) -> builder.up().withDetail("version", row.getString(0)).build());
|
||||
return Mono.fromSupplier(() -> {
|
||||
Collection<Node> nodes = this.session.getMetadata().getNodes().values();
|
||||
Optional<Node> nodeUp = nodes.stream().filter((node) -> node.getState() == NodeState.UP).findAny();
|
||||
builder.status(nodeUp.isPresent() ? Status.UP : Status.DOWN);
|
||||
nodeUp.map(Node::getCassandraVersion).ifPresent((version) -> builder.withDetail("version", version));
|
||||
return builder.build();
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,11 +16,19 @@
|
|||
|
||||
package org.springframework.boot.actuate.cassandra;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.DriverTimeoutException;
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.Row;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import com.datastax.oss.driver.api.core.Version;
|
||||
import com.datastax.oss.driver.api.core.metadata.Metadata;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import com.datastax.oss.driver.api.core.metadata.NodeState;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
|
|
@ -28,7 +36,6 @@ import org.springframework.boot.actuate.health.Status;
|
|||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
|
|
@ -36,7 +43,7 @@ import static org.mockito.Mockito.mock;
|
|||
* Tests for {@link CassandraDriverHealthIndicator}.
|
||||
*
|
||||
* @author Alexandre Dutra
|
||||
* @since 2.4.0
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
class CassandraDriverHealthIndicatorTests {
|
||||
|
||||
|
|
@ -46,24 +53,89 @@ class CassandraDriverHealthIndicatorTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
void healthWithCassandraUp() {
|
||||
CqlSession session = mock(CqlSession.class);
|
||||
ResultSet resultSet = mock(ResultSet.class);
|
||||
Row row = mock(Row.class);
|
||||
given(session.execute(any(SimpleStatement.class))).willReturn(resultSet);
|
||||
given(resultSet.one()).willReturn(row);
|
||||
given(row.isNull(0)).willReturn(false);
|
||||
given(row.getString(0)).willReturn("1.0.0");
|
||||
void healthWithOneHealthyNodeShouldReturnUp() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP);
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
assertThat(health.getDetails().get("version")).isEqualTo("1.0.0");
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithCassandraDown() {
|
||||
void healthWithOneUnhealthyNodeShouldReturnDown() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.DOWN);
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneUnknownNodeShouldReturnDown() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UNKNOWN);
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneForcedDownNodeShouldReturnDown() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.FORCED_DOWN);
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneHealthyNodeAndOneUnhealthyNodeShouldReturnUp() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.DOWN);
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneHealthyNodeAndOneUnknownNodeShouldReturnUp() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.UNKNOWN);
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneHealthyNodeAndOneForcedDownNodeShouldReturnUp() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.FORCED_DOWN);
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithNodeVersionShouldAddVersionDetail() {
|
||||
CqlSession session = mock(CqlSession.class);
|
||||
given(session.execute(any(SimpleStatement.class))).willThrow(new DriverTimeoutException("Test Exception"));
|
||||
Metadata metadata = mock(Metadata.class);
|
||||
given(session.getMetadata()).willReturn(metadata);
|
||||
Node node = mock(Node.class);
|
||||
given(node.getState()).willReturn(NodeState.UP);
|
||||
given(node.getCassandraVersion()).willReturn(Version.V4_0_0);
|
||||
given(metadata.getNodes()).willReturn(createNodesWithRandomUUID(Collections.singletonList(node)));
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
assertThat(health.getDetails().get("version")).isEqualTo(Version.V4_0_0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithoutNodeVersionShouldNotAddVersionDetail() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP);
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
assertThat(health.getDetails().get("version")).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithcassandraDownShouldReturnDown() {
|
||||
CqlSession session = mock(CqlSession.class);
|
||||
given(session.getMetadata()).willThrow(new DriverTimeoutException("Test Exception"));
|
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
|
||||
Health health = healthIndicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
|
|
@ -71,4 +143,24 @@ class CassandraDriverHealthIndicatorTests {
|
|||
.isEqualTo(DriverTimeoutException.class.getName() + ": Test Exception");
|
||||
}
|
||||
|
||||
private CqlSession mockCqlSessionWithNodeState(NodeState... nodeStates) {
|
||||
CqlSession session = mock(CqlSession.class);
|
||||
Metadata metadata = mock(Metadata.class);
|
||||
List<Node> nodes = new ArrayList<>();
|
||||
for (NodeState nodeState : nodeStates) {
|
||||
Node node = mock(Node.class);
|
||||
given(node.getState()).willReturn(nodeState);
|
||||
nodes.add(node);
|
||||
}
|
||||
given(session.getMetadata()).willReturn(metadata);
|
||||
given(metadata.getNodes()).willReturn(createNodesWithRandomUUID(nodes));
|
||||
return session;
|
||||
}
|
||||
|
||||
private Map<UUID, Node> createNodesWithRandomUUID(List<Node> nodes) {
|
||||
Map<UUID, Node> indexedNodes = new HashMap<>();
|
||||
nodes.forEach((node) -> indexedNodes.put(UUID.randomUUID(), node));
|
||||
return indexedNodes;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,15 +15,20 @@
|
|||
*/
|
||||
package org.springframework.boot.actuate.cassandra;
|
||||
|
||||
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
|
||||
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.DriverTimeoutException;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import com.datastax.oss.driver.api.core.Version;
|
||||
import com.datastax.oss.driver.api.core.metadata.Metadata;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import com.datastax.oss.driver.api.core.metadata.NodeState;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.reactivestreams.Subscriber;
|
||||
import org.reactivestreams.Subscription;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
|
|
@ -32,16 +37,14 @@ import org.springframework.boot.actuate.health.Status;
|
|||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.BDDMockito.willAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* Tests for {@link CassandraDriverReactiveHealthIndicator}.
|
||||
*
|
||||
* @author Alexandre Dutra
|
||||
* @since 2.4.0
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
class CassandraDriverReactiveHealthIndicatorTests {
|
||||
|
||||
|
|
@ -51,28 +54,101 @@ class CassandraDriverReactiveHealthIndicatorTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
void testCassandraIsUp() {
|
||||
void healthWithOneHealthyNodeShouldReturnUp() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP);
|
||||
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
|
||||
Mono<Health> health = healthIndicator.health();
|
||||
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneUnhealthyNodeShouldReturnDown() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.DOWN);
|
||||
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
|
||||
Mono<Health> health = healthIndicator.health();
|
||||
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneUnknownNodeShouldReturnDown() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UNKNOWN);
|
||||
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
|
||||
Mono<Health> health = healthIndicator.health();
|
||||
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneForcedDownNodeShouldReturnDown() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.FORCED_DOWN);
|
||||
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
|
||||
Mono<Health> health = healthIndicator.health();
|
||||
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneHealthyNodeAndOneUnhealthyNodeShouldReturnUp() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.DOWN);
|
||||
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
|
||||
Mono<Health> health = healthIndicator.health();
|
||||
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneHealthyNodeAndOneUnknownNodeShouldReturnUp() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.UNKNOWN);
|
||||
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
|
||||
Mono<Health> health = healthIndicator.health();
|
||||
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithOneHealthyNodeAndOneForcedDownNodeShouldReturnUp() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP, NodeState.FORCED_DOWN);
|
||||
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
|
||||
Mono<Health> health = healthIndicator.health();
|
||||
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithNodeVersionShouldAddVersionDetail() {
|
||||
CqlSession session = mock(CqlSession.class);
|
||||
ReactiveResultSet results = mock(ReactiveResultSet.class);
|
||||
ReactiveRow row = mock(ReactiveRow.class);
|
||||
given(session.executeReactive(any(SimpleStatement.class))).willReturn(results);
|
||||
willAnswer(mockReactiveResultSetBehavior(row)).given(results).subscribe(any());
|
||||
given(row.getString(0)).willReturn("6.0.0");
|
||||
CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator(
|
||||
session);
|
||||
Mono<Health> health = cassandraReactiveHealthIndicator.health();
|
||||
Metadata metadata = mock(Metadata.class);
|
||||
given(session.getMetadata()).willReturn(metadata);
|
||||
Node node = mock(Node.class);
|
||||
given(node.getState()).willReturn(NodeState.UP);
|
||||
given(node.getCassandraVersion()).willReturn(Version.V4_0_0);
|
||||
given(metadata.getNodes()).willReturn(createNodesWithRandomUUID(Collections.singletonList(node)));
|
||||
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
|
||||
Mono<Health> health = healthIndicator.health();
|
||||
StepVerifier.create(health).consumeNextWith((h) -> {
|
||||
assertThat(h.getStatus()).isEqualTo(Status.UP);
|
||||
assertThat(h.getDetails()).containsOnlyKeys("version");
|
||||
assertThat(h.getDetails().get("version")).isEqualTo("6.0.0");
|
||||
assertThat(h.getDetails().get("version")).isEqualTo(Version.V4_0_0);
|
||||
}).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCassandraIsDown() {
|
||||
void healthWithoutNodeVersionShouldNotAddVersionDetail() {
|
||||
CqlSession session = mockCqlSessionWithNodeState(NodeState.UP);
|
||||
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
|
||||
Mono<Health> health = healthIndicator.health();
|
||||
StepVerifier.create(health).consumeNextWith((h) -> {
|
||||
assertThat(h.getStatus()).isEqualTo(Status.UP);
|
||||
assertThat(h.getDetails().get("version")).isNull();
|
||||
}).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void healthWithCassandraDownShouldReturnDown() {
|
||||
CqlSession session = mock(CqlSession.class);
|
||||
given(session.executeReactive(any(SimpleStatement.class)))
|
||||
.willThrow(new DriverTimeoutException("Test Exception"));
|
||||
given(session.getMetadata()).willThrow(new DriverTimeoutException("Test Exception"));
|
||||
CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator(
|
||||
session);
|
||||
Mono<Health> health = cassandraReactiveHealthIndicator.health();
|
||||
|
|
@ -84,23 +160,24 @@ class CassandraDriverReactiveHealthIndicatorTests {
|
|||
}).verifyComplete();
|
||||
}
|
||||
|
||||
private Answer<Void> mockReactiveResultSetBehavior(ReactiveRow row) {
|
||||
return (invocation) -> {
|
||||
Subscriber<ReactiveRow> subscriber = invocation.getArgument(0);
|
||||
Subscription s = new Subscription() {
|
||||
@Override
|
||||
public void request(long n) {
|
||||
subscriber.onNext(row);
|
||||
subscriber.onComplete();
|
||||
}
|
||||
private CqlSession mockCqlSessionWithNodeState(NodeState... nodeStates) {
|
||||
CqlSession session = mock(CqlSession.class);
|
||||
Metadata metadata = mock(Metadata.class);
|
||||
List<Node> nodes = new ArrayList<>();
|
||||
for (NodeState nodeState : nodeStates) {
|
||||
Node node = mock(Node.class);
|
||||
given(node.getState()).willReturn(nodeState);
|
||||
nodes.add(node);
|
||||
}
|
||||
given(session.getMetadata()).willReturn(metadata);
|
||||
given(metadata.getNodes()).willReturn(createNodesWithRandomUUID(nodes));
|
||||
return session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
}
|
||||
};
|
||||
subscriber.onSubscribe(s);
|
||||
return null;
|
||||
};
|
||||
private Map<UUID, Node> createNodesWithRandomUUID(List<Node> nodes) {
|
||||
Map<UUID, Node> indexedNodes = new HashMap<>();
|
||||
nodes.forEach((node) -> indexedNodes.put(UUID.randomUUID(), node));
|
||||
return indexedNodes;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue