Support for late-determined cache misses from retrieve(key)
Closes gh-31637
This commit is contained in:
		
							parent
							
								
									8ffbecc7c3
								
							
						
					
					
						commit
						1410c466b7
					
				|  | @ -6,12 +6,12 @@ dependencies { | |||
| 	api(project(":spring-core")) | ||||
| 	optional(project(":spring-jdbc"))  // for Quartz support | ||||
| 	optional(project(":spring-tx"))  // for Quartz support | ||||
| 	optional("com.github.ben-manes.caffeine:caffeine") | ||||
| 	optional("jakarta.activation:jakarta.activation-api") | ||||
| 	optional("jakarta.mail:jakarta.mail-api") | ||||
| 	optional("javax.cache:cache-api") | ||||
| 	optional("com.github.ben-manes.caffeine:caffeine") | ||||
| 	optional("org.quartz-scheduler:quartz") | ||||
| 	optional("org.freemarker:freemarker") | ||||
| 	optional("org.quartz-scheduler:quartz") | ||||
| 	testFixturesApi("org.junit.jupiter:junit-jupiter-api") | ||||
| 	testFixturesImplementation("org.assertj:assertj-core") | ||||
| 	testFixturesImplementation("org.mockito:mockito-core") | ||||
|  | @ -20,10 +20,11 @@ dependencies { | |||
| 	testImplementation(testFixtures(project(":spring-context"))) | ||||
| 	testImplementation(testFixtures(project(":spring-core"))) | ||||
| 	testImplementation(testFixtures(project(":spring-tx"))) | ||||
| 	testImplementation("org.hsqldb:hsqldb") | ||||
| 	testImplementation("io.projectreactor:reactor-core") | ||||
| 	testImplementation("jakarta.annotation:jakarta.annotation-api") | ||||
| 	testRuntimeOnly("org.ehcache:jcache") | ||||
| 	testRuntimeOnly("org.ehcache:ehcache") | ||||
| 	testRuntimeOnly("org.glassfish:jakarta.el") | ||||
| 	testImplementation("org.hsqldb:hsqldb") | ||||
| 	testRuntimeOnly("com.sun.mail:jakarta.mail") | ||||
| 	testRuntimeOnly("org.ehcache:ehcache") | ||||
| 	testRuntimeOnly("org.ehcache:jcache") | ||||
| 	testRuntimeOnly("org.glassfish:jakarta.el") | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,141 @@ | |||
| /* | ||||
|  * Copyright 2002-2023 the original author or authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *      https://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| package org.springframework.cache.caffeine; | ||||
| 
 | ||||
| import java.util.List; | ||||
| import java.util.concurrent.CompletableFuture; | ||||
| import java.util.concurrent.atomic.AtomicLong; | ||||
| 
 | ||||
| import org.junit.jupiter.api.Test; | ||||
| import reactor.core.publisher.Flux; | ||||
| import reactor.core.publisher.Mono; | ||||
| 
 | ||||
| import org.springframework.cache.CacheManager; | ||||
| import org.springframework.cache.annotation.CacheConfig; | ||||
| import org.springframework.cache.annotation.Cacheable; | ||||
| import org.springframework.cache.annotation.EnableCaching; | ||||
| import org.springframework.context.ApplicationContext; | ||||
| import org.springframework.context.annotation.AnnotationConfigApplicationContext; | ||||
| import org.springframework.context.annotation.Bean; | ||||
| import org.springframework.context.annotation.Configuration; | ||||
| 
 | ||||
| import static org.assertj.core.api.Assertions.assertThat; | ||||
| 
 | ||||
| /** | ||||
|  * Tests for annotation-based caching methods that use reactive operators. | ||||
|  * | ||||
|  * @author Juergen Hoeller | ||||
|  * @since 6.1 | ||||
|  */ | ||||
| public class CaffeineReactiveCachingTests { | ||||
| 
 | ||||
| 	@Test | ||||
| 	void withCaffeineAsyncCache() { | ||||
| 		ApplicationContext ctx = new AnnotationConfigApplicationContext(Config.class, ReactiveCacheableService.class); | ||||
| 		ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); | ||||
| 
 | ||||
| 		Object key = new Object(); | ||||
| 
 | ||||
| 		Long r1 = service.cacheFuture(key).join(); | ||||
| 		Long r2 = service.cacheFuture(key).join(); | ||||
| 		Long r3 = service.cacheFuture(key).join(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 
 | ||||
| 		key = new Object(); | ||||
| 
 | ||||
| 		r1 = service.cacheMono(key).block(); | ||||
| 		r2 = service.cacheMono(key).block(); | ||||
| 		r3 = service.cacheMono(key).block(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 
 | ||||
| 		key = new Object(); | ||||
| 
 | ||||
| 		r1 = service.cacheFlux(key).blockFirst(); | ||||
| 		r2 = service.cacheFlux(key).blockFirst(); | ||||
| 		r3 = service.cacheFlux(key).blockFirst(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 
 | ||||
| 		key = new Object(); | ||||
| 
 | ||||
| 		List<Long> l1 = service.cacheFlux(key).collectList().block(); | ||||
| 		List<Long> l2 = service.cacheFlux(key).collectList().block(); | ||||
| 		List<Long> l3 = service.cacheFlux(key).collectList().block(); | ||||
| 
 | ||||
| 		assertThat(l1).isNotNull(); | ||||
| 		assertThat(l1).isEqualTo(l2).isEqualTo(l3); | ||||
| 
 | ||||
| 		key = new Object(); | ||||
| 
 | ||||
| 		r1 = service.cacheMono(key).block(); | ||||
| 		r2 = service.cacheMono(key).block(); | ||||
| 		r3 = service.cacheMono(key).block(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 
 | ||||
| 		// Same key as for Mono, reusing its cached value | ||||
| 
 | ||||
| 		r1 = service.cacheFlux(key).blockFirst(); | ||||
| 		r2 = service.cacheFlux(key).blockFirst(); | ||||
| 		r3 = service.cacheFlux(key).blockFirst(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 	} | ||||
| 
 | ||||
| 
 | ||||
| 	@CacheConfig(cacheNames = "first") | ||||
| 	static class ReactiveCacheableService { | ||||
| 
 | ||||
| 		private final AtomicLong counter = new AtomicLong(); | ||||
| 
 | ||||
| 		@Cacheable | ||||
| 		CompletableFuture<Long> cacheFuture(Object arg) { | ||||
| 			return CompletableFuture.completedFuture(this.counter.getAndIncrement()); | ||||
| 		} | ||||
| 
 | ||||
| 		@Cacheable | ||||
| 		Mono<Long> cacheMono(Object arg) { | ||||
| 			return Mono.just(this.counter.getAndIncrement()); | ||||
| 		} | ||||
| 
 | ||||
| 		@Cacheable | ||||
| 		Flux<Long> cacheFlux(Object arg) { | ||||
| 			return Flux.just(this.counter.getAndIncrement(), 0L); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 
 | ||||
| 	@Configuration(proxyBeanMethods = false) | ||||
| 	@EnableCaching | ||||
| 	static class Config { | ||||
| 
 | ||||
| 		@Bean | ||||
| 		CacheManager cacheManager() { | ||||
| 			CaffeineCacheManager ccm = new CaffeineCacheManager("first"); | ||||
| 			ccm.setAsyncCacheMode(true); | ||||
| 			return ccm; | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
|  | @ -25,9 +25,9 @@ import org.springframework.lang.Nullable; | |||
| /** | ||||
|  * Interface that defines common cache operations. | ||||
|  * | ||||
|  * <p>Serves as an SPI for Spring's annotation-based caching model | ||||
|  * ({@link org.springframework.cache.annotation.Cacheable} and co) | ||||
|  * as well as an API for direct usage in applications. | ||||
|  * <p>Serves primarily as an SPI for Spring's annotation-based caching | ||||
|  * model ({@link org.springframework.cache.annotation.Cacheable} and co) | ||||
|  * and secondarily as an API for direct usage in applications. | ||||
|  * | ||||
|  * <p><b>Note:</b> Due to the generic use of caching, it is recommended | ||||
|  * that implementations allow storage of {@code null} values | ||||
|  | @ -113,16 +113,26 @@ public interface Cache { | |||
| 	 * wrapped in a {@link CompletableFuture}. This operation must not block | ||||
| 	 * but is allowed to return a completed {@link CompletableFuture} if the | ||||
| 	 * corresponding value is immediately available. | ||||
| 	 * <p>Returns {@code null} if the cache contains no mapping for this key; | ||||
| 	 * otherwise, the cached value (which may be {@code null} itself) will | ||||
| 	 * be returned in the {@link CompletableFuture}. | ||||
| 	 * <p>Can return {@code null} if the cache can immediately determine that | ||||
| 	 * it contains no mapping for this key (e.g. through an in-memory key map). | ||||
| 	 * Otherwise, the cached value will be returned in the {@link CompletableFuture}, | ||||
| 	 * with {@code null} indicating a late-determined cache miss (and a nested | ||||
| 	 * {@link ValueWrapper} potentially indicating a nullable cached value). | ||||
| 	 * @param key the key whose associated value is to be returned | ||||
| 	 * @return the value to which this cache maps the specified key, | ||||
| 	 * contained within a {@link CompletableFuture} which may also hold | ||||
| 	 * a cached {@code null} value. A straight {@code null} being | ||||
| 	 * returned means that the cache contains no mapping for this key. | ||||
| 	 * @return the value to which this cache maps the specified key, contained | ||||
| 	 * within a {@link CompletableFuture} which may also be empty when a cache | ||||
| 	 * miss has been late-determined. A straight {@code null} being returned | ||||
| 	 * means that the cache immediately determined that it contains no mapping | ||||
| 	 * for this key. A {@link ValueWrapper} contained within the | ||||
| 	 * {@code CompletableFuture} can indicate a cached value that is potentially | ||||
| 	 * {@code null}; this is sensible in a late-determined scenario where a regular | ||||
| 	 * CompletableFuture-contained {@code null} indicates a cache miss. However, | ||||
| 	 * an early-determined cache will usually return the plain cached value here, | ||||
| 	 * and a late-determined cache may also return a plain value if it does not | ||||
| 	 * support the actual caching of {@code null} values. Spring's common cache | ||||
| 	 * processing can deal with all variants of these implementation strategies. | ||||
| 	 * @since 6.1 | ||||
| 	 * @see #get(Object) | ||||
| 	 * @see #retrieve(Object, Supplier) | ||||
| 	 */ | ||||
| 	@Nullable | ||||
| 	default CompletableFuture<?> retrieve(Object key) { | ||||
|  |  | |||
|  | @ -363,7 +363,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker | |||
| 	protected Object execute(CacheOperationInvoker invoker, Object target, Method method, Object[] args) { | ||||
| 		// Check whether aspect is enabled (to cope with cases where the AJ is pulled in automatically) | ||||
| 		if (this.initialized) { | ||||
| 			Class<?> targetClass = getTargetClass(target); | ||||
| 			Class<?> targetClass = AopProxyUtils.ultimateTargetClass(target); | ||||
| 			CacheOperationSource cacheOperationSource = getCacheOperationSource(); | ||||
| 			if (cacheOperationSource != null) { | ||||
| 				Collection<CacheOperation> operations = cacheOperationSource.getCacheOperations(method, targetClass); | ||||
|  | @ -374,7 +374,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker | |||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		return invoker.invoke(); | ||||
| 		return invokeOperation(invoker); | ||||
| 	} | ||||
| 
 | ||||
| 	/** | ||||
|  | @ -392,10 +392,6 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker | |||
| 		return invoker.invoke(); | ||||
| 	} | ||||
| 
 | ||||
| 	private Class<?> getTargetClass(Object target) { | ||||
| 		return AopProxyUtils.ultimateTargetClass(target); | ||||
| 	} | ||||
| 
 | ||||
| 	@Nullable | ||||
| 	private Object execute(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { | ||||
| 		if (contexts.isSynchronized()) { | ||||
|  | @ -408,7 +404,104 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker | |||
| 				CacheOperationExpressionEvaluator.NO_RESULT); | ||||
| 
 | ||||
| 		// Check if we have a cached value matching the conditions | ||||
| 		Object cacheHit = findCachedValue(contexts.get(CacheableOperation.class)); | ||||
| 		Object cacheHit = findCachedValue(invoker, method, contexts); | ||||
| 		if (cacheHit == null || cacheHit instanceof Cache.ValueWrapper) { | ||||
| 			return evaluate(cacheHit, invoker, method, contexts); | ||||
| 		} | ||||
| 		return cacheHit; | ||||
| 	} | ||||
| 
 | ||||
| 	@Nullable | ||||
| 	private Object executeSynchronized(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { | ||||
| 		CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next(); | ||||
| 		if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { | ||||
| 			Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); | ||||
| 			Cache cache = context.getCaches().iterator().next(); | ||||
| 			if (CompletableFuture.class.isAssignableFrom(method.getReturnType())) { | ||||
| 				return cache.retrieve(key, () -> (CompletableFuture<?>) invokeOperation(invoker)); | ||||
| 			} | ||||
| 			if (this.reactiveCachingHandler != null) { | ||||
| 				Object returnValue = this.reactiveCachingHandler.executeSynchronized(invoker, method, cache, key); | ||||
| 				if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { | ||||
| 					return returnValue; | ||||
| 				} | ||||
| 			} | ||||
| 			try { | ||||
| 				return wrapCacheValue(method, cache.get(key, () -> unwrapReturnValue(invokeOperation(invoker)))); | ||||
| 			} | ||||
| 			catch (Cache.ValueRetrievalException ex) { | ||||
| 				// Directly propagate ThrowableWrapper from the invoker, | ||||
| 				// or potentially also an IllegalArgumentException etc. | ||||
| 				ReflectionUtils.rethrowRuntimeException(ex.getCause()); | ||||
| 				// Never reached | ||||
| 				return null; | ||||
| 			} | ||||
| 		} | ||||
| 		else { | ||||
| 			// No caching required, just call the underlying method | ||||
| 			return invokeOperation(invoker); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	/** | ||||
| 	 * Find a cached value only for {@link CacheableOperation} that passes the condition. | ||||
| 	 * @param contexts the cacheable operations | ||||
| 	 * @return a {@link Cache.ValueWrapper} holding the cached value, | ||||
| 	 * or {@code null} if none is found | ||||
| 	 */ | ||||
| 	@Nullable | ||||
| 	private Object findCachedValue(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { | ||||
| 		for (CacheOperationContext context : contexts.get(CacheableOperation.class)) { | ||||
| 			if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { | ||||
| 				Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); | ||||
| 				Object cached = findInCaches(context, key, invoker, method, contexts); | ||||
| 				if (cached != null) { | ||||
| 					if (logger.isTraceEnabled()) { | ||||
| 						logger.trace("Cache entry for key '" + key + "' found in cache(s) " + context.getCacheNames()); | ||||
| 					} | ||||
| 					return cached; | ||||
| 				} | ||||
| 				else { | ||||
| 					if (logger.isTraceEnabled()) { | ||||
| 						logger.trace("No cache entry for key '" + key + "' in cache(s) " + context.getCacheNames()); | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		return null; | ||||
| 	} | ||||
| 
 | ||||
| 	@Nullable | ||||
| 	private Object findInCaches(CacheOperationContext context, Object key, | ||||
| 			CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { | ||||
| 
 | ||||
| 		for (Cache cache : context.getCaches()) { | ||||
| 			if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { | ||||
| 				CompletableFuture<?> result = cache.retrieve(key); | ||||
| 				if (result != null) { | ||||
| 					return result.thenCompose(value -> (CompletableFuture<?>) evaluate( | ||||
| 							(value != null ? CompletableFuture.completedFuture(unwrapCacheValue(value)) : null), | ||||
| 							invoker, method, contexts)); | ||||
| 				} | ||||
| 			} | ||||
| 			if (this.reactiveCachingHandler != null) { | ||||
| 				Object returnValue = this.reactiveCachingHandler.findInCaches( | ||||
| 						context, cache, key, invoker, method, contexts); | ||||
| 				if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { | ||||
| 					return returnValue; | ||||
| 				} | ||||
| 			} | ||||
| 			Cache.ValueWrapper result = doGet(cache, key); | ||||
| 			if (result != null) { | ||||
| 				return result; | ||||
| 			} | ||||
| 		} | ||||
| 		return null; | ||||
| 	} | ||||
| 
 | ||||
| 	@Nullable | ||||
| 	private Object evaluate(@Nullable Object cacheHit, CacheOperationInvoker invoker, Method method, | ||||
| 			CacheOperationContexts contexts) { | ||||
| 
 | ||||
| 		Object cacheValue; | ||||
| 		Object returnValue; | ||||
|  | @ -452,35 +545,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker | |||
| 	} | ||||
| 
 | ||||
| 	@Nullable | ||||
| 	private Object executeSynchronized(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { | ||||
| 		CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next(); | ||||
| 		if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { | ||||
| 			Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); | ||||
| 			Cache cache = context.getCaches().iterator().next(); | ||||
| 			if (CompletableFuture.class.isAssignableFrom(method.getReturnType())) { | ||||
| 				return cache.retrieve(key, () -> (CompletableFuture<?>) invokeOperation(invoker)); | ||||
| 			} | ||||
| 			if (this.reactiveCachingHandler != null) { | ||||
| 				Object returnValue = this.reactiveCachingHandler.executeSynchronized(invoker, method, cache, key); | ||||
| 				if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { | ||||
| 					return returnValue; | ||||
| 				} | ||||
| 			} | ||||
| 			try { | ||||
| 				return wrapCacheValue(method, cache.get(key, () -> unwrapReturnValue(invokeOperation(invoker)))); | ||||
| 			} | ||||
| 			catch (Cache.ValueRetrievalException ex) { | ||||
| 				// Directly propagate ThrowableWrapper from the invoker, | ||||
| 				// or potentially also an IllegalArgumentException etc. | ||||
| 				ReflectionUtils.rethrowRuntimeException(ex.getCause()); | ||||
| 				// Never reached | ||||
| 				return null; | ||||
| 			} | ||||
| 		} | ||||
| 		else { | ||||
| 			// No caching required, just call the underlying method | ||||
| 			return invokeOperation(invoker); | ||||
| 		} | ||||
| 	private Object unwrapCacheValue(@Nullable Object cacheValue) { | ||||
| 		return (cacheValue instanceof Cache.ValueWrapper wrapper ? wrapper.get() : cacheValue); | ||||
| 	} | ||||
| 
 | ||||
| 	@Nullable | ||||
|  | @ -575,34 +641,6 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	/** | ||||
| 	 * Find a cached value only for {@link CacheableOperation} that passes the condition. | ||||
| 	 * @param contexts the cacheable operations | ||||
| 	 * @return a {@link Cache.ValueWrapper} holding the cached value, | ||||
| 	 * or {@code null} if none is found | ||||
| 	 */ | ||||
| 	@Nullable | ||||
| 	private Object findCachedValue(Collection<CacheOperationContext> contexts) { | ||||
| 		for (CacheOperationContext context : contexts) { | ||||
| 			if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { | ||||
| 				Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); | ||||
| 				Object cached = findInCaches(context, key); | ||||
| 				if (cached != null) { | ||||
| 					if (logger.isTraceEnabled()) { | ||||
| 						logger.trace("Cache entry for key '" + key + "' found in cache(s) " + context.getCacheNames()); | ||||
| 					} | ||||
| 					return cached; | ||||
| 				} | ||||
| 				else { | ||||
| 					if (logger.isTraceEnabled()) { | ||||
| 						logger.trace("No cache entry for key '" + key + "' in cache(s) " + context.getCacheNames()); | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		return null; | ||||
| 	} | ||||
| 
 | ||||
| 	/** | ||||
| 	 * Collect the {@link CachePutRequest} for all {@link CacheOperation} using | ||||
| 	 * the specified result value. | ||||
|  | @ -621,23 +659,6 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	@Nullable | ||||
| 	private Object findInCaches(CacheOperationContext context, Object key) { | ||||
| 		for (Cache cache : context.getCaches()) { | ||||
| 			if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { | ||||
| 				return cache.retrieve(key); | ||||
| 			} | ||||
| 			if (this.reactiveCachingHandler != null) { | ||||
| 				Object returnValue = this.reactiveCachingHandler.findInCaches(context, cache, key); | ||||
| 				if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { | ||||
| 					return returnValue; | ||||
| 				} | ||||
| 			} | ||||
| 			return doGet(cache, key); | ||||
| 		} | ||||
| 		return null; | ||||
| 	} | ||||
| 
 | ||||
| 	private boolean isConditionPassing(CacheOperationContext context, @Nullable Object result) { | ||||
| 		boolean passing = context.isConditionPassing(result); | ||||
| 		if (!passing && logger.isTraceEnabled()) { | ||||
|  | @ -1048,8 +1069,11 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker | |||
| 			return NOT_HANDLED; | ||||
| 		} | ||||
| 
 | ||||
| 		@SuppressWarnings("unchecked") | ||||
| 		@Nullable | ||||
| 		public Object findInCaches(CacheOperationContext context, Cache cache, Object key) { | ||||
| 		public Object findInCaches(CacheOperationContext context, Cache cache, Object key, | ||||
| 				CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { | ||||
| 
 | ||||
| 			ReactiveAdapter adapter = this.registry.getAdapter(context.getMethod().getReturnType()); | ||||
| 			if (adapter != null) { | ||||
| 				CompletableFuture<?> cachedFuture = cache.retrieve(key); | ||||
|  | @ -1057,11 +1081,16 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker | |||
| 					return null; | ||||
| 				} | ||||
| 				if (adapter.isMultiValue()) { | ||||
| 					return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture)) | ||||
| 							.flatMap(v -> (v instanceof Iterable<?> iv ? Flux.fromIterable(iv) : Flux.just(v)))); | ||||
| 					return adapter.fromPublisher(Flux.from( | ||||
| 							Mono.fromFuture(cachedFuture) | ||||
| 									.flatMap(value -> (Mono<?>) evaluate(Mono.just(unwrapCacheValue(value)), invoker, method, contexts))) | ||||
| 							.flatMap(v -> (v instanceof Iterable<?> iv ? Flux.fromIterable(iv) : Flux.just(v))) | ||||
| 							.switchIfEmpty(Flux.defer(() -> (Flux<?>) evaluate(null, invoker, method, contexts)))); | ||||
| 				} | ||||
| 				else { | ||||
| 					return adapter.fromPublisher(Mono.fromFuture(cachedFuture)); | ||||
| 					return adapter.fromPublisher(Mono.fromFuture(cachedFuture) | ||||
| 							.flatMap(value -> (Mono<?>) evaluate(Mono.just(unwrapCacheValue(value)), invoker, method, contexts)) | ||||
| 							.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))); | ||||
| 				} | ||||
| 			} | ||||
| 			return NOT_HANDLED; | ||||
|  |  | |||
|  | @ -182,11 +182,13 @@ class CacheReproTests { | |||
| 		Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); | ||||
| 
 | ||||
| 		TestBean tb = bean.findById("tb1").join(); | ||||
| 		assertThat(tb).isNotNull(); | ||||
| 		assertThat(bean.findById("tb1").join()).isSameAs(tb); | ||||
| 		assertThat(cache.get("tb1").get()).isSameAs(tb); | ||||
| 
 | ||||
| 		bean.clear().join(); | ||||
| 		TestBean tb2 = bean.findById("tb1").join(); | ||||
| 		assertThat(tb2).isNotNull(); | ||||
| 		assertThat(tb2).isNotSameAs(tb); | ||||
| 		assertThat(cache.get("tb1").get()).isSameAs(tb2); | ||||
| 
 | ||||
|  | @ -230,11 +232,13 @@ class CacheReproTests { | |||
| 		Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); | ||||
| 
 | ||||
| 		TestBean tb = bean.findById("tb1").block(); | ||||
| 		assertThat(tb).isNotNull(); | ||||
| 		assertThat(bean.findById("tb1").block()).isSameAs(tb); | ||||
| 		assertThat(cache.get("tb1").get()).isSameAs(tb); | ||||
| 
 | ||||
| 		bean.clear().block(); | ||||
| 		TestBean tb2 = bean.findById("tb1").block(); | ||||
| 		assertThat(tb2).isNotNull(); | ||||
| 		assertThat(tb2).isNotSameAs(tb); | ||||
| 		assertThat(cache.get("tb1").get()).isSameAs(tb2); | ||||
| 
 | ||||
|  | @ -278,11 +282,13 @@ class CacheReproTests { | |||
| 		Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); | ||||
| 
 | ||||
| 		List<TestBean> tb = bean.findById("tb1").collectList().block(); | ||||
| 		assertThat(tb).isNotEmpty(); | ||||
| 		assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb); | ||||
| 		assertThat(cache.get("tb1").get()).isEqualTo(tb); | ||||
| 
 | ||||
| 		bean.clear().blockLast(); | ||||
| 		List<TestBean> tb2 = bean.findById("tb1").collectList().block(); | ||||
| 		assertThat(tb2).isNotEmpty(); | ||||
| 		assertThat(tb2).isNotEqualTo(tb); | ||||
| 		assertThat(cache.get("tb1").get()).isEqualTo(tb2); | ||||
| 
 | ||||
|  |  | |||
|  | @ -16,14 +16,21 @@ | |||
| 
 | ||||
| package org.springframework.cache.annotation; | ||||
| 
 | ||||
| import java.util.List; | ||||
| import java.util.concurrent.CompletableFuture; | ||||
| import java.util.concurrent.atomic.AtomicLong; | ||||
| 
 | ||||
| import org.junit.jupiter.api.Test; | ||||
| import org.junit.jupiter.params.ParameterizedTest; | ||||
| import org.junit.jupiter.params.provider.ValueSource; | ||||
| import reactor.core.publisher.Flux; | ||||
| import reactor.core.publisher.Mono; | ||||
| 
 | ||||
| import org.springframework.cache.Cache; | ||||
| import org.springframework.cache.CacheManager; | ||||
| import org.springframework.cache.concurrent.ConcurrentMapCache; | ||||
| import org.springframework.cache.concurrent.ConcurrentMapCacheManager; | ||||
| import org.springframework.context.ConfigurableApplicationContext; | ||||
| import org.springframework.cache.support.SimpleValueWrapper; | ||||
| import org.springframework.context.ApplicationContext; | ||||
| import org.springframework.context.annotation.AnnotationConfigApplicationContext; | ||||
| import org.springframework.context.annotation.Bean; | ||||
| import org.springframework.context.annotation.Configuration; | ||||
|  | @ -34,24 +41,71 @@ import static org.assertj.core.api.Assertions.assertThat; | |||
|  * Tests for annotation-based caching methods that use reactive operators. | ||||
|  * | ||||
|  * @author Stephane Nicoll | ||||
|  * @author Juergen Hoeller | ||||
|  * @since 6.1 | ||||
|  */ | ||||
| public class ReactiveCachingTests { | ||||
| 
 | ||||
| 	private final ConfigurableApplicationContext ctx; | ||||
| 	@ParameterizedTest | ||||
| 	@ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, | ||||
| 			LateCacheHitDeterminationConfig.class, | ||||
| 			LateCacheHitDeterminationWithValueWrapperConfig.class}) | ||||
| 	void cacheHitDetermination(Class<?> configClass) { | ||||
| 		ApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class); | ||||
| 		ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); | ||||
| 
 | ||||
| 	private final ReactiveCacheableService service; | ||||
| 
 | ||||
| 	public ReactiveCachingTests() { | ||||
| 		this.ctx = new AnnotationConfigApplicationContext(TestConfig.class); | ||||
| 		this.service = this.ctx.getBean(ReactiveCacheableService.class); | ||||
| 	} | ||||
| 
 | ||||
| 	@Test | ||||
| 	void cache() { | ||||
| 		Object key = new Object(); | ||||
| 		Long r1 = this.service.cache(key).block(); | ||||
| 		Long r2 = this.service.cache(key).block(); | ||||
| 		Long r3 = this.service.cache(key).block(); | ||||
| 
 | ||||
| 		Long r1 = service.cacheFuture(key).join(); | ||||
| 		Long r2 = service.cacheFuture(key).join(); | ||||
| 		Long r3 = service.cacheFuture(key).join(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 
 | ||||
| 		key = new Object(); | ||||
| 
 | ||||
| 		r1 = service.cacheMono(key).block(); | ||||
| 		r2 = service.cacheMono(key).block(); | ||||
| 		r3 = service.cacheMono(key).block(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 
 | ||||
| 		key = new Object(); | ||||
| 
 | ||||
| 		r1 = service.cacheFlux(key).blockFirst(); | ||||
| 		r2 = service.cacheFlux(key).blockFirst(); | ||||
| 		r3 = service.cacheFlux(key).blockFirst(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 
 | ||||
| 		key = new Object(); | ||||
| 
 | ||||
| 		List<Long> l1 = service.cacheFlux(key).collectList().block(); | ||||
| 		List<Long> l2 = service.cacheFlux(key).collectList().block(); | ||||
| 		List<Long> l3 = service.cacheFlux(key).collectList().block(); | ||||
| 
 | ||||
| 		assertThat(l1).isNotNull(); | ||||
| 		assertThat(l1).isEqualTo(l2).isEqualTo(l3); | ||||
| 
 | ||||
| 		key = new Object(); | ||||
| 
 | ||||
| 		r1 = service.cacheMono(key).block(); | ||||
| 		r2 = service.cacheMono(key).block(); | ||||
| 		r3 = service.cacheMono(key).block(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 
 | ||||
| 		// Same key as for Mono, reusing its cached value | ||||
| 
 | ||||
| 		r1 = service.cacheFlux(key).blockFirst(); | ||||
| 		r2 = service.cacheFlux(key).blockFirst(); | ||||
| 		r3 = service.cacheFlux(key).blockFirst(); | ||||
| 
 | ||||
| 		assertThat(r1).isNotNull(); | ||||
| 		assertThat(r1).isSameAs(r2).isSameAs(r3); | ||||
| 	} | ||||
| 
 | ||||
|  | @ -62,27 +116,78 @@ public class ReactiveCachingTests { | |||
| 		private final AtomicLong counter = new AtomicLong(); | ||||
| 
 | ||||
| 		@Cacheable | ||||
| 		Mono<Long> cache(Object arg1) { | ||||
| 		CompletableFuture<Long> cacheFuture(Object arg) { | ||||
| 			return CompletableFuture.completedFuture(this.counter.getAndIncrement()); | ||||
| 		} | ||||
| 
 | ||||
| 		@Cacheable | ||||
| 		Mono<Long> cacheMono(Object arg) { | ||||
| 			return Mono.just(this.counter.getAndIncrement()); | ||||
| 		} | ||||
| 
 | ||||
| 		@Cacheable | ||||
| 		Flux<Long> cacheFlux(Object arg) { | ||||
| 			return Flux.just(this.counter.getAndIncrement(), 0L); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 
 | ||||
| 	@Configuration(proxyBeanMethods = false) | ||||
| 	@EnableCaching | ||||
| 	static class TestConfig { | ||||
| 	static class EarlyCacheHitDeterminationConfig { | ||||
| 
 | ||||
| 		@Bean | ||||
| 		CacheManager cacheManager() { | ||||
| 			return new ConcurrentMapCacheManager("first"); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 
 | ||||
| 	@Configuration(proxyBeanMethods = false) | ||||
| 	@EnableCaching | ||||
| 	static class LateCacheHitDeterminationConfig { | ||||
| 
 | ||||
| 		@Bean | ||||
| 		ReactiveCacheableService reactiveCacheableService() { | ||||
| 			return new ReactiveCacheableService(); | ||||
| 		CacheManager cacheManager() { | ||||
| 			return new ConcurrentMapCacheManager("first") { | ||||
| 				@Override | ||||
| 				protected Cache createConcurrentMapCache(String name) { | ||||
| 					return new ConcurrentMapCache(name, isAllowNullValues()) { | ||||
| 						@Override | ||||
| 						public CompletableFuture<?> retrieve(Object key) { | ||||
| 							return CompletableFuture.completedFuture(lookup(key)); | ||||
| 						} | ||||
| 					}; | ||||
| 				} | ||||
| 			}; | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 
 | ||||
| 	@Configuration(proxyBeanMethods = false) | ||||
| 	@EnableCaching | ||||
| 	static class LateCacheHitDeterminationWithValueWrapperConfig { | ||||
| 
 | ||||
| 		@Bean | ||||
| 		CacheManager cacheManager() { | ||||
| 			return new ConcurrentMapCacheManager("first") { | ||||
| 				@Override | ||||
| 				protected Cache createConcurrentMapCache(String name) { | ||||
| 					return new ConcurrentMapCache(name, isAllowNullValues()) { | ||||
| 						@Override | ||||
| 						public CompletableFuture<?> retrieve(Object key) { | ||||
| 							Object value = lookup(key); | ||||
| 							if (value != null) { | ||||
| 								return CompletableFuture.completedFuture(new SimpleValueWrapper(fromStoreValue(value))); | ||||
| 							} | ||||
| 							else { | ||||
| 								return CompletableFuture.completedFuture(null); | ||||
| 							} | ||||
| 						} | ||||
| 					}; | ||||
| 				} | ||||
| 			}; | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue