diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java index 7a514f53409..aaee518b9fe 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java @@ -20,11 +20,13 @@ import java.time.Duration; import java.util.Map; import java.util.Objects; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; /** @@ -32,10 +34,14 @@ import org.springframework.util.ObjectUtils; * configurable time to live. * * @author Stephane Nicoll + * @author Christoph Dreis + * @author Phillip Webb * @since 2.0.0 */ public class CachingOperationInvoker implements OperationInvoker { + private static final boolean IS_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Mono", null); + private final OperationInvoker invoker; private final long timeToLive; @@ -70,20 +76,13 @@ public class CachingOperationInvoker implements OperationInvoker { long accessTime = System.currentTimeMillis(); CachedResponse cached = this.cachedResponse; if (cached == null || cached.isStale(accessTime, this.timeToLive)) { - Object response = handleMonoResponse(this.invoker.invoke(context)); - this.cachedResponse = new CachedResponse(response, accessTime); - return response; + Object response = this.invoker.invoke(context); + cached = createCachedResponse(response, accessTime); + this.cachedResponse = cached; } return cached.getResponse(); } - private Object handleMonoResponse(Object response) { - if (response instanceof Mono) { - return ((Mono) response).cache(Duration.ofMillis(this.timeToLive)); - } - return response; - } - private boolean hasInput(InvocationContext context) { if (context.getSecurityContext().getPrincipal() != null) { return true; @@ -95,6 +94,13 @@ public class CachingOperationInvoker implements OperationInvoker { return false; } + private CachedResponse createCachedResponse(Object response, long accessTime) { + if (IS_REACTOR_PRESENT) { + return new ReactiveCachedResponse(response, accessTime, this.timeToLive); + } + return new CachedResponse(response, accessTime); + } + /** * Apply caching configuration when appropriate to the given invoker. * @param invoker the invoker to wrap @@ -134,4 +140,25 @@ public class CachingOperationInvoker implements OperationInvoker { } + /** + * {@link CachedResponse} variant used when Reactor is present. + */ + static class ReactiveCachedResponse extends CachedResponse { + + ReactiveCachedResponse(Object response, long creationTime, long timeToLive) { + super(applyCaching(response, timeToLive), creationTime); + } + + private static Object applyCaching(Object response, long timeToLive) { + if (response instanceof Mono) { + return ((Mono) response).cache(Duration.ofMillis(timeToLive)); + } + if (response instanceof Flux) { + return ((Flux) response).cache(Duration.ofMillis(timeToLive)); + } + return response; + } + + } + } diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java index b3c6202345e..0d4e9f37d1c 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java @@ -17,20 +17,19 @@ package org.springframework.boot.actuate.endpoint.invoker.cache; import java.security.Principal; -import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.junit.Rule; import org.junit.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.SecurityContext; import org.springframework.boot.actuate.endpoint.invoke.MissingParametersException; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; -import org.springframework.boot.test.rule.OutputCapture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; @@ -44,12 +43,11 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; * Tests for {@link CachingOperationInvoker}. * * @author Stephane Nicoll + * @author Christoph Dreis + * @author Phillip Webb */ public class CachingOperationInvokerTests { - @Rule - public OutputCapture outputCapture = new OutputCapture(); - @Test public void createInstanceWithTtlSetToZero() { assertThatIllegalArgumentException() @@ -72,17 +70,26 @@ public class CachingOperationInvokerTests { @Test public void cacheInTtlWithMonoResponse() { + MonoOperationInvoker.invocations = 0; MonoOperationInvoker target = new MonoOperationInvoker(); InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap()); CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L); - Object monoResponse = invoker.invoke(context); - assertThat(monoResponse).isInstanceOf(Mono.class); - Object response = ((Mono) monoResponse).block(Duration.ofSeconds(30)); - Object cachedMonoResponse = invoker.invoke(context); - assertThat(cachedMonoResponse).isInstanceOf(Mono.class); - Object cachedResponse = ((Mono) cachedMonoResponse).block(Duration.ofSeconds(30)); + Object response = ((Mono) invoker.invoke(context)).block(); + Object cachedResponse = ((Mono) invoker.invoke(context)).block(); + assertThat(MonoOperationInvoker.invocations).isEqualTo(1); + assertThat(response).isSameAs(cachedResponse); + } + + @Test + public void cacheInTtlWithFluxResponse() { + FluxOperationInvoker.invocations = 0; + FluxOperationInvoker target = new FluxOperationInvoker(); + InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap()); + CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L); + Object response = ((Flux) invoker.invoke(context)).blockLast(); + Object cachedResponse = ((Flux) invoker.invoke(context)).blockLast(); + assertThat(FluxOperationInvoker.invocations).isEqualTo(1); assertThat(response).isSameAs(cachedResponse); - assertThat(this.outputCapture.toString()).containsOnlyOnce("invoked"); } private void assertCacheIsUsed(Map parameters) { @@ -144,14 +151,28 @@ public class CachingOperationInvokerTests { private static class MonoOperationInvoker implements OperationInvoker { + static int invocations; + @Override public Object invoke(InvocationContext context) throws MissingParametersException { - return Mono.fromCallable(this::printInvocation); + return Mono.fromCallable(() -> { + invocations++; + return Mono.just("test"); + }); } - private Mono printInvocation() { - System.out.println("MonoOperationInvoker invoked"); - return Mono.just("test"); + } + + private static class FluxOperationInvoker implements OperationInvoker { + + static int invocations; + + @Override + public Object invoke(InvocationContext context) throws MissingParametersException { + return Flux.fromIterable(() -> { + invocations++; + return Arrays.asList("spring", "boot").iterator(); + }); } }