From 6d9a2eb9b855a05bc79d22e735209688cada2ac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 7 Mar 2024 11:13:55 +0100 Subject: [PATCH] Improve fix of duplicate upstream subscription during reactive cache put This commit fixes an issue where a Cacheable method which returns a Flux (or multi-value publisher) will be invoked once, but the returned publisher is actually subscribed twice. The previous fix 988f3630c would cause the cached elements to depend on the first usage pattern / request pattern, which is likely to be too confusing to users. This fix reintroduces the notion of exhausting the original Flux by having a second subscriber dedicated to that, but uses `refCount(2)` to ensure that the original `Flux` returned by the cached method is still only subscribed once. Closes gh-32370 --- .../CaffeineReactiveCachingTests.java | 30 ++++++++++- .../cache/interceptor/CacheAspectSupport.java | 52 ++++++++----------- .../annotation/ReactiveCachingTests.java | 26 +++++++++- 3 files changed, 74 insertions(+), 34 deletions(-) diff --git a/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java index 24ac179c47d..f92e69c2f62 100644 --- a/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java +++ b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java @@ -107,6 +107,28 @@ class CaffeineReactiveCachingTests { } + @ParameterizedTest + @ValueSource(classes = {AsyncCacheModeConfig.class, AsyncCacheModeConfig.class}) + void fluxCacheDoesntDependOnFirstRequest(Class configClass) { + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + + Object key = new Object(); + + List l1 = service.cacheFlux(key).take(1L, true).collectList().block(); + List l2 = service.cacheFlux(key).take(3L, true).collectList().block(); + List l3 = service.cacheFlux(key).collectList().block(); + + Long first = l1.get(0); + + assertThat(l1).as("l1").containsExactly(first); + assertThat(l2).as("l2").containsExactly(first, 0L, -1L); + assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L); + + ctx.close(); + } + + @CacheConfig(cacheNames = "first") static class ReactiveCacheableService { @@ -119,12 +141,16 @@ class CaffeineReactiveCachingTests { @Cacheable Mono cacheMono(Object arg) { - return Mono.just(this.counter.getAndIncrement()); + // here counter not only reflects invocations of cacheMono but subscriptions to + // the returned Mono as well. See https://github.com/spring-projects/spring-framework/issues/32370 + return Mono.defer(() -> Mono.just(this.counter.getAndIncrement())); } @Cacheable Flux cacheFlux(Object arg) { - return Flux.just(this.counter.getAndIncrement(), 0L); + // here counter not only reflects invocations of cacheFlux but subscriptions to + // the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370 + return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L)); } } diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index df2ad3f56a0..e438872e75e 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -26,12 +26,12 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import reactor.core.observability.DefaultSignalListener; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -90,7 +90,6 @@ import org.springframework.util.function.SupplierUtils; * @author Sam Brannen * @author Stephane Nicoll * @author Sebastien Deleuze - * @author Simon Baslé * @since 3.1 */ public abstract class CacheAspectSupport extends AbstractCacheInvoker @@ -1037,45 +1036,34 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker /** - * Reactor stateful SignalListener for collecting a List to cache. + * Reactive Streams Subscriber for exhausting the Flux and collecting a List + * to cache. */ - private class CachePutSignalListener extends DefaultSignalListener { + private final class CachePutListSubscriber implements Subscriber { - private final AtomicReference request; + private final CachePutRequest request; private final List cacheValue = new ArrayList<>(); - public CachePutSignalListener(CachePutRequest request) { - this.request = new AtomicReference<>(request); + public CachePutListSubscriber(CachePutRequest request) { + this.request = request; } @Override - public void doOnNext(Object o) { + public void onSubscribe(Subscription s) { + s.request(Integer.MAX_VALUE); + } + @Override + public void onNext(Object o) { this.cacheValue.add(o); } - @Override - public void doOnComplete() { - CachePutRequest r = this.request.get(); - if (this.request.compareAndSet(r, null)) { - r.performCachePut(this.cacheValue); - } + public void onError(Throwable t) { + this.cacheValue.clear(); } - @Override - public void doOnCancel() { - // Note: we don't use doFinally as we want to propagate the signal after cache put, not before - CachePutRequest r = this.request.get(); - if (this.request.compareAndSet(r, null)) { - r.performCachePut(this.cacheValue); - } - } - - @Override - public void doOnError(Throwable error) { - if (this.request.getAndSet(null) != null) { - this.cacheValue.clear(); - } + public void onComplete() { + this.request.performCachePut(this.cacheValue); } } @@ -1159,8 +1147,10 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null); if (adapter != null) { if (adapter.isMultiValue()) { - return adapter.fromPublisher(Flux.from(adapter.toPublisher(result)) - .tap(() -> new CachePutSignalListener(request))); + Flux source = Flux.from(adapter.toPublisher(result)) + .publish().refCount(2); + source.subscribe(new CachePutListSubscriber(request)); + return adapter.fromPublisher(source); } else { return adapter.fromPublisher(Mono.from(adapter.toPublisher(result)) diff --git a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java index 7300ee59d46..5c04f80a519 100644 --- a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java +++ b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java @@ -111,6 +111,30 @@ class ReactiveCachingTests { ctx.close(); } + @ParameterizedTest + @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, + EarlyCacheHitDeterminationWithoutNullValuesConfig.class, + LateCacheHitDeterminationConfig.class, + LateCacheHitDeterminationWithValueWrapperConfig.class}) + void fluxCacheDoesntDependOnFirstRequest(Class configClass) { + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + + Object key = new Object(); + + List l1 = service.cacheFlux(key).take(1L, true).collectList().block(); + List l2 = service.cacheFlux(key).take(3L, true).collectList().block(); + List l3 = service.cacheFlux(key).collectList().block(); + + Long first = l1.get(0); + + assertThat(l1).as("l1").containsExactly(first); + assertThat(l2).as("l2").containsExactly(first, 0L, -1L); + assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L); + + ctx.close(); + } + @CacheConfig(cacheNames = "first") static class ReactiveCacheableService { @@ -132,7 +156,7 @@ class ReactiveCachingTests { Flux cacheFlux(Object arg) { // here counter not only reflects invocations of cacheFlux but subscriptions to // the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370 - return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L)); + return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L)); } }