From de857372a5381acbc9b3a525711fb8401348db28 Mon Sep 17 00:00:00 2001 From: ayudovin Date: Wed, 8 May 2019 20:47:26 +0300 Subject: [PATCH 1/2] Drop blocking RedisReactiveHealthIndicator calls Update `RedisReactiveHealthIndicator` so that `getReactiveConnection` is not called directly since it blocks. Fixed gh-16756 --- .../redis/RedisReactiveHealthIndicator.java | 14 ++++++++++---- .../redis/RedisReactiveHealthIndicatorTests.java | 7 +++++-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicator.java index 7bbf580d41d..757bdb153ff 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicator.java @@ -19,6 +19,7 @@ package org.springframework.boot.actuate.redis; import java.util.Properties; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; import org.springframework.boot.actuate.health.Health; @@ -31,6 +32,7 @@ import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; * * @author Stephane Nicoll * @author Mark Paluch + * @author Artsiom Yudovin * @since 2.0.0 */ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicator { @@ -44,10 +46,14 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato @Override protected Mono doHealthCheck(Health.Builder builder) { - ReactiveRedisConnection connection = this.connectionFactory - .getReactiveConnection(); - return connection.serverCommands().info().map((info) -> up(builder, info)) - .doFinally((signal) -> connection.close()); + Mono connection = Mono + .fromSupplier(this.connectionFactory::getReactiveConnection) + .subscribeOn(Schedulers.parallel()); + + return connection + .flatMap((c) -> c.serverCommands().info().map((info) -> up(builder, info)) + .onErrorResume((e) -> Mono.just(builder.down(e).build())) + .flatMap((signal) -> c.closeLater().thenReturn(signal))); } private Health up(Health.Builder builder, Properties info) { diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicatorTests.java index a69c162e58a..3571c539371 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicatorTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicatorTests.java @@ -41,6 +41,7 @@ import static org.mockito.Mockito.verify; * @author Stephane Nicoll * @author Mark Paluch * @author Nikolay Rybak + * @author Artsiom Yudovin */ public class RedisReactiveHealthIndicatorTests { @@ -49,6 +50,7 @@ public class RedisReactiveHealthIndicatorTests { Properties info = new Properties(); info.put("redis_version", "2.8.9"); ReactiveRedisConnection redisConnection = mock(ReactiveRedisConnection.class); + given(redisConnection.closeLater()).willReturn(Mono.empty()); ReactiveServerCommands commands = mock(ReactiveServerCommands.class); given(commands.info()).willReturn(Mono.just(info)); RedisReactiveHealthIndicator healthIndicator = createHealthIndicator( @@ -59,7 +61,7 @@ public class RedisReactiveHealthIndicatorTests { assertThat(h.getDetails()).containsOnlyKeys("version"); assertThat(h.getDetails().get("version")).isEqualTo("2.8.9"); }).verifyComplete(); - verify(redisConnection).close(); + verify(redisConnection).closeLater(); } @Test @@ -68,13 +70,14 @@ public class RedisReactiveHealthIndicatorTests { given(commands.info()).willReturn( Mono.error(new RedisConnectionFailureException("Connection failed"))); ReactiveRedisConnection redisConnection = mock(ReactiveRedisConnection.class); + given(redisConnection.closeLater()).willReturn(Mono.empty()); RedisReactiveHealthIndicator healthIndicator = createHealthIndicator( redisConnection, commands); Mono health = healthIndicator.health(); StepVerifier.create(health) .consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN)) .verifyComplete(); - verify(redisConnection).close(); + verify(redisConnection).closeLater(); } @Test From f790556f960e1e8ac5c4d7d3b6c4ee62227b8311 Mon Sep 17 00:00:00 2001 From: Phillip Webb Date: Tue, 14 May 2019 21:23:31 -0700 Subject: [PATCH 2/2] Polish 'Drop blocking RedisReactiveHealthIndicator calls' See gh-16756 --- .../redis/RedisReactiveHealthIndicator.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicator.java index 757bdb153ff..f7b5a93a2e7 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisReactiveHealthIndicator.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2017 the original author or authors. + * Copyright 2012-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,14 +46,20 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato @Override protected Mono doHealthCheck(Health.Builder builder) { - Mono connection = Mono - .fromSupplier(this.connectionFactory::getReactiveConnection) - .subscribeOn(Schedulers.parallel()); + return getConnection() + .flatMap((connection) -> doHealthCheck(builder, connection)); + } - return connection - .flatMap((c) -> c.serverCommands().info().map((info) -> up(builder, info)) - .onErrorResume((e) -> Mono.just(builder.down(e).build())) - .flatMap((signal) -> c.closeLater().thenReturn(signal))); + private Mono doHealthCheck(Health.Builder builder, + ReactiveRedisConnection connection) { + return connection.serverCommands().info().map((info) -> up(builder, info)) + .onErrorResume((ex) -> Mono.just(down(builder, ex))) + .flatMap((health) -> connection.closeLater().thenReturn(health)); + } + + private Mono getConnection() { + return Mono.fromSupplier(this.connectionFactory::getReactiveConnection) + .subscribeOn(Schedulers.parallel()); } private Health up(Health.Builder builder, Properties info) { @@ -61,4 +67,8 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato info.getProperty(RedisHealthIndicator.REDIS_VERSION)).build(); } + private Health down(Health.Builder builder, Throwable cause) { + return builder.down(cause).build(); + } + }