Merge branch '6.1.x'

This commit is contained in:
Simon Baslé 2024-03-07 12:10:07 +01:00
commit 7f0ab22c47
3 changed files with 74 additions and 34 deletions

View File

@ -107,6 +107,28 @@ class CaffeineReactiveCachingTests {
}
@ParameterizedTest
@ValueSource(classes = {AsyncCacheModeConfig.class, AsyncCacheModeConfig.class})
void fluxCacheDoesntDependOnFirstRequest(Class<?> configClass) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class);
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);
Object key = new Object();
List<Long> l1 = service.cacheFlux(key).take(1L, true).collectList().block();
List<Long> l2 = service.cacheFlux(key).take(3L, true).collectList().block();
List<Long> l3 = service.cacheFlux(key).collectList().block();
Long first = l1.get(0);
assertThat(l1).as("l1").containsExactly(first);
assertThat(l2).as("l2").containsExactly(first, 0L, -1L);
assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L);
ctx.close();
}
@CacheConfig(cacheNames = "first")
static class ReactiveCacheableService {
@ -119,12 +141,16 @@ class CaffeineReactiveCachingTests {
@Cacheable
Mono<Long> cacheMono(Object arg) {
return Mono.just(this.counter.getAndIncrement());
// here counter not only reflects invocations of cacheMono but subscriptions to
// the returned Mono as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Mono.defer(() -> Mono.just(this.counter.getAndIncrement()));
}
@Cacheable
Flux<Long> cacheFlux(Object arg) {
return Flux.just(this.counter.getAndIncrement(), 0L);
// here counter not only reflects invocations of cacheFlux but subscriptions to
// the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L));
}
}

View File

@ -26,12 +26,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.observability.DefaultSignalListener;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -90,7 +90,6 @@ import org.springframework.util.function.SupplierUtils;
* @author Sam Brannen
* @author Stephane Nicoll
* @author Sebastien Deleuze
* @author Simon Baslé
* @since 3.1
*/
public abstract class CacheAspectSupport extends AbstractCacheInvoker
@ -1037,45 +1036,34 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
/**
* Reactor stateful SignalListener for collecting a List to cache.
* Reactive Streams Subscriber for exhausting the Flux and collecting a List
* to cache.
*/
private class CachePutSignalListener extends DefaultSignalListener<Object> {
private final class CachePutListSubscriber implements Subscriber<Object> {
private final AtomicReference<CachePutRequest> request;
private final CachePutRequest request;
private final List<Object> cacheValue = new ArrayList<>();
public CachePutSignalListener(CachePutRequest request) {
this.request = new AtomicReference<>(request);
public CachePutListSubscriber(CachePutRequest request) {
this.request = request;
}
@Override
public void doOnNext(Object o) {
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
this.cacheValue.add(o);
}
@Override
public void doOnComplete() {
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
public void onError(Throwable t) {
this.cacheValue.clear();
}
@Override
public void doOnCancel() {
// Note: we don't use doFinally as we want to propagate the signal after cache put, not before
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
}
@Override
public void doOnError(Throwable error) {
if (this.request.getAndSet(null) != null) {
this.cacheValue.clear();
}
public void onComplete() {
this.request.performCachePut(this.cacheValue);
}
}
@ -1159,8 +1147,10 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
if (adapter != null) {
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(adapter.toPublisher(result))
.tap(() -> new CachePutSignalListener(request)));
Flux<?> source = Flux.from(adapter.toPublisher(result))
.publish().refCount(2);
source.subscribe(new CachePutListSubscriber(request));
return adapter.fromPublisher(source);
}
else {
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))

View File

@ -111,6 +111,30 @@ class ReactiveCachingTests {
ctx.close();
}
@ParameterizedTest
@ValueSource(classes = {EarlyCacheHitDeterminationConfig.class,
EarlyCacheHitDeterminationWithoutNullValuesConfig.class,
LateCacheHitDeterminationConfig.class,
LateCacheHitDeterminationWithValueWrapperConfig.class})
void fluxCacheDoesntDependOnFirstRequest(Class<?> configClass) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class);
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);
Object key = new Object();
List<Long> l1 = service.cacheFlux(key).take(1L, true).collectList().block();
List<Long> l2 = service.cacheFlux(key).take(3L, true).collectList().block();
List<Long> l3 = service.cacheFlux(key).collectList().block();
Long first = l1.get(0);
assertThat(l1).as("l1").containsExactly(first);
assertThat(l2).as("l2").containsExactly(first, 0L, -1L);
assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L);
ctx.close();
}
@CacheConfig(cacheNames = "first")
static class ReactiveCacheableService {
@ -132,7 +156,7 @@ class ReactiveCachingTests {
Flux<Long> cacheFlux(Object arg) {
// here counter not only reflects invocations of cacheFlux but subscriptions to
// the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L));
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L));
}
}