parent
7e52607bab
commit
d25fc73dae
|
@ -20,8 +20,9 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.neo4j.driver.Driver;
|
import org.neo4j.driver.Driver;
|
||||||
import org.neo4j.driver.exceptions.SessionExpiredException;
|
import org.neo4j.driver.exceptions.SessionExpiredException;
|
||||||
import org.neo4j.driver.reactive.RxResult;
|
import org.neo4j.driver.reactive.ReactiveResult;
|
||||||
import org.neo4j.driver.reactive.RxSession;
|
import org.neo4j.driver.reactive.ReactiveSession;
|
||||||
|
import reactor.adapter.JdkFlowAdapter;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.util.retry.Retry;
|
import reactor.util.retry.Retry;
|
||||||
|
|
||||||
|
@ -64,11 +65,14 @@ public final class Neo4jReactiveHealthIndicator extends AbstractReactiveHealthIn
|
||||||
Mono<Neo4jHealthDetails> runHealthCheckQuery() {
|
Mono<Neo4jHealthDetails> runHealthCheckQuery() {
|
||||||
// We use WRITE here to make sure UP is returned for a server that supports
|
// We use WRITE here to make sure UP is returned for a server that supports
|
||||||
// all possible workloads
|
// all possible workloads
|
||||||
return Mono.using(() -> this.driver.rxSession(Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG), (session) -> {
|
return Mono.using(() -> this.driver.reactiveSession(Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG), (session) -> {
|
||||||
RxResult result = session.run(Neo4jHealthIndicator.CYPHER);
|
Mono<ReactiveResult> resultMono = JdkFlowAdapter
|
||||||
return Mono.from(result.records()).zipWhen((record) -> Mono.from(result.consume()))
|
.flowPublisherToFlux(session.run(Neo4jHealthIndicator.CYPHER)).single();
|
||||||
.map((tuple) -> new Neo4jHealthDetails(tuple.getT1(), tuple.getT2()));
|
return resultMono
|
||||||
}, RxSession::close);
|
.flatMapMany((result) -> JdkFlowAdapter.flowPublisherToFlux(result.records())
|
||||||
|
.zipWith(JdkFlowAdapter.flowPublisherToFlux(result.consume())))
|
||||||
|
.map((tuple) -> new Neo4jHealthDetails(tuple.getT1(), tuple.getT2())).single();
|
||||||
|
}, ReactiveSession::close);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,9 +25,11 @@ import org.neo4j.driver.SessionConfig;
|
||||||
import org.neo4j.driver.Values;
|
import org.neo4j.driver.Values;
|
||||||
import org.neo4j.driver.exceptions.ServiceUnavailableException;
|
import org.neo4j.driver.exceptions.ServiceUnavailableException;
|
||||||
import org.neo4j.driver.exceptions.SessionExpiredException;
|
import org.neo4j.driver.exceptions.SessionExpiredException;
|
||||||
import org.neo4j.driver.reactive.RxResult;
|
import org.neo4j.driver.reactive.ReactiveResult;
|
||||||
import org.neo4j.driver.reactive.RxSession;
|
import org.neo4j.driver.reactive.ReactiveSession;
|
||||||
import org.neo4j.driver.summary.ResultSummary;
|
import org.neo4j.driver.summary.ResultSummary;
|
||||||
|
import reactor.adapter.JdkFlowAdapter;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.test.StepVerifier;
|
import reactor.test.StepVerifier;
|
||||||
|
|
||||||
|
@ -46,6 +48,7 @@ import static org.mockito.Mockito.times;
|
||||||
*
|
*
|
||||||
* @author Michael J. Simons
|
* @author Michael J. Simons
|
||||||
* @author Stephane Nicoll
|
* @author Stephane Nicoll
|
||||||
|
* @author Brian Clozel
|
||||||
*/
|
*/
|
||||||
class Neo4jReactiveHealthIndicatorTests {
|
class Neo4jReactiveHealthIndicatorTests {
|
||||||
|
|
||||||
|
@ -64,17 +67,18 @@ class Neo4jReactiveHealthIndicatorTests {
|
||||||
@Test
|
@Test
|
||||||
void neo4jIsUpWithOneSessionExpiredException() {
|
void neo4jIsUpWithOneSessionExpiredException() {
|
||||||
ResultSummary resultSummary = ResultSummaryMock.createResultSummary("My Home", "");
|
ResultSummary resultSummary = ResultSummaryMock.createResultSummary("My Home", "");
|
||||||
RxSession session = mock(RxSession.class);
|
ReactiveSession session = mock(ReactiveSession.class);
|
||||||
RxResult statementResult = mockStatementResult(resultSummary, "4711", "some edition");
|
ReactiveResult statementResult = mockStatementResult(resultSummary, "4711", "some edition");
|
||||||
AtomicInteger count = new AtomicInteger();
|
AtomicInteger count = new AtomicInteger();
|
||||||
given(session.run(anyString())).will((invocation) -> {
|
given(session.run(anyString())).will((invocation) -> {
|
||||||
if (count.compareAndSet(0, 1)) {
|
if (count.compareAndSet(0, 1)) {
|
||||||
throw new SessionExpiredException("Session expired");
|
return JdkFlowAdapter
|
||||||
|
.publisherToFlowPublisher(Flux.error(new SessionExpiredException("Session expired")));
|
||||||
}
|
}
|
||||||
return statementResult;
|
return JdkFlowAdapter.publisherToFlowPublisher(Flux.just(statementResult));
|
||||||
});
|
});
|
||||||
Driver driver = mock(Driver.class);
|
Driver driver = mock(Driver.class);
|
||||||
given(driver.rxSession(any(SessionConfig.class))).willReturn(session);
|
given(driver.reactiveSession(any(SessionConfig.class))).willReturn(session);
|
||||||
Neo4jReactiveHealthIndicator healthIndicator = new Neo4jReactiveHealthIndicator(driver);
|
Neo4jReactiveHealthIndicator healthIndicator = new Neo4jReactiveHealthIndicator(driver);
|
||||||
healthIndicator.health().as(StepVerifier::create).consumeNextWith((health) -> {
|
healthIndicator.health().as(StepVerifier::create).consumeNextWith((health) -> {
|
||||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||||
|
@ -87,7 +91,7 @@ class Neo4jReactiveHealthIndicatorTests {
|
||||||
@Test
|
@Test
|
||||||
void neo4jIsDown() {
|
void neo4jIsDown() {
|
||||||
Driver driver = mock(Driver.class);
|
Driver driver = mock(Driver.class);
|
||||||
given(driver.rxSession(any(SessionConfig.class))).willThrow(ServiceUnavailableException.class);
|
given(driver.reactiveSession(any(SessionConfig.class))).willThrow(ServiceUnavailableException.class);
|
||||||
Neo4jReactiveHealthIndicator healthIndicator = new Neo4jReactiveHealthIndicator(driver);
|
Neo4jReactiveHealthIndicator healthIndicator = new Neo4jReactiveHealthIndicator(driver);
|
||||||
healthIndicator.health().as(StepVerifier::create).consumeNextWith((health) -> {
|
healthIndicator.health().as(StepVerifier::create).consumeNextWith((health) -> {
|
||||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||||
|
@ -95,22 +99,22 @@ class Neo4jReactiveHealthIndicatorTests {
|
||||||
}).verifyComplete();
|
}).verifyComplete();
|
||||||
}
|
}
|
||||||
|
|
||||||
private RxResult mockStatementResult(ResultSummary resultSummary, String version, String edition) {
|
private ReactiveResult mockStatementResult(ResultSummary resultSummary, String version, String edition) {
|
||||||
Record record = mock(Record.class);
|
Record record = mock(Record.class);
|
||||||
given(record.get("edition")).willReturn(Values.value(edition));
|
given(record.get("edition")).willReturn(Values.value(edition));
|
||||||
given(record.get("version")).willReturn(Values.value(version));
|
given(record.get("version")).willReturn(Values.value(version));
|
||||||
RxResult statementResult = mock(RxResult.class);
|
ReactiveResult statementResult = mock(ReactiveResult.class);
|
||||||
given(statementResult.records()).willReturn(Mono.just(record));
|
given(statementResult.records()).willReturn(JdkFlowAdapter.publisherToFlowPublisher(Mono.just(record)));
|
||||||
given(statementResult.consume()).willReturn(Mono.just(resultSummary));
|
given(statementResult.consume()).willReturn(JdkFlowAdapter.publisherToFlowPublisher(Mono.just(resultSummary)));
|
||||||
return statementResult;
|
return statementResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Driver mockDriver(ResultSummary resultSummary, String version, String edition) {
|
private Driver mockDriver(ResultSummary resultSummary, String version, String edition) {
|
||||||
RxResult statementResult = mockStatementResult(resultSummary, version, edition);
|
ReactiveResult statementResult = mockStatementResult(resultSummary, version, edition);
|
||||||
RxSession session = mock(RxSession.class);
|
ReactiveSession session = mock(ReactiveSession.class);
|
||||||
given(session.run(anyString())).willReturn(statementResult);
|
given(session.run(anyString())).willReturn(JdkFlowAdapter.publisherToFlowPublisher(Mono.just(statementResult)));
|
||||||
Driver driver = mock(Driver.class);
|
Driver driver = mock(Driver.class);
|
||||||
given(driver.rxSession(any(SessionConfig.class))).willReturn(session);
|
given(driver.reactiveSession(any(SessionConfig.class))).willReturn(session);
|
||||||
return driver;
|
return driver;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1086,7 +1086,7 @@ bom {
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
library("Neo4j Java Driver", "4.4.9") {
|
library("Neo4j Java Driver", "5.0.0") {
|
||||||
group("org.neo4j.driver") {
|
group("org.neo4j.driver") {
|
||||||
modules = [
|
modules = [
|
||||||
"neo4j-java-driver"
|
"neo4j-java-driver"
|
||||||
|
|
|
@ -34,7 +34,7 @@ public class MyBean {
|
||||||
// @fold:on // ...
|
// @fold:on // ...
|
||||||
public String someMethod(String message) {
|
public String someMethod(String message) {
|
||||||
try (Session session = this.driver.session()) {
|
try (Session session = this.driver.session()) {
|
||||||
return session.writeTransaction((transaction) -> transaction
|
return session.executeWrite((transaction) -> transaction
|
||||||
.run("CREATE (a:Greeting) SET a.message = $message RETURN a.message + ', from node ' + id(a)",
|
.run("CREATE (a:Greeting) SET a.message = $message RETURN a.message + ', from node ' + id(a)",
|
||||||
Values.parameters("message", message))
|
Values.parameters("message", message))
|
||||||
.single().get(0).asString());
|
.single().get(0).asString());
|
||||||
|
|
|
@ -16,9 +16,7 @@
|
||||||
|
|
||||||
package org.springframework.boot.docs.data.nosql.neo4j.connecting
|
package org.springframework.boot.docs.data.nosql.neo4j.connecting
|
||||||
|
|
||||||
import org.neo4j.driver.Driver
|
import org.neo4j.driver.*
|
||||||
import org.neo4j.driver.Transaction
|
|
||||||
import org.neo4j.driver.Values
|
|
||||||
import org.springframework.stereotype.Component
|
import org.springframework.stereotype.Component
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
|
@ -26,11 +24,13 @@ class MyBean(private val driver: Driver) {
|
||||||
// @fold:on // ...
|
// @fold:on // ...
|
||||||
fun someMethod(message: String?): String {
|
fun someMethod(message: String?): String {
|
||||||
driver.session().use { session ->
|
driver.session().use { session ->
|
||||||
return@someMethod session.writeTransaction { transaction: Transaction ->
|
return@someMethod session.executeWrite { transaction: TransactionContext ->
|
||||||
transaction.run(
|
transaction
|
||||||
"CREATE (a:Greeting) SET a.message = \$message RETURN a.message + ', from node ' + id(a)",
|
.run(
|
||||||
Values.parameters("message", message)
|
"CREATE (a:Greeting) SET a.message = \$message RETURN a.message + ', from node ' + id(a)",
|
||||||
).single()[0].asString()
|
Values.parameters("message", message)
|
||||||
|
)
|
||||||
|
.single()[0].asString()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue