From 33d8bfa99d0669e7aef54567e8f3ee9dae1c215b Mon Sep 17 00:00:00 2001 From: dreis2211 Date: Tue, 24 Sep 2019 14:46:57 +0200 Subject: [PATCH 1/2] Apply TTL invocation caching on reactor types Update `CachingOperationInvoker` so that TTL caching is applied directly to reactive types. Prior to this commit, a `Mono` would be cached, but the values that it emitted would not. See gh-18339 --- .../cache/CachingOperationInvoker.java | 12 +++++- .../cache/CachingOperationInvokerTests.java | 37 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java index 6578c103fd6..7a514f53409 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java @@ -16,9 +16,12 @@ package org.springframework.boot.actuate.endpoint.invoker.cache; +import java.time.Duration; import java.util.Map; import java.util.Objects; +import reactor.core.publisher.Mono; + import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; import org.springframework.util.Assert; @@ -67,13 +70,20 @@ public class CachingOperationInvoker implements OperationInvoker { long accessTime = System.currentTimeMillis(); CachedResponse cached = this.cachedResponse; if (cached == null || cached.isStale(accessTime, this.timeToLive)) { - Object response = this.invoker.invoke(context); + Object response = handleMonoResponse(this.invoker.invoke(context)); this.cachedResponse = new CachedResponse(response, accessTime); return response; } return cached.getResponse(); } + private Object handleMonoResponse(Object response) { + if (response instanceof Mono) { + return ((Mono) response).cache(Duration.ofMillis(this.timeToLive)); + } + return response; + } + private boolean hasInput(InvocationContext context) { if (context.getSecurityContext().getPrincipal() != null) { return true; diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java index 050171e9c79..b3c6202345e 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java @@ -17,15 +17,20 @@ package org.springframework.boot.actuate.endpoint.invoker.cache; import java.security.Principal; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.junit.Rule; import org.junit.Test; +import reactor.core.publisher.Mono; import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.SecurityContext; +import org.springframework.boot.actuate.endpoint.invoke.MissingParametersException; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; +import org.springframework.boot.test.rule.OutputCapture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; @@ -42,6 +47,9 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; */ public class CachingOperationInvokerTests { + @Rule + public OutputCapture outputCapture = new OutputCapture(); + @Test public void createInstanceWithTtlSetToZero() { assertThatIllegalArgumentException() @@ -62,6 +70,21 @@ public class CachingOperationInvokerTests { assertCacheIsUsed(parameters); } + @Test + public void cacheInTtlWithMonoResponse() { + MonoOperationInvoker target = new MonoOperationInvoker(); + InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap()); + CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L); + Object monoResponse = invoker.invoke(context); + assertThat(monoResponse).isInstanceOf(Mono.class); + Object response = ((Mono) monoResponse).block(Duration.ofSeconds(30)); + Object cachedMonoResponse = invoker.invoke(context); + assertThat(cachedMonoResponse).isInstanceOf(Mono.class); + Object cachedResponse = ((Mono) cachedMonoResponse).block(Duration.ofSeconds(30)); + assertThat(response).isSameAs(cachedResponse); + assertThat(this.outputCapture.toString()).containsOnlyOnce("invoked"); + } + private void assertCacheIsUsed(Map parameters) { OperationInvoker target = mock(OperationInvoker.class); Object expected = new Object(); @@ -119,4 +142,18 @@ public class CachingOperationInvokerTests { verify(target, times(2)).invoke(context); } + private static class MonoOperationInvoker implements OperationInvoker { + + @Override + public Object invoke(InvocationContext context) throws MissingParametersException { + return Mono.fromCallable(this::printInvocation); + } + + private Mono printInvocation() { + System.out.println("MonoOperationInvoker invoked"); + return Mono.just("test"); + } + + } + } From 38968d2fffd7e69627d23c6643adca9d0d5ce69c Mon Sep 17 00:00:00 2001 From: Phillip Webb Date: Sat, 28 Sep 2019 21:26:47 -0700 Subject: [PATCH 2/2] Polish 'Apply TTL invocation caching on reactor types' Extract reactor specific code to an inner class to protect against ClassNotFound exceptions if reactor is not in use. Also add support for `Flux`. See gh-18339 --- .../cache/CachingOperationInvoker.java | 47 ++++++++++++---- .../cache/CachingOperationInvokerTests.java | 55 +++++++++++++------ 2 files changed, 75 insertions(+), 27 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java index 7a514f53409..aaee518b9fe 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java @@ -20,11 +20,13 @@ import java.time.Duration; import java.util.Map; import java.util.Objects; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; /** @@ -32,10 +34,14 @@ import org.springframework.util.ObjectUtils; * configurable time to live. * * @author Stephane Nicoll + * @author Christoph Dreis + * @author Phillip Webb * @since 2.0.0 */ public class CachingOperationInvoker implements OperationInvoker { + private static final boolean IS_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Mono", null); + private final OperationInvoker invoker; private final long timeToLive; @@ -70,20 +76,13 @@ public class CachingOperationInvoker implements OperationInvoker { long accessTime = System.currentTimeMillis(); CachedResponse cached = this.cachedResponse; if (cached == null || cached.isStale(accessTime, this.timeToLive)) { - Object response = handleMonoResponse(this.invoker.invoke(context)); - this.cachedResponse = new CachedResponse(response, accessTime); - return response; + Object response = this.invoker.invoke(context); + cached = createCachedResponse(response, accessTime); + this.cachedResponse = cached; } return cached.getResponse(); } - private Object handleMonoResponse(Object response) { - if (response instanceof Mono) { - return ((Mono) response).cache(Duration.ofMillis(this.timeToLive)); - } - return response; - } - private boolean hasInput(InvocationContext context) { if (context.getSecurityContext().getPrincipal() != null) { return true; @@ -95,6 +94,13 @@ public class CachingOperationInvoker implements OperationInvoker { return false; } + private CachedResponse createCachedResponse(Object response, long accessTime) { + if (IS_REACTOR_PRESENT) { + return new ReactiveCachedResponse(response, accessTime, this.timeToLive); + } + return new CachedResponse(response, accessTime); + } + /** * Apply caching configuration when appropriate to the given invoker. * @param invoker the invoker to wrap @@ -134,4 +140,25 @@ public class CachingOperationInvoker implements OperationInvoker { } + /** + * {@link CachedResponse} variant used when Reactor is present. + */ + static class ReactiveCachedResponse extends CachedResponse { + + ReactiveCachedResponse(Object response, long creationTime, long timeToLive) { + super(applyCaching(response, timeToLive), creationTime); + } + + private static Object applyCaching(Object response, long timeToLive) { + if (response instanceof Mono) { + return ((Mono) response).cache(Duration.ofMillis(timeToLive)); + } + if (response instanceof Flux) { + return ((Flux) response).cache(Duration.ofMillis(timeToLive)); + } + return response; + } + + } + } diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java index b3c6202345e..0d4e9f37d1c 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java @@ -17,20 +17,19 @@ package org.springframework.boot.actuate.endpoint.invoker.cache; import java.security.Principal; -import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.junit.Rule; import org.junit.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.SecurityContext; import org.springframework.boot.actuate.endpoint.invoke.MissingParametersException; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; -import org.springframework.boot.test.rule.OutputCapture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; @@ -44,12 +43,11 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; * Tests for {@link CachingOperationInvoker}. * * @author Stephane Nicoll + * @author Christoph Dreis + * @author Phillip Webb */ public class CachingOperationInvokerTests { - @Rule - public OutputCapture outputCapture = new OutputCapture(); - @Test public void createInstanceWithTtlSetToZero() { assertThatIllegalArgumentException() @@ -72,17 +70,26 @@ public class CachingOperationInvokerTests { @Test public void cacheInTtlWithMonoResponse() { + MonoOperationInvoker.invocations = 0; MonoOperationInvoker target = new MonoOperationInvoker(); InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap()); CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L); - Object monoResponse = invoker.invoke(context); - assertThat(monoResponse).isInstanceOf(Mono.class); - Object response = ((Mono) monoResponse).block(Duration.ofSeconds(30)); - Object cachedMonoResponse = invoker.invoke(context); - assertThat(cachedMonoResponse).isInstanceOf(Mono.class); - Object cachedResponse = ((Mono) cachedMonoResponse).block(Duration.ofSeconds(30)); + Object response = ((Mono) invoker.invoke(context)).block(); + Object cachedResponse = ((Mono) invoker.invoke(context)).block(); + assertThat(MonoOperationInvoker.invocations).isEqualTo(1); + assertThat(response).isSameAs(cachedResponse); + } + + @Test + public void cacheInTtlWithFluxResponse() { + FluxOperationInvoker.invocations = 0; + FluxOperationInvoker target = new FluxOperationInvoker(); + InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap()); + CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L); + Object response = ((Flux) invoker.invoke(context)).blockLast(); + Object cachedResponse = ((Flux) invoker.invoke(context)).blockLast(); + assertThat(FluxOperationInvoker.invocations).isEqualTo(1); assertThat(response).isSameAs(cachedResponse); - assertThat(this.outputCapture.toString()).containsOnlyOnce("invoked"); } private void assertCacheIsUsed(Map parameters) { @@ -144,14 +151,28 @@ public class CachingOperationInvokerTests { private static class MonoOperationInvoker implements OperationInvoker { + static int invocations; + @Override public Object invoke(InvocationContext context) throws MissingParametersException { - return Mono.fromCallable(this::printInvocation); + return Mono.fromCallable(() -> { + invocations++; + return Mono.just("test"); + }); } - private Mono printInvocation() { - System.out.println("MonoOperationInvoker invoked"); - return Mono.just("test"); + } + + private static class FluxOperationInvoker implements OperationInvoker { + + static int invocations; + + @Override + public Object invoke(InvocationContext context) throws MissingParametersException { + return Flux.fromIterable(() -> { + invocations++; + return Arrays.asList("spring", "boot").iterator(); + }); } }