diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java index 6578c103fd6..aaee518b9fe 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java @@ -16,12 +16,17 @@ package org.springframework.boot.actuate.endpoint.invoker.cache; +import java.time.Duration; import java.util.Map; import java.util.Objects; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; /** @@ -29,10 +34,14 @@ import org.springframework.util.ObjectUtils; * configurable time to live. * * @author Stephane Nicoll + * @author Christoph Dreis + * @author Phillip Webb * @since 2.0.0 */ public class CachingOperationInvoker implements OperationInvoker { + private static final boolean IS_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Mono", null); + private final OperationInvoker invoker; private final long timeToLive; @@ -68,8 +77,8 @@ public class CachingOperationInvoker implements OperationInvoker { CachedResponse cached = this.cachedResponse; if (cached == null || cached.isStale(accessTime, this.timeToLive)) { Object response = this.invoker.invoke(context); - this.cachedResponse = new CachedResponse(response, accessTime); - return response; + cached = createCachedResponse(response, accessTime); + this.cachedResponse = cached; } return cached.getResponse(); } @@ -85,6 +94,13 @@ public class CachingOperationInvoker implements OperationInvoker { return false; } + private CachedResponse createCachedResponse(Object response, long accessTime) { + if (IS_REACTOR_PRESENT) { + return new ReactiveCachedResponse(response, accessTime, this.timeToLive); + } + return new CachedResponse(response, accessTime); + } + /** * Apply caching configuration when appropriate to the given invoker. * @param invoker the invoker to wrap @@ -124,4 +140,25 @@ public class CachingOperationInvoker implements OperationInvoker { } + /** + * {@link CachedResponse} variant used when Reactor is present. + */ + static class ReactiveCachedResponse extends CachedResponse { + + ReactiveCachedResponse(Object response, long creationTime, long timeToLive) { + super(applyCaching(response, timeToLive), creationTime); + } + + private static Object applyCaching(Object response, long timeToLive) { + if (response instanceof Mono) { + return ((Mono) response).cache(Duration.ofMillis(timeToLive)); + } + if (response instanceof Flux) { + return ((Flux) response).cache(Duration.ofMillis(timeToLive)); + } + return response; + } + + } + } diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java index 050171e9c79..0d4e9f37d1c 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java @@ -17,14 +17,18 @@ package org.springframework.boot.actuate.endpoint.invoker.cache; import java.security.Principal; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.SecurityContext; +import org.springframework.boot.actuate.endpoint.invoke.MissingParametersException; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; import static org.assertj.core.api.Assertions.assertThat; @@ -39,6 +43,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; * Tests for {@link CachingOperationInvoker}. * * @author Stephane Nicoll + * @author Christoph Dreis + * @author Phillip Webb */ public class CachingOperationInvokerTests { @@ -62,6 +68,30 @@ public class CachingOperationInvokerTests { assertCacheIsUsed(parameters); } + @Test + public void cacheInTtlWithMonoResponse() { + MonoOperationInvoker.invocations = 0; + MonoOperationInvoker target = new MonoOperationInvoker(); + InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap()); + CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L); + Object response = ((Mono) invoker.invoke(context)).block(); + Object cachedResponse = ((Mono) invoker.invoke(context)).block(); + assertThat(MonoOperationInvoker.invocations).isEqualTo(1); + assertThat(response).isSameAs(cachedResponse); + } + + @Test + public void cacheInTtlWithFluxResponse() { + FluxOperationInvoker.invocations = 0; + FluxOperationInvoker target = new FluxOperationInvoker(); + InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap()); + CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L); + Object response = ((Flux) invoker.invoke(context)).blockLast(); + Object cachedResponse = ((Flux) invoker.invoke(context)).blockLast(); + assertThat(FluxOperationInvoker.invocations).isEqualTo(1); + assertThat(response).isSameAs(cachedResponse); + } + private void assertCacheIsUsed(Map parameters) { OperationInvoker target = mock(OperationInvoker.class); Object expected = new Object(); @@ -119,4 +149,32 @@ public class CachingOperationInvokerTests { verify(target, times(2)).invoke(context); } + private static class MonoOperationInvoker implements OperationInvoker { + + static int invocations; + + @Override + public Object invoke(InvocationContext context) throws MissingParametersException { + return Mono.fromCallable(() -> { + invocations++; + return Mono.just("test"); + }); + } + + } + + private static class FluxOperationInvoker implements OperationInvoker { + + static int invocations; + + @Override + public Object invoke(InvocationContext context) throws MissingParametersException { + return Flux.fromIterable(() -> { + invocations++; + return Arrays.asList("spring", "boot").iterator(); + }); + } + + } + }