Add BlockingExecutionConfigurer to WebFlux config

Closes gh-30678
This commit is contained in:
rstoyanchev 2023-07-12 16:54:45 +01:00
parent f40d1f2329
commit b016f385e1
10 changed files with 334 additions and 20 deletions

View File

@ -700,9 +700,8 @@ Java::
@Override
public void configurePathMatch(PathMatchConfigurer configurer) {
configurer
.setUseCaseSensitiveMatch(true)
.addPathPrefix("/api", HandlerTypePredicate.forAnnotation(RestController.class));
configurer.addPathPrefix(
"/api", HandlerTypePredicate.forAnnotation(RestController.class));
}
}
----
@ -717,9 +716,8 @@ Kotlin::
@Override
fun configurePathMatch(configurer: PathMatchConfigurer) {
configurer
.setUseCaseSensitiveMatch(true)
.addPathPrefix("/api", HandlerTypePredicate.forAnnotation(RestController::class.java))
configurer.addPathPrefix(
"/api", HandlerTypePredicate.forAnnotation(RestController::class.java))
}
}
----
@ -740,6 +738,59 @@ reliance on it.
[[webflux-config-blocking-execution]]
== Blocking Execution
The WebFlux Java config lets you to customize blocking execution in WebFlux.
You can have blocking controller methods called on a separate thread by providing
an `Executor` such as the
{api-spring-framework}/core/task/VirtualThreadTaskExecutor.html[`VirtualThreadTaskExecutor`]
as follows:
[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {
@Override
public void configureBlockingExecution(BlockingExecutionConfigurer configurer) {
Executor executor = ...
configurer.setExecutor(executor);
}
}
----
Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
@Configuration
@EnableWebFlux
class WebConfig : WebFluxConfigurer {
@Override
fun configureBlockingExecution(configurer: BlockingExecutionConfigurer) {
val executor = ...
configurer.setExecutor(executor)
}
}
----
======
By default, controller methods whose return type is not recognized by the configured
`ReactiveAdapterRegistry` are considered blocking, but you can set a custom controller
method predicate via `BlockingExecutionConfigurer`.
[[webflux-config-websocket-service]]
== WebSocketService

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,6 +30,7 @@ import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.validation.Validator;
import org.springframework.web.reactive.accept.RequestedContentTypeResolverBuilder;
import org.springframework.web.reactive.config.BlockingExecutionConfigurer;
import org.springframework.web.reactive.config.CorsRegistry;
import org.springframework.web.reactive.config.DelegatingWebFluxConfiguration;
import org.springframework.web.reactive.config.PathMatchConfigurer;
@ -122,6 +123,11 @@ class DefaultControllerSpec extends AbstractMockServerSpec<WebTestClient.Control
return this;
}
@Override
public WebTestClient.ControllerSpec blockingExecution(Consumer<BlockingExecutionConfigurer> consumer) {
this.configurer.executionConsumer = consumer;
return this;
}
@Override
protected WebHttpHandlerBuilder initHttpHandlerBuilder() {
@ -145,7 +151,7 @@ class DefaultControllerSpec extends AbstractMockServerSpec<WebTestClient.Control
}
private class TestWebFluxConfigurer implements WebFluxConfigurer {
private static class TestWebFluxConfigurer implements WebFluxConfigurer {
@Nullable
private Consumer<RequestedContentTypeResolverBuilder> contentTypeResolverConsumer;
@ -171,6 +177,9 @@ class DefaultControllerSpec extends AbstractMockServerSpec<WebTestClient.Control
@Nullable
private Consumer<ViewResolverRegistry> viewResolversConsumer;
@Nullable
private Consumer<BlockingExecutionConfigurer> executionConsumer;
@Override
public void configureContentTypeResolver(RequestedContentTypeResolverBuilder builder) {
if (this.contentTypeResolverConsumer != null) {
@ -225,6 +234,13 @@ class DefaultControllerSpec extends AbstractMockServerSpec<WebTestClient.Control
this.viewResolversConsumer.accept(registry);
}
}
@Override
public void configureBlockingExecution(BlockingExecutionConfigurer configurer) {
if (this.executionConsumer != null) {
this.executionConsumer.accept(configurer);
}
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -43,6 +43,7 @@ import org.springframework.lang.Nullable;
import org.springframework.util.MultiValueMap;
import org.springframework.validation.Validator;
import org.springframework.web.reactive.accept.RequestedContentTypeResolverBuilder;
import org.springframework.web.reactive.config.BlockingExecutionConfigurer;
import org.springframework.web.reactive.config.CorsRegistry;
import org.springframework.web.reactive.config.PathMatchConfigurer;
import org.springframework.web.reactive.config.ViewResolverRegistry;
@ -353,6 +354,13 @@ public interface WebTestClient {
* @see WebFluxConfigurer#configureViewResolvers
*/
ControllerSpec viewResolvers(Consumer<ViewResolverRegistry> consumer);
/**
* Configure blocking execution options.
* @since 6.1
* @see WebFluxConfigurer#configureBlockingExecution
*/
ControllerSpec blockingExecution(Consumer<BlockingExecutionConfigurer> consumer);
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.config;
import java.util.function.Predicate;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.web.method.HandlerMethod;
/**
* Helps to configure options related to blocking execution in WebFlux.
*
* @author Rossen Stoyanchev
* @since 6.1
*/
public class BlockingExecutionConfigurer {
@Nullable
private AsyncTaskExecutor executor;
@Nullable
private Predicate<HandlerMethod> blockingControllerMethodPredicate;
/**
* Configure an executor to invoke blocking controller methods with.
* <p>By default, this is not set in which case controller methods are
* invoked without the use of an Executor.
* @param executor the task executor to use
*/
public BlockingExecutionConfigurer setExecutor(AsyncTaskExecutor executor) {
this.executor = executor;
return this;
}
/**
* Configure a predicate to decide if a controller method is blocking and
* should be called on a separate thread if an executor is
* {@link #setExecutor configured}.
* <p>The default predicate matches controller methods whose return type is
* not recognized by the configured
* {@link org.springframework.core.ReactiveAdapterRegistry}.
* @param predicate the predicate to use
*/
public BlockingExecutionConfigurer setControllerMethodPredicate(Predicate<HandlerMethod> predicate) {
this.blockingControllerMethodPredicate = predicate;
return this;
}
@Nullable
protected AsyncTaskExecutor getExecutor() {
return this.executor;
}
@Nullable
protected Predicate<HandlerMethod> getBlockingControllerMethodPredicate() {
return this.blockingControllerMethodPredicate;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -110,4 +110,8 @@ public class DelegatingWebFluxConfiguration extends WebFluxConfigurationSupport
this.configurers.configureViewResolvers(registry);
}
@Override
protected void configureBlockingExecution(BlockingExecutionConfigurer configurer) {
this.configurers.configureBlockingExecution(configurer);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -95,6 +95,9 @@ public class WebFluxConfigurationSupport implements ApplicationContextAware {
@Nullable
private PathMatchConfigurer pathMatchConfigurer;
@Nullable
private BlockingExecutionConfigurer blockingExecutionConfigurer;
@Nullable
private ViewResolverRegistry viewResolverRegistry;
@ -282,6 +285,14 @@ public class WebFluxConfigurationSupport implements ApplicationContextAware {
adapter.setWebBindingInitializer(getConfigurableWebBindingInitializer(conversionService, validator));
adapter.setReactiveAdapterRegistry(reactiveAdapterRegistry);
BlockingExecutionConfigurer executorConfigurer = getBlockingExecutionConfigurer();
if (executorConfigurer.getExecutor() != null) {
adapter.setBlockingExecutor(executorConfigurer.getExecutor());
}
if (executorConfigurer.getBlockingControllerMethodPredicate() != null) {
adapter.setBlockingMethodPredicate(executorConfigurer.getBlockingControllerMethodPredicate());
}
ArgumentResolverConfigurer configurer = new ArgumentResolverConfigurer();
configureArgumentResolvers(configurer);
adapter.setArgumentResolverConfigurer(configurer);
@ -419,6 +430,27 @@ public class WebFluxConfigurationSupport implements ApplicationContextAware {
return null;
}
/**
* Callback to build and cache the {@link BlockingExecutionConfigurer}.
* This method is final, but subclasses can override
* {@link #configureBlockingExecution}.
* @since 6.1
*/
protected final BlockingExecutionConfigurer getBlockingExecutionConfigurer() {
if (this.blockingExecutionConfigurer == null) {
this.blockingExecutionConfigurer = new BlockingExecutionConfigurer();
configureBlockingExecution(this.blockingExecutionConfigurer);
}
return this.blockingExecutionConfigurer;
}
/**
* Override this method to configure blocking execution.
* @since 6.1
*/
protected void configureBlockingExecution(BlockingExecutionConfigurer configurer) {
}
@Bean
public HandlerFunctionAdapter handlerFunctionAdapter() {
return new HandlerFunctionAdapter();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -155,4 +155,11 @@ public interface WebFluxConfigurer {
default void configureViewResolvers(ViewResolverRegistry registry) {
}
/**
* Configure settings related to blocking execution in WebFlux.
* @since 6.1
*/
default void configureBlockingExecution(BlockingExecutionConfigurer configurer) {
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -106,6 +106,11 @@ public class WebFluxConfigurerComposite implements WebFluxConfigurer {
this.delegates.forEach(delegate -> delegate.configureViewResolvers(registry));
}
@Override
public void configureBlockingExecution(BlockingExecutionConfigurer configurer) {
this.delegates.forEach(delegate -> delegate.configureBlockingExecution(configurer));
}
@Nullable
private <T> T createSingleBean(Function<WebFluxConfigurer, T> factory, Class<T> beanType) {
List<T> result = this.delegates.stream().map(factory).filter(Objects::nonNull).toList();

View File

@ -19,10 +19,14 @@ package org.springframework.web.reactive.result.method.annotation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
@ -66,6 +70,12 @@ public class RequestMappingHandlerAdapter
@Nullable
private ArgumentResolverConfigurer argumentResolverConfigurer;
@Nullable
private Scheduler scheduler;
@Nullable
private Predicate<HandlerMethod> blockingMethodPredicate;
@Nullable
private ReactiveAdapterRegistry reactiveAdapterRegistry;
@ -126,6 +136,30 @@ public class RequestMappingHandlerAdapter
return this.argumentResolverConfigurer;
}
/**
* Configure an executor to invoke blocking controller methods with.
* <p>By default, this is not set in which case controller methods are
* invoked without the use of an Executor.
* @param executor the task executor to use
* @since 6.1
*/
public void setBlockingExecutor(@Nullable Executor executor) {
this.scheduler = (executor != null ? Schedulers.fromExecutor(executor) : null);
}
/**
* Provide a predicate to decide which controller methods to invoke through
* the configured {@link #setBlockingExecutor blockingExecutor}.
* <p>If an executor is configured, the default predicate matches controller
* methods whose return type not recognized by the configured
* {@link org.springframework.core.ReactiveAdapterRegistry}.
* @param predicate the predicate to use
* @since 6.1
*/
public void setBlockingMethodPredicate(Predicate<HandlerMethod> predicate) {
this.blockingMethodPredicate = predicate;
}
/**
* Configure the registry for adapting various reactive types.
* <p>By default this is an instance of {@link ReactiveAdapterRegistry} with
@ -164,13 +198,19 @@ public class RequestMappingHandlerAdapter
ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();
this.messageReaders = codecConfigurer.getReaders();
}
if (this.argumentResolverConfigurer == null) {
this.argumentResolverConfigurer = new ArgumentResolverConfigurer();
}
if (this.reactiveAdapterRegistry == null) {
this.reactiveAdapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
}
if (this.scheduler != null && this.blockingMethodPredicate == null) {
this.blockingMethodPredicate = new NonReactiveHandlerMethodPredicate(this.reactiveAdapterRegistry);
}
this.methodResolver = new ControllerMethodResolver(
this.argumentResolverConfigurer, this.reactiveAdapterRegistry, this.applicationContext,
this.messageReaders, this.webBindingInitializer);
@ -202,11 +242,20 @@ public class RequestMappingHandlerAdapter
DispatchExceptionHandler exceptionHandler =
(exchange2, ex) -> handleException(exchange, ex, handlerMethod, bindingContext);
return this.modelInitializer
Mono<HandlerResult> resultMono = this.modelInitializer
.initModel(handlerMethod, bindingContext, exchange)
.then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
.doOnNext(result -> result.setExceptionHandler(exceptionHandler))
.onErrorResume(ex -> exceptionHandler.handleError(exchange, ex));
if (this.scheduler != null) {
Assert.state(this.blockingMethodPredicate != null, "Expected HandlerMethod Predicate");
if (this.blockingMethodPredicate.test(handlerMethod)) {
resultMono = resultMono.subscribeOn(this.scheduler);
}
}
return resultMono;
}
private Mono<HandlerResult> handleException(
@ -264,4 +313,18 @@ public class RequestMappingHandlerAdapter
return handleException(exchange, ex, null, null);
}
/**
* Match methods with a return type without an adapter in {@link ReactiveAdapterRegistry}.
*/
private record NonReactiveHandlerMethodPredicate(ReactiveAdapterRegistry adapterRegistry)
implements Predicate<HandlerMethod> {
@Override
public boolean test(HandlerMethod handlerMethod) {
Class<?> returnType = handlerMethod.getReturnType().getParameterType();
return (this.adapterRegistry.getAdapter(returnType) == null);
}
}
}

View File

@ -18,6 +18,8 @@ package org.springframework.web.reactive.result.method.annotation;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
@ -25,6 +27,8 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
@ -33,7 +37,10 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.config.BlockingExecutionConfigurer;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.server.adapter.ForwardedHeaderTransformer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
@ -70,6 +77,9 @@ class RequestMappingIntegrationTests extends AbstractRequestMappingIntegrationTe
url += "/";
assertThat(getRestTemplate().getForObject(url, String.class)).isEqualTo("root");
assertThat(getApplicationContext().getBean(TestExecutor.class).invocationCount.get()).isEqualTo(2);
assertThat(getApplicationContext().getBean(TestPredicate.class).invocationCount.get()).isEqualTo(2);
}
@ParameterizedHttpServerTest
@ -109,7 +119,34 @@ class RequestMappingIntegrationTests extends AbstractRequestMappingIntegrationTe
@Configuration
@EnableWebFlux
static class WebConfig {
static class WebConfig implements WebFluxConfigurer {
@Override
public void configureBlockingExecution(BlockingExecutionConfigurer configurer) {
configurer.setExecutor(executor());
configurer.setControllerMethodPredicate(predicate());
}
@Bean
TestExecutor executor() {
return new TestExecutor();
}
@Bean
TestPredicate predicate() {
return new TestPredicate();
}
}
@Configuration
static class LocalConfig {
@Bean
public ForwardedHeaderTransformer forwardedHeaderTransformer() {
return new ForwardedHeaderTransformer();
}
}
@ -145,12 +182,27 @@ class RequestMappingIntegrationTests extends AbstractRequestMappingIntegrationTe
}
@Configuration
static class LocalConfig {
private static class TestExecutor implements AsyncTaskExecutor {
@Bean
public ForwardedHeaderTransformer forwardedHeaderTransformer() {
return new ForwardedHeaderTransformer();
private final AtomicInteger invocationCount = new AtomicInteger();
@Override
public void execute(Runnable task) {
this.invocationCount.incrementAndGet();
task.run();
}
}
private static class TestPredicate implements Predicate<HandlerMethod> {
private final AtomicInteger invocationCount = new AtomicInteger();
@Override
public boolean test(HandlerMethod handlerMethod) {
this.invocationCount.incrementAndGet();
Class<?> returnType = handlerMethod.getReturnType().getParameterType();
return (ReactiveAdapterRegistry.getSharedInstance().getAdapter(returnType) == null);
}
}