WebClient method to populate the Reactor Context

The alternative is to use a filter but this makes it a little easier
and also guarantees that it will be downstream from all filters
regardless of their order, and therefore the Context will be visible
to all of them.

Closes gh-25710
This commit is contained in:
Rossen Stoyanchev 2020-11-09 22:04:57 +00:00
parent bd2640a9d6
commit 79f79e9306
4 changed files with 127 additions and 29 deletions

View File

@ -33,6 +33,7 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
@ -173,6 +174,9 @@ class DefaultWebClient implements WebClient {
private final Map<String, Object> attributes = new LinkedHashMap<>(4);
@Nullable
private Function<Context, Context> contextModifier;
@Nullable
private Consumer<ClientHttpRequest> httpRequestConsumer;
@ -298,6 +302,13 @@ class DefaultWebClient implements WebClient {
return this;
}
@Override
public RequestBodySpec context(Function<Context, Context> contextModifier) {
this.contextModifier = (this.contextModifier != null ?
this.contextModifier.andThen(contextModifier) : contextModifier);
return this;
}
@Override
public RequestBodySpec httpRequest(Consumer<ClientHttpRequest> requestConsumer) {
this.httpRequestConsumer = (this.httpRequestConsumer != null ?
@ -412,9 +423,15 @@ class DefaultWebClient implements WebClient {
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
return Mono.defer(() -> exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
return Mono.defer(() -> {
Mono<ClientResponse> responseMono = exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
if (this.contextModifier != null) {
responseMono = responseMono.contextWrite(this.contextModifier);
}
return responseMono;
});
}
private ClientRequest.Builder initRequestBuilder() {

View File

@ -29,6 +29,7 @@ import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ReactiveAdapterRegistry;
@ -470,6 +471,17 @@ public interface WebClient {
*/
S attributes(Consumer<Map<String, Object>> attributesConsumer);
/**
* Provide a function to populate the Reactor {@code Context}. In contrast
* to {@link #attribute(String, Object) attributes} which apply only to
* the current request, the Reactor {@code Context} transparently propagates
* to the downstream processing chain which may include other nested or
* successive calls over HTTP or via other reactive clients.
* @param contextModifier the function to modify the context with
* @since 5.3.1
*/
S context(Function<Context, Context> contextModifier);
/**
* Callback for access to the {@link ClientHttpRequest} that in turn
* provides access to the native request of the underlying HTTP library.

View File

@ -129,6 +129,34 @@ public class DefaultWebClientTests {
assertThat(request.cookies().getFirst("id")).isEqualTo("123");
}
@Test
public void contextFromThreadLocal() {
WebClient client = this.builder
.filter((request, next) ->
// Async, continue on different thread
Mono.delay(Duration.ofMillis(10)).then(next.exchange(request)))
.filter((request, next) ->
Mono.deferContextual(contextView -> {
String fooValue = contextView.get("foo");
return next.exchange(ClientRequest.from(request).header("foo", fooValue).build());
}))
.build();
ThreadLocal<String> fooHolder = new ThreadLocal<>();
fooHolder.set("bar");
try {
client.get().uri("/path")
.context(context -> context.put("foo", fooHolder.get()))
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
}
finally {
fooHolder.remove();
}
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("foo")).isEqualTo("bar");
}
@Test
public void httpRequest() {
this.builder.build().get().uri("/path")
@ -196,8 +224,6 @@ public class DefaultWebClientTests {
request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/xml");
assertThat(request.cookies().getFirst("id")).isEqualTo("456");
}
@Test

View File

@ -831,7 +831,7 @@ inline-style, through the built-in `BodyInserters`, as the following example sho
[[webflux-client-filter]]
== Client Filters
== Filters
You can register a client filter (`ExchangeFilterFunction`) through the `WebClient.Builder`
in order to intercept and modify requests, as the following example shows:
@ -887,9 +887,36 @@ a filter for basic authentication through a static factory method:
.build()
----
Filters apply globally to every request. To change a filter's behavior for a specific
request, you can add request attributes to the `ClientRequest` that can then be accessed
by all filters in the chain, as the following example shows:
You can create a new `WebClient` instance by using another as a starting point. This allows
insert or removing filters without affecting the original `WebClient`. Below is an example
that inserts a basic authentication filter at index 0:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = webClient.mutate()
.filters(filterList -> {
filterList.add(0, basicAuthentication("user", "password"));
})
.build();
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
val client = webClient.mutate()
.filters { it.add(0, basicAuthentication("user", "password")) }
.build()
----
[[webflux-client-attributes]]
== Attributes
You can add attributes to a request. This is convenient if you want to pass information
through the filter chain and influence the behavior of filters for a given request.
For example:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
@ -912,10 +939,11 @@ by all filters in the chain, as the following example shows:
.Kotlin
----
val client = WebClient.builder()
.filter { request, _ ->
val usr = request.attributes()["myAttribute"];
// ...
}.build()
.filter { request, _ ->
val usr = request.attributes()["myAttribute"];
// ...
}
.build()
client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
@ -923,29 +951,44 @@ by all filters in the chain, as the following example shows:
.awaitBody<Unit>()
----
You can also replicate an existing `WebClient`, insert new filters, or remove already
registered filters. The following example, inserts a basic authentication filter at
index 0:
[[webflux-client-context]]
== Context
<<webflux-client-attributes>> provide a convenient way to pass information to the filter
chain but they only influence the current request. If you want to pass information that
propagates to additional requests that are nested, e.g. via `flatMap`, or executed after,
e.g. via `concatMap`, then you'll need to use the Reactor `Context`.
`WebClient` exposes a method to populate the Reactor `Context` for a given request.
This information is available to filters for the current request and it also propagates
to subsequent requests or other reactive clients participating in the downstream
processing chain. For example:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = webClient.mutate()
.filters(filterList -> {
filterList.add(0, basicAuthentication("user", "password"));
})
WebClient client = WebClient.builder()
.filter((request, next) ->
Mono.deferContextual(contextView -> {
String value = contextView.get("foo");
// ...
}))
.build();
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
val client = webClient.mutate()
.filters { it.add(0, basicAuthentication("user", "password")) }
.build()
client.get().uri("https://example.org/")
.context(context -> context.put("foo", ...))
.retrieve()
.bodyToMono(String.class)
.flatMap(body -> {
// perform nested request (context propagates automatically)...
});
----
Note that you can also specify how to populate the context through the `defaultRequest`
method at the level of the `WebClient.Builder` and that applies to all requests.
This could be used for to example to pass information from `ThreadLocal` storage onto
a Reactor processing chain in a Spring MVC application.
[[webflux-client-synchronous]]