diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index 1cf5225376..08ccf3548b 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -33,6 +33,7 @@ import java.util.function.Supplier; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpHeaders; @@ -173,6 +174,9 @@ class DefaultWebClient implements WebClient { private final Map attributes = new LinkedHashMap<>(4); + @Nullable + private Function contextModifier; + @Nullable private Consumer httpRequestConsumer; @@ -298,6 +302,13 @@ class DefaultWebClient implements WebClient { return this; } + @Override + public RequestBodySpec context(Function contextModifier) { + this.contextModifier = (this.contextModifier != null ? + this.contextModifier.andThen(contextModifier) : contextModifier); + return this; + } + @Override public RequestBodySpec httpRequest(Consumer requestConsumer) { this.httpRequestConsumer = (this.httpRequestConsumer != null ? @@ -412,9 +423,15 @@ class DefaultWebClient implements WebClient { ClientRequest request = (this.inserter != null ? initRequestBuilder().body(this.inserter).build() : initRequestBuilder().build()); - return Mono.defer(() -> exchangeFunction.exchange(request) - .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]") - .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR)); + return Mono.defer(() -> { + Mono responseMono = exchangeFunction.exchange(request) + .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]") + .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR); + if (this.contextModifier != null) { + responseMono = responseMono.contextWrite(this.contextModifier); + } + return responseMono; + }); } private ClientRequest.Builder initRequestBuilder() { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java index 4ac5a107aa..c548e0b48a 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java @@ -29,6 +29,7 @@ import java.util.function.Predicate; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ReactiveAdapterRegistry; @@ -470,6 +471,17 @@ public interface WebClient { */ S attributes(Consumer> attributesConsumer); + /** + * Provide a function to populate the Reactor {@code Context}. In contrast + * to {@link #attribute(String, Object) attributes} which apply only to + * the current request, the Reactor {@code Context} transparently propagates + * to the downstream processing chain which may include other nested or + * successive calls over HTTP or via other reactive clients. + * @param contextModifier the function to modify the context with + * @since 5.3.1 + */ + S context(Function contextModifier); + /** * Callback for access to the {@link ClientHttpRequest} that in turn * provides access to the native request of the underlying HTTP library. diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java index 47b0956d71..8e2a058fd3 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java @@ -129,6 +129,34 @@ public class DefaultWebClientTests { assertThat(request.cookies().getFirst("id")).isEqualTo("123"); } + @Test + public void contextFromThreadLocal() { + WebClient client = this.builder + .filter((request, next) -> + // Async, continue on different thread + Mono.delay(Duration.ofMillis(10)).then(next.exchange(request))) + .filter((request, next) -> + Mono.deferContextual(contextView -> { + String fooValue = contextView.get("foo"); + return next.exchange(ClientRequest.from(request).header("foo", fooValue).build()); + })) + .build(); + + ThreadLocal fooHolder = new ThreadLocal<>(); + fooHolder.set("bar"); + try { + client.get().uri("/path") + .context(context -> context.put("foo", fooHolder.get())) + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); + } + finally { + fooHolder.remove(); + } + + ClientRequest request = verifyAndGetRequest(); + assertThat(request.headers().getFirst("foo")).isEqualTo("bar"); + } + @Test public void httpRequest() { this.builder.build().get().uri("/path") @@ -196,8 +224,6 @@ public class DefaultWebClientTests { request = verifyAndGetRequest(); assertThat(request.headers().getFirst("Accept")).isEqualTo("application/xml"); assertThat(request.cookies().getFirst("id")).isEqualTo("456"); - - } @Test diff --git a/src/docs/asciidoc/web/webflux-webclient.adoc b/src/docs/asciidoc/web/webflux-webclient.adoc index 2ce33a0f5c..124b21acfb 100644 --- a/src/docs/asciidoc/web/webflux-webclient.adoc +++ b/src/docs/asciidoc/web/webflux-webclient.adoc @@ -831,7 +831,7 @@ inline-style, through the built-in `BodyInserters`, as the following example sho [[webflux-client-filter]] -== Client Filters +== Filters You can register a client filter (`ExchangeFilterFunction`) through the `WebClient.Builder` in order to intercept and modify requests, as the following example shows: @@ -887,9 +887,36 @@ a filter for basic authentication through a static factory method: .build() ---- -Filters apply globally to every request. To change a filter's behavior for a specific -request, you can add request attributes to the `ClientRequest` that can then be accessed -by all filters in the chain, as the following example shows: +You can create a new `WebClient` instance by using another as a starting point. This allows +insert or removing filters without affecting the original `WebClient`. Below is an example +that inserts a basic authentication filter at index 0: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; + + WebClient client = webClient.mutate() + .filters(filterList -> { + filterList.add(0, basicAuthentication("user", "password")); + }) + .build(); +---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + val client = webClient.mutate() + .filters { it.add(0, basicAuthentication("user", "password")) } + .build() +---- + + +[[webflux-client-attributes]] +== Attributes + +You can add attributes to a request. This is convenient if you want to pass information +through the filter chain and influence the behavior of filters for a given request. +For example: [source,java,indent=0,subs="verbatim,quotes",role="primary"] .Java @@ -912,10 +939,11 @@ by all filters in the chain, as the following example shows: .Kotlin ---- val client = WebClient.builder() - .filter { request, _ -> - val usr = request.attributes()["myAttribute"]; - // ... - }.build() + .filter { request, _ -> + val usr = request.attributes()["myAttribute"]; + // ... + } + .build() client.get().uri("https://example.org/") .attribute("myAttribute", "...") @@ -923,29 +951,44 @@ by all filters in the chain, as the following example shows: .awaitBody() ---- -You can also replicate an existing `WebClient`, insert new filters, or remove already -registered filters. The following example, inserts a basic authentication filter at -index 0: + +[[webflux-client-context]] +== Context + +<> provide a convenient way to pass information to the filter +chain but they only influence the current request. If you want to pass information that +propagates to additional requests that are nested, e.g. via `flatMap`, or executed after, +e.g. via `concatMap`, then you'll need to use the Reactor `Context`. + +`WebClient` exposes a method to populate the Reactor `Context` for a given request. +This information is available to filters for the current request and it also propagates +to subsequent requests or other reactive clients participating in the downstream +processing chain. For example: [source,java,indent=0,subs="verbatim,quotes",role="primary"] .Java ---- - import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; - - WebClient client = webClient.mutate() - .filters(filterList -> { - filterList.add(0, basicAuthentication("user", "password")); - }) + WebClient client = WebClient.builder() + .filter((request, next) -> + Mono.deferContextual(contextView -> { + String value = contextView.get("foo"); + // ... + })) .build(); ----- -[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] -.Kotlin ----- - val client = webClient.mutate() - .filters { it.add(0, basicAuthentication("user", "password")) } - .build() + + client.get().uri("https://example.org/") + .context(context -> context.put("foo", ...)) + .retrieve() + .bodyToMono(String.class) + .flatMap(body -> { + // perform nested request (context propagates automatically)... + }); ---- +Note that you can also specify how to populate the context through the `defaultRequest` +method at the level of the `WebClient.Builder` and that applies to all requests. +This could be used for to example to pass information from `ThreadLocal` storage onto +a Reactor processing chain in a Spring MVC application. [[webflux-client-synchronous]]