Align Reactive WebClient with web.reactive.function

This commit refactors the web client to be more similar to
web.reactive.function. Changes include:

- Refactor ClientWebRequest to immutable ClientRequest with builder and
   support for BodyInserters.
- Introduce ClientResponse which exposes headers, status, and support
   for reading from the body with BodyExtractors.
- Removed ResponseErrorHandler, in favor of having a ClientResponse
   with "error" status code (i.e. 4xx or 5xx). Also removed
   WebClientException and subclasses.
- Refactored WebClientConfig to WebClientStrategies.
- Refactored ClientHttpRequestInterceptor to ExchangeFilterFunction.
- Removed ClientWebRequestPostProcessor in favor of
   ExchangeFilterFunction, which allows for asynchronous execution.

Issue: SPR-14827
This commit is contained in:
Arjen Poutsma 2016-10-20 12:13:07 +02:00
parent dc1926a861
commit 0cfb6b37f2
43 changed files with 2191 additions and 2447 deletions

View File

@ -26,13 +26,13 @@ import reactor.core.publisher.Mono;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.BodyExtractors;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.tests.TestSubscriber;
import org.springframework.web.client.reactive.ClientRequest;
import org.springframework.web.client.reactive.WebClient;
import static org.springframework.http.codec.BodyInserters.fromServerSentEvents;
import static org.springframework.web.client.reactive.ClientWebRequestBuilders.get;
import static org.springframework.web.client.reactive.ResponseExtractors.bodyStream;
import static org.springframework.web.reactive.function.RouterFunctions.route;
/**
@ -41,11 +41,13 @@ import static org.springframework.web.reactive.function.RouterFunctions.route;
public class SseHandlerFunctionIntegrationTests
extends AbstractRouterFunctionIntegrationTests {
private static final MediaType EVENT_STREAM = new MediaType("text", "event-stream");
private WebClient webClient;
@Before
public void createWebClient() {
this.webClient = new WebClient(new ReactorClientHttpConnector());
this.webClient = WebClient.create(new ReactorClientHttpConnector());
}
@Override
@ -59,10 +61,15 @@ public class SseHandlerFunctionIntegrationTests
@Test
public void sseAsString() throws Exception {
ClientRequest<Void> request =
ClientRequest
.GET("http://localhost:{port}/string", this.port)
.accept(EVENT_STREAM)
.build();
Flux<String> result = this.webClient
.perform(get("http://localhost:" + port + "/string")
.accept(new MediaType("text", "event-stream")))
.extract(bodyStream(String.class))
.exchange(request)
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
.filter(s -> !s.equals("\n"))
.map(s -> (s.replace("\n", "")))
.take(2);
@ -75,10 +82,15 @@ public class SseHandlerFunctionIntegrationTests
@Test
public void sseAsPerson() throws Exception {
ClientRequest<Void> request =
ClientRequest
.GET("http://localhost:{port}/person", this.port)
.accept(EVENT_STREAM)
.build();
Mono<String> result = this.webClient
.perform(get("http://localhost:" + port + "/person")
.accept(new MediaType("text", "event-stream")))
.extract(bodyStream(String.class))
.exchange(request)
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
.filter(s -> !s.equals("\n"))
.map(s -> s.replace("\n", ""))
.takeUntil(s -> s.endsWith("foo 1\"}"))
@ -92,10 +104,15 @@ public class SseHandlerFunctionIntegrationTests
@Test
public void sseAsEvent() throws Exception {
ClientRequest<Void> request =
ClientRequest
.GET("http://localhost:{port}/event", this.port)
.accept(EVENT_STREAM)
.build();
Flux<String> result = this.webClient
.perform(get("http://localhost:" + port + "/event")
.accept(new MediaType("text", "event-stream")))
.extract(bodyStream(String.class))
.exchange(request)
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
.filter(s -> !s.equals("\n"))
.map(s -> s.replace("\n", ""))
.take(2);
@ -107,7 +124,9 @@ public class SseHandlerFunctionIntegrationTests
"id:0:bardata:foo",
"id:1:bardata:foo"
);
;
}
private static class SseHandler {
public Response<Publisher<String>> string(Request request) {
@ -177,5 +196,4 @@ public class SseHandlerFunctionIntegrationTests
}
}
}

View File

@ -28,34 +28,38 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.BodyExtractors;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.tests.TestSubscriber;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.reactive.ClientRequest;
import org.springframework.web.client.reactive.WebClient;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.config.EnableWebReactive;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import static org.springframework.web.client.reactive.ClientWebRequestBuilders.get;
import static org.springframework.web.client.reactive.ResponseExtractors.bodyStream;
/**
* @author Sebastien Deleuze
*/
public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
private static final MediaType EVENT_STREAM = new MediaType("text", "event-stream");
private AnnotationConfigApplicationContext wac;
private WebClient webClient;
@Override
@Before
public void setup() throws Exception {
super.setup();
this.webClient = new WebClient(new ReactorClientHttpConnector());
this.webClient = WebClient.create(new ReactorClientHttpConnector());
}
@ -70,10 +74,15 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Test
public void sseAsString() throws Exception {
ClientRequest<Void> request =
ClientRequest
.GET("http://localhost:{port}/sse/string", this.port)
.accept(EVENT_STREAM)
.build();
Flux<String> result = this.webClient
.perform(get("http://localhost:" + port + "/sse/string")
.accept(new MediaType("text", "event-stream")))
.extract(bodyStream(String.class))
.exchange(request)
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
.filter(s -> !s.equals("\n"))
.map(s -> (s.replace("\n", "")))
.take(2);
@ -83,13 +92,17 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.await(Duration.ofSeconds(5))
.assertValues("data:foo 0", "data:foo 1");
}
@Test
public void sseAsPerson() throws Exception {
ClientRequest<Void> request =
ClientRequest
.GET("http://localhost:{port}/sse/person", this.port)
.accept(EVENT_STREAM)
.build();
Mono<String> result = this.webClient
.perform(get("http://localhost:" + port + "/sse/person")
.accept(new MediaType("text", "event-stream")))
.extract(bodyStream(String.class))
.exchange(request)
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
.filter(s -> !s.equals("\n"))
.map(s -> s.replace("\n", ""))
.takeUntil(s -> s.endsWith("foo 1\"}"))
@ -103,10 +116,14 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Test
public void sseAsEvent() throws Exception {
ClientRequest<Void> request =
ClientRequest
.GET("http://localhost:{port}/sse/event", this.port)
.accept(EVENT_STREAM)
.build();
Flux<String> result = this.webClient
.perform(get("http://localhost:" + port + "/sse/event")
.accept(new MediaType("text", "event-stream")))
.extract(bodyStream(String.class))
.exchange(request)
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
.filter(s -> !s.equals("\n"))
.map(s -> s.replace("\n", ""))
.take(2);
@ -122,9 +139,15 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Test
public void sseAsEventWithoutAcceptHeader() throws Exception {
ClientRequest<Void> request =
ClientRequest
.GET("http://localhost:{port}/sse/event", this.port)
.accept(EVENT_STREAM)
.build();
Flux<String> result = this.webClient
.perform(get("http://localhost:" + port + "/sse/event"))
.extract(bodyStream(String.class))
.exchange(request)
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
.filter(s -> !s.equals("\n"))
.map(s -> s.replace("\n", ""))
.take(2);

View File

@ -1,44 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.HttpMessageReader;
/**
* Contract to extract the content of a raw {@link ClientHttpResponse} decoding
* the response body and using a target composition API.
*
* <p>See static factory methods in {@link ResponseExtractors} and
* {@link org.springframework.web.client.reactive.support.RxJava1ResponseExtractors}.
*
* @author Brian Clozel
* @since 5.0
*/
public interface BodyExtractor<T> {
/**
* Extract content from the response body
* @param clientResponse the raw HTTP response
* @param messageReaders the message readers that decode the response body
* @return the relevant content
*/
T extract(ClientHttpResponse clientResponse, List<HttpMessageReader<?>> messageReaders);
}

View File

@ -1,47 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import java.util.function.Consumer;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpMessage;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ClientHttpResponse;
/**
* Delegate to the next {@link ClientHttpRequestInterceptor} in the chain.
*
* @author Brian Clozel
* @since 5.0
*/
public interface ClientHttpRequestInterceptionChain {
/**
* Delegate to the next {@link ClientHttpRequestInterceptor} in the chain.
*
* @param method the HTTP request method
* @param uri the HTTP request URI
* @param requestCallback a function that can customize the request
* by changing the HTTP request headers with {@code HttpMessage.getHeaders()}.
* @return a publisher of the resulting {@link ClientHttpResponse}
*/
Mono<ClientHttpResponse> intercept(HttpMethod method, URI uri, Consumer<? super HttpMessage> requestCallback);
}

View File

@ -1,66 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import java.util.List;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ClientHttpResponse;
/**
* Contract for chain-based, interception processing of client http requests
* that may be used to implement cross-cutting requirements such
* as security, timeouts, caching, and others.
*
* <p>Implementations of this interface can be
* {@link WebClient#setInterceptors(List) registered} with the {@link WebClient}.
*
* @author Brian Clozel
* @see org.springframework.web.client.reactive.WebClient
* @since 5.0
*/
@FunctionalInterface
public interface ClientHttpRequestInterceptor {
/**
* Intercept the client HTTP request
*
* <p>The provided {@link ClientHttpRequestInterceptionChain}
* instance allows the interceptor to delegate the request
* to the next interceptor in the chain.
*
* <p>An implementation might follow this pattern:
* <ol>
* <li>Examine the {@link HttpMethod method} and {@link URI uri}</li>
* <li>Optionally change those when delegating to the next interceptor
* with the {@code ClientHttpRequestInterceptionChain}.</li>
* <li>Optionally transform the HTTP message given as an
* argument of the request callback in
* {@code chain.intercept(method, uri, requestCallback)}.</li>
* <li>Optionally transform the response before returning it.</li>
* </ol>
*
* @param method the HTTP request method
* @param uri the HTTP request URI
* @param chain the request interception chain
* @return a publisher of the {@link ClientHttpResponse}
*/
Mono<ClientHttpResponse> intercept(HttpMethod method, URI uri, ClientHttpRequestInterceptionChain chain);
}

View File

@ -0,0 +1,334 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import java.nio.charset.Charset;
import java.time.ZonedDateTime;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.codec.BodyInserter;
import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.DefaultUriTemplateHandler;
import org.springframework.web.util.UriTemplateHandler;
/**
* Represents a typed, immutable, client-side HTTP request, as executed by the {@link WebClient}.
* Instances of this interface are created via static builder methods:
* {@link #method(HttpMethod, String, Object...)}, {@link #GET(String, Object...)}, etc.
*
* @param <T> the type of the body that this request contains
* @author Brian Clozel
* @author Arjen Poutsma
* @since 5.0
*/
public interface ClientRequest<T> {
// Instance methods
/**
* Return the HTTP method.
*/
HttpMethod method();
/**
* Return the request URI.
*/
URI url();
/**
* Return the headers of this request.
*/
HttpHeaders headers();
/**
* Return the cookies of this request.
*/
MultiValueMap<String, String> cookies();
/**
* Return the body of this request.
*/
T body();
/**
* Return the body inserter of this request.
*/
BodyInserter<T, ? super ClientHttpRequest> inserter();
/**
* Writes this request to the given {@link ClientHttpRequest}.
*
* @param request the client http request to write to
* @param strategies the strategies to use when writing
* @return {@code Mono<Void>} to indicate when writing is complete
*/
Mono<Void> writeTo(ClientHttpRequest request, WebClientStrategies strategies);
// Static builder methods
/**
* Create a builder with the method, URI, headers, and cookies of the given request.
*
* @param other the request to copy the method, URI, headers, and cookies from
* @return the created builder
*/
static BodyBuilder from(ClientRequest<?> other) {
Assert.notNull(other, "'other' must not be null");
return new DefaultClientRequestBuilder(other.method(), other.url())
.headers(other.headers())
.cookies(other.cookies());
}
/**
* Create a builder with the given method and url.
* @param method the HTTP method (GET, POST, etc)
* @param url the URL
* @return the created builder
*/
static BodyBuilder method(HttpMethod method, URI url) {
return new DefaultClientRequestBuilder(method, url);
}
/**
* Create a builder with the given method and url template.
* @param method the HTTP method (GET, POST, etc)
* @param urlTemplate the URL template
* @param urlVariables optional variable to expand the template
* @return the created builder
*/
static BodyBuilder method(HttpMethod method, String urlTemplate, Object... urlVariables) {
UriTemplateHandler templateHandler = new DefaultUriTemplateHandler();
URI url = templateHandler.expand(urlTemplate, urlVariables);
return new DefaultClientRequestBuilder(method, url);
}
/**
* Create an HTTP GET builder with the given url template.
* @param urlTemplate the URL template
* @param urlVariables optional variable to expand the template
* @return the created builder
*/
static HeadersBuilder<?> GET(String urlTemplate, Object... urlVariables) {
return method(HttpMethod.GET, urlTemplate, urlVariables);
}
/**
* Create an HTTP HEAD builder with the given url template.
* @param urlTemplate the URL template
* @param urlVariables optional variable to expand the template
* @return the created builder
*/
static HeadersBuilder<?> HEAD(String urlTemplate, Object... urlVariables) {
return method(HttpMethod.HEAD, urlTemplate, urlVariables);
}
/**
* Create an HTTP POST builder with the given url template.
* @param urlTemplate the URL template
* @param urlVariables optional variable to expand the template
* @return the created builder
*/
static BodyBuilder POST(String urlTemplate, Object... urlVariables) {
return method(HttpMethod.POST, urlTemplate, urlVariables);
}
/**
* Create an HTTP PUT builder with the given url template.
* @param urlTemplate the URL template
* @param urlVariables optional variable to expand the template
* @return the created builder
*/
static BodyBuilder PUT(String urlTemplate, Object... urlVariables) {
return method(HttpMethod.PUT, urlTemplate, urlVariables);
}
/**
* Create an HTTP PATCH builder with the given url template.
* @param urlTemplate the URL template
* @param urlVariables optional variable to expand the template
* @return the created builder
*/
static BodyBuilder PATCH(String urlTemplate, Object... urlVariables) {
return method(HttpMethod.PATCH, urlTemplate, urlVariables);
}
/**
* Create an HTTP DELETE builder with the given url template.
* @param urlTemplate the URL template
* @param urlVariables optional variable to expand the template
* @return the created builder
*/
static HeadersBuilder<?> DELETE(String urlTemplate, Object... urlVariables) {
return method(HttpMethod.DELETE, urlTemplate, urlVariables);
}
/**
* Creates an HTTP OPTIONS builder with the given url template.
* @param urlTemplate the URL template
* @param urlVariables optional variable to expand the template
* @return the created builder
*/
static HeadersBuilder<?> OPTIONS(String urlTemplate, Object... urlVariables) {
return method(HttpMethod.OPTIONS, urlTemplate, urlVariables);
}
/**
* Defines a builder that adds headers to the request.
*
* @param <B> the builder subclass
*/
interface HeadersBuilder<B extends HeadersBuilder<B>> {
/**
* Add the given, single header value under the given name.
* @param headerName the header name
* @param headerValues the header value(s)
* @return this builder
* @see HttpHeaders#add(String, String)
*/
B header(String headerName, String... headerValues);
/**
* Copy the given headers into the entity's headers map.
*
* @param headers the existing HttpHeaders to copy from
* @return this builder
*/
B headers(HttpHeaders headers);
/**
* Set the list of acceptable {@linkplain MediaType media types}, as
* specified by the {@code Accept} header.
* @param acceptableMediaTypes the acceptable media types
* @return this builder
*/
B accept(MediaType... acceptableMediaTypes);
/**
* Set the list of acceptable {@linkplain Charset charsets}, as specified
* by the {@code Accept-Charset} header.
* @param acceptableCharsets the acceptable charsets
* @return this builder
*/
B acceptCharset(Charset... acceptableCharsets);
/**
* Set the value of the {@code If-Modified-Since} header.
* <p>The date should be specified as the number of milliseconds since
* January 1, 1970 GMT.
* @param ifModifiedSince the new value of the header
* @return this builder
*/
B ifModifiedSince(ZonedDateTime ifModifiedSince);
/**
* Set the values of the {@code If-None-Match} header.
* @param ifNoneMatches the new value of the header
* @return this builder
*/
B ifNoneMatch(String... ifNoneMatches);
/**
* Add a cookie with the given name and value.
* @param name the cookie name
* @param value the cookie value
* @return this builder
*/
B cookie(String name, String value);
/**
* Copy the given cookies into the entity's cookies map.
*
* @param cookies the existing cookies to copy from
* @return this builder
*/
B cookies(MultiValueMap<String, String> cookies);
/**
* Builds the request entity with no body.
* @return the request entity
*/
ClientRequest<Void> build();
}
/**
* Defines a builder that adds a body to the request entity.
*/
interface BodyBuilder extends HeadersBuilder<BodyBuilder> {
/**
* Set the length of the body in bytes, as specified by the
* {@code Content-Length} header.
* @param contentLength the content length
* @return this builder
* @see HttpHeaders#setContentLength(long)
*/
BodyBuilder contentLength(long contentLength);
/**
* Set the {@linkplain MediaType media type} of the body, as specified
* by the {@code Content-Type} header.
* @param contentType the content type
* @return this builder
* @see HttpHeaders#setContentType(MediaType)
*/
BodyBuilder contentType(MediaType contentType);
/**
* Set the body of the request to the given {@code BodyInserter} and return it.
* @param inserter the {@code BodyInserter} that writes to the request
* @param <T> the type contained in the body
* @return the built request
*/
<T> ClientRequest<T> body(BodyInserter<T, ? super ClientHttpRequest> inserter);
/**
* Set the body of the request to the given {@code Publisher} and return it.
* @param publisher the {@code Publisher} to write to the request
* @param elementClass the class of elements contained in the publisher
* @param <T> the type of the elements contained in the publisher
* @param <S> the type of the {@code Publisher}
* @return the built request
*/
<T, S extends Publisher<T>> ClientRequest<S> body(S publisher, Class<T> elementClass);
/**
* Set the body of the request to the given {@code Publisher} and return it.
* @param publisher the {@code Publisher} to write to the request
* @param elementType the type of elements contained in the publisher
* @param <T> the type of the elements contained in the publisher
* @param <S> the type of the {@code Publisher}.
* @return the built request
*/
<T, S extends Publisher<T>> ClientRequest<S> body(S publisher, ResolvableType elementType);
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.BodyExtractor;
/**
* Represents an HTTP response, as returned by the {@link WebClient}.
* Access to headers and body is offered by {@link Headers} and
* {@link #body(BodyExtractor)} respectively.
*
* @author Brian Clozel
* @author Arjen Poutsma
* @since 5.0
*/
public interface ClientResponse {
/**
* Return the status code of this response.
*/
HttpStatus statusCode();
/**
* Return the headers of this response.
*/
Headers headers();
/**
* Extract the body with the given {@code BodyExtractor}.
* @param extractor the {@code BodyExtractor} that reads from the response
* @param <T> the type of the body returned
* @return the extracted body
*/
<T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor);
/**
* Represents the headers of the HTTP response.
* @see ClientResponse#headers()
*/
interface Headers {
/**
* Return the length of the body in bytes, as specified by the
* {@code Content-Length} header.
*/
OptionalLong contentLength();
/**
* Return the {@linkplain MediaType media type} of the body, as specified
* by the {@code Content-Type} header.
*/
Optional<MediaType> contentType();
/**
* Return the header value(s), if any, for the header of the given name.
* <p>Return an empty list if no header values are found.
*
* @param headerName the header name
*/
List<String> header(String headerName);
/**
* Return the headers as a {@link HttpHeaders} instance.
*/
HttpHeaders asHttpHeaders();
}
}

View File

@ -1,101 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.MultiValueMap;
/**
* Simple container for application-level information required to perform an
* HTTP client request.
*
* <p>The request body is provided through a {@code Publisher<Object>} where the
* type of each Object is indicated through a {@link ResolvableType} which
* subsequently is used to correctly serialize into the
* {@code Publisher<DataBuffer>} actually written to request body.
*
* @author Brian Clozel
* @since 5.0
*/
public class ClientWebRequest {
protected final HttpMethod httpMethod;
protected final URI url;
protected HttpHeaders httpHeaders;
private MultiValueMap<String, HttpCookie> cookies;
protected Publisher<?> body;
protected ResolvableType elementType;
public ClientWebRequest(HttpMethod httpMethod, URI url) {
this.httpMethod = httpMethod;
this.url = url;
}
public HttpMethod getMethod() {
return httpMethod;
}
public URI getUrl() {
return url;
}
public HttpHeaders getHttpHeaders() {
return httpHeaders;
}
public void setHttpHeaders(HttpHeaders httpHeaders) {
this.httpHeaders = httpHeaders;
}
public MultiValueMap<String, HttpCookie> getCookies() {
return cookies;
}
public void setCookies(MultiValueMap<String, HttpCookie> cookies) {
this.cookies = cookies;
}
public Publisher<?> getBody() {
return body;
}
public void setBody(Publisher<?> body) {
this.body = body;
}
public ResolvableType getElementType() {
return elementType;
}
public void setElementType(ResolvableType elementType) {
this.elementType = elementType;
}
}

View File

@ -1,29 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
/**
* Build {@link ClientWebRequest}s.
*
* @author Brian Clozel
* @since 5.0
*/
public interface ClientWebRequestBuilder {
ClientWebRequest build();
}

View File

@ -1,103 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import org.springframework.http.HttpMethod;
/**
* Static factory methods for {@link DefaultClientWebRequestBuilder
* ClientWebRequestBuilders}.
*
* @author Brian Clozel
* @since 5.0
*/
public abstract class ClientWebRequestBuilders {
/**
* Create a {@link DefaultClientWebRequestBuilder} for a GET request.
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder get(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.GET, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a POST request.
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder post(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.POST, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a PUT request.
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder put(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.PUT, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a PATCH request.
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder patch(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.PATCH, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a DELETE request.
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder delete(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.DELETE, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for an OPTIONS request.
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder options(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.OPTIONS, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a HEAD request.
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder head(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.HEAD, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a request with the given HTTP method.
* @param httpMethod the HTTP method
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder request(HttpMethod httpMethod, String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(httpMethod, urlTemplate, urlVariables);
}
}

View File

@ -1,38 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
/**
* Contract to post-process the {@link ClientWebRequest} after it is created
* an initialized in order to mofidy or even wrap it. This may be used for
* example to pre-package specific modifications to the request.
*
* @author Rob Winch
* @author Brian Clozel
* @since 5.0
* @see DefaultClientWebRequestBuilder#apply(ClientWebRequestPostProcessor)
*/
public interface ClientWebRequestPostProcessor {
/**
* Implementations can modify and/or wrap the {@link ClientWebRequest}
* passed in and return it
* @param request the {@link ClientWebRequest} to be modified and/or wrapped.
*/
ClientWebRequest postProcess(ClientWebRequest request);
}

View File

@ -1,68 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.nio.charset.Charset;
import java.util.Base64;
import java.util.Base64.Encoder;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
/**
* Static factory methods for creating {@link ClientWebRequestPostProcesor} instances.
*
* @author Rob Winch
* @since 5.0
* @see DefaultClientWebRequestBuilder#apply(ClientWebRequestPostProcessors)
*/
public abstract class ClientWebRequestPostProcessors {
/**
* Adds an Authorization header for HTTP Basic
* @param username the username to add
* @param password the password to add
* @return the {@link ClientWebRequestPostProcessor} that adds the Authorization header
*/
public static ClientWebRequestPostProcessor httpBasic(String username, String password) {
Assert.notNull(username, "username cannot be null");
Assert.notNull(password, "password cannot be null");
return new ClientWebRequestPostProcessor() {
@Override
public ClientWebRequest postProcess(ClientWebRequest toPostProcess) {
String authorization = authorization(username, password);
toPostProcess.getHttpHeaders().set(HttpHeaders.AUTHORIZATION, authorization);
return toPostProcess;
}
private String authorization(String username, String password) {
String credentials = username + ":" + password;
return authorization(credentials);
}
private String authorization(String credentials) {
byte[] credentialBytes = credentials.getBytes(Charset.defaultCharset());
Encoder encoder = Base64.getEncoder();
String encodedCredentials = encoder.encodeToString(credentialBytes);
return "Basic " + encodedCredentials;
}
};
}
}

View File

@ -0,0 +1,240 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import java.nio.charset.Charset;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.codec.BodyInserter;
import org.springframework.http.codec.BodyInserters;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* Default implementation of {@link ClientRequest.BodyBuilder}.
*
* @author Arjen Poutsma
* @since 5.0
*/
class DefaultClientRequestBuilder implements ClientRequest.BodyBuilder {
private final HttpMethod method;
private final URI url;
private final HttpHeaders headers = new HttpHeaders();
private final MultiValueMap<String, String> cookies = new LinkedMultiValueMap<>();
public DefaultClientRequestBuilder(HttpMethod method, URI url) {
this.method = method;
this.url = url;
}
@Override
public ClientRequest.BodyBuilder header(String headerName, String... headerValues) {
for (String headerValue : headerValues) {
this.headers.add(headerName, headerValue);
}
return this;
}
@Override
public ClientRequest.BodyBuilder headers(HttpHeaders headers) {
if (headers != null) {
this.headers.putAll(headers);
}
return this;
}
@Override
public ClientRequest.BodyBuilder accept(MediaType... acceptableMediaTypes) {
this.headers.setAccept(Arrays.asList(acceptableMediaTypes));
return this;
}
@Override
public ClientRequest.BodyBuilder acceptCharset(Charset... acceptableCharsets) {
this.headers.setAcceptCharset(Arrays.asList(acceptableCharsets));
return this;
}
@Override
public ClientRequest.BodyBuilder ifModifiedSince(ZonedDateTime ifModifiedSince) {
ZonedDateTime gmt = ifModifiedSince.withZoneSameInstant(ZoneId.of("GMT"));
String headerValue = DateTimeFormatter.RFC_1123_DATE_TIME.format(gmt);
this.headers.set(HttpHeaders.IF_MODIFIED_SINCE, headerValue);
return this;
}
@Override
public ClientRequest.BodyBuilder ifNoneMatch(String... ifNoneMatches) {
this.headers.setIfNoneMatch(Arrays.asList(ifNoneMatches));
return this;
}
@Override
public ClientRequest.BodyBuilder cookie(String name, String value) {
this.cookies.add(name, value);
return this;
}
@Override
public ClientRequest.BodyBuilder cookies(MultiValueMap<String, String> cookies) {
if (cookies != null) {
this.cookies.putAll(cookies);
}
return this;
}
@Override
public ClientRequest<Void> build() {
return body(BodyInserter.of(
(response, configuration) -> response.setComplete(),
() -> null));
}
@Override
public ClientRequest.BodyBuilder contentLength(long contentLength) {
this.headers.setContentLength(contentLength);
return this;
}
@Override
public ClientRequest.BodyBuilder contentType(MediaType contentType) {
this.headers.setContentType(contentType);
return this;
}
@Override
public <T> ClientRequest<T> body(BodyInserter<T, ? super ClientHttpRequest> inserter) {
Assert.notNull(inserter, "'inserter' must not be null");
return new BodyInserterRequest<T>(this.method, this.url, this.headers, this.cookies,
inserter);
}
@Override
public <T, S extends Publisher<T>> ClientRequest<S> body(S publisher, Class<T> elementClass) {
return body(BodyInserters.fromPublisher(publisher, elementClass));
}
@Override
public <T, S extends Publisher<T>> ClientRequest<S> body(S publisher,
ResolvableType elementType) {
return body(BodyInserters.fromPublisher(publisher, elementType));
}
private static class BodyInserterRequest<T> implements ClientRequest<T> {
private final HttpMethod method;
private final URI url;
private final HttpHeaders headers;
private final MultiValueMap<String, String> cookies;
private final BodyInserter<T, ? super ClientHttpRequest> inserter;
public BodyInserterRequest(HttpMethod method, URI url, HttpHeaders headers,
MultiValueMap<String, String> cookies,
BodyInserter<T, ? super ClientHttpRequest> inserter) {
this.method = method;
this.url = url;
this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
this.cookies = CollectionUtils.unmodifiableMultiValueMap(cookies);
this.inserter = inserter;
}
@Override
public HttpMethod method() {
return this.method;
}
@Override
public URI url() {
return this.url;
}
@Override
public HttpHeaders headers() {
return this.headers;
}
@Override
public MultiValueMap<String, String> cookies() {
return this.cookies;
}
@Override
public T body() {
return this.inserter.t();
}
@Override
public BodyInserter<T, ? super ClientHttpRequest> inserter() {
return this.inserter;
}
@Override
public Mono<Void> writeTo(ClientHttpRequest request, WebClientStrategies strategies) {
HttpHeaders requestHeaders = request.getHeaders();
if (!this.headers.isEmpty()) {
this.headers.entrySet().stream()
.filter(entry -> !requestHeaders.containsKey(entry.getKey()))
.forEach(entry -> requestHeaders
.put(entry.getKey(), entry.getValue()));
}
MultiValueMap<String, HttpCookie> requestCookies = request.getCookies();
if (!this.cookies.isEmpty()) {
this.cookies.entrySet().forEach(entry -> {
String name = entry.getKey();
entry.getValue().forEach(value -> {
HttpCookie cookie = new HttpCookie(name, value);
requestCookies.add(name, cookie);
});
});
}
return this.inserter.insert(request, new BodyInserter.Context() {
@Override
public Supplier<Stream<HttpMessageWriter<?>>> messageWriters() {
return strategies.messageWriters();
}
});
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.BodyExtractor;
import org.springframework.http.codec.HttpMessageReader;
/**
* Default implementation of {@link ClientResponse}.
*
* @author Arjen Poutsma
* @since 5.0
*/
class DefaultClientResponse implements ClientResponse {
private final ClientHttpResponse response;
private final Headers headers;
private final WebClientStrategies strategies;
public DefaultClientResponse(ClientHttpResponse response, WebClientStrategies strategies) {
this.response = response;
this.strategies = strategies;
this.headers = new DefaultHeaders();
}
@Override
public HttpStatus statusCode() {
return this.response.getStatusCode();
}
@Override
public Headers headers() {
return this.headers;
}
@Override
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
return extractor.extract(this.response, new BodyExtractor.Context() {
@Override
public Supplier<Stream<HttpMessageReader<?>>> messageReaders() {
return strategies.messageReaders();
}
});
}
public ClientHttpResponse clientHttpResponse() {
return this.response;
}
private class DefaultHeaders implements Headers {
private HttpHeaders delegate() {
return response.getHeaders();
}
@Override
public OptionalLong contentLength() {
return toOptionalLong(delegate().getContentLength());
}
@Override
public Optional<MediaType> contentType() {
return Optional.ofNullable(delegate().getContentType());
}
@Override
public List<String> header(String headerName) {
List<String> headerValues = delegate().get(headerName);
return headerValues != null ? headerValues : Collections.emptyList();
}
@Override
public HttpHeaders asHttpHeaders() {
return HttpHeaders.readOnlyHttpHeaders(delegate());
}
private OptionalLong toOptionalLong(long value) {
return value != -1 ? OptionalLong.of(value) : OptionalLong.empty();
}
}
}

View File

@ -1,197 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.DefaultUriTemplateHandler;
import org.springframework.web.util.UriTemplateHandler;
/**
* Builds a {@link ClientHttpRequest} using a {@link Publisher}
* as request body.
*
* <p>See static factory methods in {@link ClientWebRequestBuilders}.
*
* @author Brian Clozel
* @since 5.0
* @see ClientWebRequestBuilders
*/
public class DefaultClientWebRequestBuilder implements ClientWebRequestBuilder {
private final UriTemplateHandler uriTemplateHandler = new DefaultUriTemplateHandler();
private HttpMethod httpMethod;
private HttpHeaders httpHeaders;
private URI url;
private final MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
private Publisher<?> body;
private ResolvableType elementType;
private List<ClientWebRequestPostProcessor> postProcessors = new ArrayList<>();
protected DefaultClientWebRequestBuilder() {
}
public DefaultClientWebRequestBuilder(HttpMethod httpMethod, String urlTemplate,
Object... urlVariables) {
this.httpMethod = httpMethod;
this.httpHeaders = new HttpHeaders();
this.url = this.uriTemplateHandler.expand(urlTemplate, urlVariables);
}
public DefaultClientWebRequestBuilder(HttpMethod httpMethod, URI url) {
this.httpMethod = httpMethod;
this.httpHeaders = new HttpHeaders();
this.url = url;
}
/**
* Add an HTTP request header
*/
public DefaultClientWebRequestBuilder header(String name, String... values) {
Arrays.stream(values).forEach(value -> this.httpHeaders.add(name, value));
return this;
}
/**
* Add all provided HTTP request headers
*/
public DefaultClientWebRequestBuilder headers(HttpHeaders httpHeaders) {
this.httpHeaders = httpHeaders;
return this;
}
/**
* Set the Content-Type request header to the given {@link MediaType}
*/
public DefaultClientWebRequestBuilder contentType(MediaType contentType) {
this.httpHeaders.setContentType(contentType);
return this;
}
/**
* Set the Content-Type request header to the given media type
*/
public DefaultClientWebRequestBuilder contentType(String contentType) {
this.httpHeaders.setContentType(MediaType.parseMediaType(contentType));
return this;
}
/**
* Set the Accept request header to the given {@link MediaType}s
*/
public DefaultClientWebRequestBuilder accept(MediaType... mediaTypes) {
this.httpHeaders.setAccept(Arrays.asList(mediaTypes));
return this;
}
/**
* Set the Accept request header to the given media types
*/
public DefaultClientWebRequestBuilder accept(String... mediaTypes) {
this.httpHeaders.setAccept(
Arrays.stream(mediaTypes).map(type -> MediaType.parseMediaType(type))
.collect(Collectors.toList()));
return this;
}
/**
* Add a Cookie to the HTTP request
*/
public DefaultClientWebRequestBuilder cookie(String name, String value) {
return cookie(new HttpCookie(name, value));
}
/**
* Add a Cookie to the HTTP request
*/
public DefaultClientWebRequestBuilder cookie(HttpCookie cookie) {
this.cookies.add(cookie.getName(), cookie);
return this;
}
/**
* Allows performing more complex operations with a strategy. For example, a
* {@link ClientWebRequestPostProcessor} implementation might accept the arguments of username
* and password and set an HTTP Basic authentication header.
*
* @param postProcessor the {@link ClientWebRequestPostProcessor} to use. Cannot be null.
*
* @return this instance for further modifications.
*/
public DefaultClientWebRequestBuilder apply(ClientWebRequestPostProcessor postProcessor) {
Assert.notNull(postProcessor, "`postProcessor` is required");
this.postProcessors.add(postProcessor);
return this;
}
/**
* Use the given object as the request body
*/
public DefaultClientWebRequestBuilder body(Object content) {
this.body = Mono.just(content);
this.elementType = ResolvableType.forInstance(content);
return this;
}
/**
* Use the given {@link Publisher} as the request body and use its {@link ResolvableType}
* as type information for the element published by this reactive stream
*/
public DefaultClientWebRequestBuilder body(Publisher<?> content, ResolvableType publisherType) {
this.body = content;
this.elementType = publisherType;
return this;
}
@Override
public ClientWebRequest build() {
ClientWebRequest clientWebRequest = new ClientWebRequest(this.httpMethod, this.url);
clientWebRequest.setHttpHeaders(this.httpHeaders);
clientWebRequest.setCookies(this.cookies);
clientWebRequest.setBody(this.body);
clientWebRequest.setElementType(this.elementType);
for (ClientWebRequestPostProcessor postProcessor : this.postProcessors) {
clientWebRequest = postProcessor.postProcess(clientWebRequest);
}
return clientWebRequest;
}
}

View File

@ -1,45 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.HttpMessageReader;
/**
* Default implementation of the {@link ResponseErrorHandler} interface
* that throws {@link WebClientErrorException}s for HTTP 4xx responses
* and {@link WebServerErrorException}s for HTTP 5xx responses.
*
* @author Brian Clozel
* @since 5.0
*/
public class DefaultResponseErrorHandler implements ResponseErrorHandler {
@Override
public void handleError(ClientHttpResponse response, List<HttpMessageReader<?>> messageReaders) {
HttpStatus responseStatus = response.getStatusCode();
if (responseStatus.is4xxClientError()) {
throw new WebClientErrorException(response, messageReaders);
}
if (responseStatus.is5xxServerError()) {
throw new WebServerErrorException(response, messageReaders);
}
}
}

View File

@ -0,0 +1,108 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.logging.Level;
import reactor.core.publisher.Mono;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.util.Assert;
/**
* Default implementation of {@link WebClient.Builder}.
*
* @author Arjen Poutsma
* @since 5.0
*/
class DefaultWebClientBuilder implements WebClient.Builder {
private ClientHttpConnector clientHttpConnector;
private WebClientStrategies strategies = WebClientStrategies.withDefaults();
private ExchangeFilterFunction filter = new NoOpFilter();
public DefaultWebClientBuilder(ClientHttpConnector clientHttpConnector) {
this.clientHttpConnector = clientHttpConnector;
}
@Override
public WebClient.Builder strategies(WebClientStrategies strategies) {
Assert.notNull(strategies, "'strategies' must not be null");
this.strategies = strategies;
return this;
}
@Override
public WebClient.Builder filter(ExchangeFilterFunction filter) {
Assert.notNull(filter, "'filter' must not be null");
this.filter = filter.andThen(this.filter);
return this;
}
@Override
public WebClient build() {
return new DefaultWebClient(this.clientHttpConnector, this.strategies, this.filter);
}
private final static class DefaultWebClient implements WebClient {
private final ClientHttpConnector clientHttpConnector;
private final WebClientStrategies strategies;
private final ExchangeFilterFunction filter;
public DefaultWebClient(
ClientHttpConnector clientHttpConnector,
WebClientStrategies strategies,
ExchangeFilterFunction filter) {
this.clientHttpConnector = clientHttpConnector;
this.strategies = strategies;
this.filter = filter;
}
@Override
public Mono<ClientResponse> exchange(ClientRequest<?> request) {
Assert.notNull(request, "'request' must not be null");
return this.filter.filter(request, this::exchangeInternal);
}
private Mono<ClientResponse> exchangeInternal(ClientRequest<?> request) {
return this.clientHttpConnector
.connect(request.method(), request.url(),
clientHttpRequest -> request
.writeTo(clientHttpRequest, this.strategies))
.log("org.springframework.web.client.reactive", Level.FINE)
.map(clientHttpResponse -> new DefaultClientResponse(clientHttpResponse,
this.strategies));
}
}
private class NoOpFilter implements ExchangeFilterFunction {
@Override
public Mono<ClientResponse> filter(ClientRequest<?> request, ExchangeFunction next) {
return next.exchange(request);
}
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.springframework.context.ApplicationContext;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.http.codec.DecoderHttpMessageReader;
import org.springframework.http.codec.EncoderHttpMessageWriter;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
import org.springframework.http.codec.xml.Jaxb2XmlEncoder;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* Default implementation of {@link WebClientStrategies.Builder}.
*
* @author Arjen Poutsma
* @since 5.0
*/
class DefaultWebClientStrategiesBuilder implements WebClientStrategies.Builder {
private static final boolean jackson2Present =
ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper",
DefaultWebClientStrategiesBuilder.class.getClassLoader()) &&
ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator",
DefaultWebClientStrategiesBuilder.class.getClassLoader());
private static final boolean jaxb2Present =
ClassUtils.isPresent("javax.xml.bind.Binder",
DefaultWebClientStrategiesBuilder.class.getClassLoader());
private final List<HttpMessageReader<?>> messageReaders = new ArrayList<>();
private final List<HttpMessageWriter<?>> messageWriters = new ArrayList<>();
public void defaultConfiguration() {
messageReader(new DecoderHttpMessageReader<>(new ByteBufferDecoder()));
messageReader(new DecoderHttpMessageReader<>(new StringDecoder(false)));
messageWriter(new EncoderHttpMessageWriter<>(new ByteBufferEncoder()));
messageWriter(new EncoderHttpMessageWriter<>(new CharSequenceEncoder()));
if (jaxb2Present) {
messageReader(new DecoderHttpMessageReader<>(new Jaxb2XmlDecoder()));
messageWriter(new EncoderHttpMessageWriter<>(new Jaxb2XmlEncoder()));
}
if (jackson2Present) {
messageReader(new DecoderHttpMessageReader<>(new Jackson2JsonDecoder()));
messageWriter(new EncoderHttpMessageWriter<>(new Jackson2JsonEncoder()));
}
}
public void applicationContext(ApplicationContext applicationContext) {
applicationContext.getBeansOfType(HttpMessageReader.class).values().forEach(this::messageReader);
applicationContext.getBeansOfType(HttpMessageWriter.class).values().forEach(this::messageWriter);
}
@Override
public WebClientStrategies.Builder messageReader(HttpMessageReader<?> messageReader) {
Assert.notNull(messageReader, "'messageReader' must not be null");
this.messageReaders.add(messageReader);
return this;
}
@Override
public WebClientStrategies.Builder decoder(Decoder<?> decoder) {
Assert.notNull(decoder, "'decoder' must not be null");
return messageReader(new DecoderHttpMessageReader<>(decoder));
}
@Override
public WebClientStrategies.Builder messageWriter(HttpMessageWriter<?> messageWriter) {
Assert.notNull(messageWriter, "'messageWriter' must not be null");
this.messageWriters.add(messageWriter);
return this;
}
@Override
public WebClientStrategies.Builder encoder(Encoder<?> encoder) {
Assert.notNull(encoder, "'encoder' must not be null");
return messageWriter(new EncoderHttpMessageWriter<>(encoder));
}
@Override
public WebClientStrategies build() {
return new DefaultWebClientStrategies(this.messageReaders, this.messageWriters);
}
private static class DefaultWebClientStrategies implements WebClientStrategies {
private final List<HttpMessageReader<?>> messageReaders;
private final List<HttpMessageWriter<?>> messageWriters;
public DefaultWebClientStrategies(
List<HttpMessageReader<?>> messageReaders,
List<HttpMessageWriter<?>> messageWriters) {
this.messageReaders = unmodifiableCopy(messageReaders);
this.messageWriters = unmodifiableCopy(messageWriters);
}
private static <T> List<T> unmodifiableCopy(List<? extends T> list) {
return Collections.unmodifiableList(new ArrayList<>(list));
}
@Override
public Supplier<Stream<HttpMessageReader<?>>> messageReaders() {
return this.messageReaders::stream;
}
@Override
public Supplier<Stream<HttpMessageWriter<?>>> messageWriters() {
return this.messageWriters::stream;
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import reactor.core.publisher.Mono;
import org.springframework.util.Assert;
/**
* Represents a function that filters an {@linkplain ExchangeFunction exchange function}.
*
* @author Arjen Poutsma
* @since 5.0
*/
@FunctionalInterface
public interface ExchangeFilterFunction {
/**
* Apply this filter to the given request and exchange function. The given
* {@linkplain ExchangeFunction exchange function} represents the next entity in the
* chain, and can be {@linkplain ExchangeFunction#exchange(ClientRequest) invoked} in order
* to proceed to the exchange, or not invoked to block the chain.
*
* @param request the request
* @param next the next exchange function in the chain
* @return the filtered response
*/
Mono<ClientResponse> filter(ClientRequest<?> request, ExchangeFunction next);
/**
* Return a composed filter function that first applies this filter, and then applies the
* {@code after} filter.
* @param after the filter to apply after this filter is applied
* @return a composed filter that first applies this function and then applies the
* {@code after} function
*/
default ExchangeFilterFunction andThen(ExchangeFilterFunction after) {
Assert.notNull(after, "'after' must not be null");
return (request, next) -> {
ExchangeFunction nextExchange = exchangeRequest -> after.filter(exchangeRequest, next);
return filter(request, nextExchange);
};
}
/**
* Apply this filter to the given exchange function, resulting in a filtered exchange function.
* @param exchange the exchange function to filter
* @return the filtered exchange function
*/
default ExchangeFunction apply(ExchangeFunction exchange) {
Assert.notNull(exchange, "'exchange' must not be null");
return request -> this.filter(request, exchange);
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
/**
* Implementations of {@link ExchangeFilterFunction} that provide various useful request filter
* operations, such as basic authentication.
*
* @author Rob Winch
* @author Arjen Poutsma
* @since 5.0
*/
public abstract class ExchangeFilterFunctions {
private static final Base64.Encoder BASE_64_ENCODER = Base64.getEncoder();
/**
* Return a filter that adds an Authorization header for HTTP Basic.
* @param username the username to use
* @param password the password to use
* @return the {@link ExchangeFilterFunction} that adds the Authorization header
*/
public static ExchangeFilterFunction basicAuthentication(String username, String password) {
Assert.notNull(username, "'username' must not be null");
Assert.notNull(password, "'password' must not be null");
return new ExchangeFilterFunction() {
@Override
public Mono<ClientResponse> filter(ClientRequest<?> request, ExchangeFunction next) {
String authorization = authorization(username, password);
ClientRequest<?> authorizedRequest = ClientRequest.from(request)
.header(HttpHeaders.AUTHORIZATION, authorization)
.body(request.inserter());
return next.exchange(authorizedRequest);
}
private String authorization(String username, String password) {
String credentials = username + ":" + password;
return authorization(credentials);
}
private String authorization(String credentials) {
byte[] credentialBytes = credentials.getBytes(StandardCharsets.ISO_8859_1);
byte[] encodedBytes = BASE_64_ENCODER.encode(credentialBytes);
String encodedCredentials = new String(encodedBytes, StandardCharsets.ISO_8859_1);
return "Basic " + encodedCredentials;
}
};
}
}

View File

@ -16,23 +16,23 @@
package org.springframework.web.client.reactive;
import org.springframework.core.NestedRuntimeException;
import reactor.core.publisher.Mono;
/**
* Base class for exceptions thrown by {@link WebClient}.
* Represents a function that exchanges a {@linkplain ClientRequest request} for a (delayed)
* {@linkplain ClientResponse}.
*
* @author Brian Clozel
* @author Arjen Poutsma
* @since 5.0
*/
@SuppressWarnings("serial")
public class WebClientException extends NestedRuntimeException {
@FunctionalInterface
public interface ExchangeFunction {
public WebClientException(String msg) {
super(msg);
}
public WebClientException(String msg, Throwable cause) {
super(msg, cause);
}
/**
* Exchange the given request for a response mono.
* @param request the request to exchange
* @return the response, wrapped in a {@code Mono}
*/
Mono<ClientResponse> exchange(ClientRequest<?> request);
}

View File

@ -1,42 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.HttpMessageReader;
/**
* Strategy interface used by the {@link WebClient} to handle errors in
* {@link ClientHttpResponse}s if needed.
*
* @author Brian Clozel
* @see DefaultResponseErrorHandler
* @since 5.0
*/
public interface ResponseErrorHandler {
/**
* Handle the error in the given response.
* Implementations will typically inspect the
* {@link ClientHttpResponse#getStatusCode() HttpStatus} of the response and
* throw {@link WebClientException}s in case of errors.
*/
void handleError(ClientHttpResponse response, List<HttpMessageReader<?>> messageReaders);
}

View File

@ -1,43 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import reactor.core.publisher.Mono;
import org.springframework.http.client.reactive.ClientHttpResponse;
/**
* A {@code ResponseExtractor} extracts the relevant part of a
* raw {@link org.springframework.http.client.reactive.ClientHttpResponse},
* optionally decoding the response body and using a target composition API.
*
* <p>See static factory methods in {@link ResponseExtractors} and
* {@link org.springframework.web.client.reactive.support.RxJava1ResponseExtractors}.
*
* @author Brian Clozel
* @since 5.0
*/
public interface ResponseExtractor<T> {
/**
* Extract content from the response
* @param clientResponse the raw HTTP response
* @param webClientConfig the {@link WebClient} configuration information
* @return the relevant part of the response
*/
T extract(Mono<ClientHttpResponse> clientResponse, WebClientConfig webClientConfig);
}

View File

@ -1,207 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.Collections;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.HttpMessageReader;
/**
* Static factory methods for {@link ResponseExtractor} and {@link BodyExtractor},
* based on the {@link Flux} and {@link Mono} APIs.
*
* @author Brian Clozel
* @since 5.0
*/
public abstract class ResponseExtractors {
private static final Object EMPTY_BODY = new Object();
/**
* Extract the response body and decode it, returning it as a {@code Mono<T>}.
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
@SuppressWarnings("unchecked")
public static <T> ResponseExtractor<Mono<T>> body(ResolvableType bodyType) {
return (clientResponse, webClientConfig) -> (Mono<T>) clientResponse
.doOnNext(response -> webClientConfig.getResponseErrorHandler()
.handleError(response, webClientConfig.getMessageReaders()))
.flatMap(resp -> decodeResponseBodyAsMono(resp, bodyType,
webClientConfig.getMessageReaders()))
.next();
}
/**
* Extract the response body and decode it, returning it as a {@code Mono<T>}.
*/
public static <T> ResponseExtractor<Mono<T>> body(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return body(bodyType);
}
/**
* Extract the response body and decode it, returning it as a {@code Mono<T>}.
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> BodyExtractor<Mono<T>> as(ResolvableType bodyType) {
return (clientResponse, messageConverters) ->
decodeResponseBodyAsMono(clientResponse, bodyType, messageConverters);
}
/**
* Extract the response body and decode it, returning it as a {@code Mono<T>}
*/
public static <T> BodyExtractor<Mono<T>> as(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return as(bodyType);
}
/**
* Extract the response body and decode it, returning it as a {@code Flux<T>}.
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> ResponseExtractor<Flux<T>> bodyStream(ResolvableType bodyType) {
return (clientResponse, webClientConfig) -> clientResponse
.doOnNext(response -> webClientConfig.getResponseErrorHandler()
.handleError(response, webClientConfig.getMessageReaders()))
.flatMap(resp -> decodeResponseBody(resp, bodyType, webClientConfig.getMessageReaders()));
}
/**
* Extract the response body and decode it, returning it as a {@code Flux<T>}.
*/
public static <T> ResponseExtractor<Flux<T>> bodyStream(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return bodyStream(bodyType);
}
/**
* Extract the response body and decode it, returning it as a {@code Flux<T>}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
@SuppressWarnings("unchecked")
public static <T> BodyExtractor<Flux<T>> asStream(ResolvableType bodyType) {
return (clientResponse, messageConverters) ->
(Flux<T>) decodeResponseBody(clientResponse, bodyType, messageConverters);
}
/**
* Extract the response body and decode it, returning it as a {@code Flux<T>}
*/
public static <T> BodyExtractor<Flux<T>> asStream(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return asStream(bodyType);
}
/**
* Extract the full response body as a {@code ResponseEntity} with its body decoded as
* a single type {@code T}.
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
@SuppressWarnings("unchecked")
public static <T> ResponseExtractor<Mono<ResponseEntity<T>>> response(ResolvableType bodyType) {
return (clientResponse, webClientConfig) -> clientResponse.then(response ->
Mono.when(
decodeResponseBodyAsMono(response, bodyType,
webClientConfig.getMessageReaders()).defaultIfEmpty(EMPTY_BODY),
Mono.just(response.getHeaders()),
Mono.just(response.getStatusCode()))
).map(tuple -> {
Object body = (tuple.getT1() != EMPTY_BODY ? tuple.getT1() : null);
return new ResponseEntity<>((T) body, tuple.getT2(), tuple.getT3());
});
}
/**
* Extract the full response body as a {@code ResponseEntity} with its body decoded as
* a single type {@code T}.
*/
public static <T> ResponseExtractor<Mono<ResponseEntity<T>>> response(Class<T> bodyClass) {
ResolvableType bodyType = ResolvableType.forClass(bodyClass);
return response(bodyType);
}
/**
* Extract the full response body as a {@code ResponseEntity} with its body decoded as
* a {@code Flux<T>}.
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> ResponseExtractor<Mono<ResponseEntity<Flux<T>>>> responseStream(ResolvableType type) {
return (clientResponse, webClientConfig) -> clientResponse
.map(response -> new ResponseEntity<>(
// ResponseExtractors.<T> is required for Eclipse JDT.
ResponseExtractors.<T> decodeResponseBody(response, type, webClientConfig.getMessageReaders()),
response.getHeaders(), response.getStatusCode()));
}
/**
* Extract the full response body as a {@code ResponseEntity} with its body decoded as
* a {@code Flux<T>}.
*/
public static <T> ResponseExtractor<Mono<ResponseEntity<Flux<T>>>> responseStream(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return responseStream(resolvableType);
}
/**
* Extract the response headers as an {@code HttpHeaders} instance.
*/
public static ResponseExtractor<Mono<HttpHeaders>> headers() {
return (clientResponse, webClientConfig) -> clientResponse.map(resp -> resp.getHeaders());
}
@SuppressWarnings("unchecked")
protected static <T> Flux<T> decodeResponseBody(ClientHttpResponse response,
ResolvableType responseType, List<HttpMessageReader<?>> messageReaders) {
MediaType contentType = response.getHeaders().getContentType();
HttpMessageReader<?> reader = resolveMessageReader(messageReaders, responseType, contentType);
return (Flux<T>) reader.read(responseType, response, Collections.emptyMap());
}
@SuppressWarnings("unchecked")
protected static <T> Mono<T> decodeResponseBodyAsMono(ClientHttpResponse response,
ResolvableType responseType, List<HttpMessageReader<?>> messageReaders) {
MediaType contentType = response.getHeaders().getContentType();
HttpMessageReader<?> reader = resolveMessageReader(messageReaders, responseType, contentType);
return (Mono<T>) reader.readMono(responseType, response, Collections.emptyMap());
}
protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders,
ResolvableType responseType, MediaType contentType) {
return messageReaders.stream()
.filter(e -> e.canRead(responseType, contentType))
.findFirst()
.orElseThrow(() ->
new WebClientException(
"Could not decode response body of type '" + contentType
+ "' with target type '" + responseType.toString() + "'"));
}
}

View File

@ -16,347 +16,85 @@
package org.springframework.web.client.reactive;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.ResourceDecoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.http.HttpMessage;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.DecoderHttpMessageReader;
import org.springframework.http.codec.EncoderHttpMessageWriter;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.codec.ResourceHttpMessageWriter;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
import org.springframework.http.codec.xml.Jaxb2XmlEncoder;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* Reactive Web client supporting the HTTP/1.1 protocol
*
* <p>Here is a simple example of a GET request:
* Reactive Web client supporting the HTTP/1.1 protocol. Main entry point is throught the
* {@link #exchange(ClientRequest)} method.
*
* <p>For example:
* <pre class="code">
* static imports: ClientWebRequestBuilder.*, ResponseExtractors.*
*
* // should be shared between HTTP calls
* WebClient client = new WebClient(new ReactorClientHttpConnector());
* WebClient client = WebClient.create(new ReactorClientHttpConnector());
* ClientRequest&lt;Void&gt; request = ClientRequest.GET("http://example.com/resource").build();
*
* Mono&lt;String&gt; result = client
* .perform(get("http://example.org/resource").accept(MediaType.TEXT_PLAIN))
* .extract(body(String.class));
* .exchange(request)
* .then(response -> response.body(BodyExtractors.toMono(String.class)));
* </pre>
*
* <p>This Web client relies on the following:
* <ul>
* <li>{@link ClientHttpConnector} implementation to drive the underlying
* library (e.g. Reactor-Netty)</li>
* <li>{@link ClientWebRequestBuilder} to create a Web request with a builder
* API (see {@link ClientWebRequestBuilders})</li>
* <li>{@link ResponseExtractor} to extract the relevant part of the server
* response with the composition API of choice (see {@link ResponseExtractors}</li>
* </ul>
*
* @author Brian Clozel
* @author Arjen Poutsma
* @since 5.0
* @see ClientWebRequestBuilders
* @see ResponseExtractors
*/
public final class WebClient {
public interface WebClient {
private static final boolean jackson2Present =
ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", WebClient.class.getClassLoader()) &&
ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", WebClient.class.getClassLoader());
private static final boolean jaxb2Present =
ClassUtils.isPresent("javax.xml.bind.Binder", WebClient.class.getClassLoader());
private ClientHttpConnector clientHttpConnector;
private List<ClientHttpRequestInterceptor> interceptors;
private final DefaultWebClientConfig webClientConfig;
/**
* Exchange the given request for a response mono. Invoking this method performs the actual
* HTTP request/response exchange.
* @param request the request to exchange
* @return the response, wrapped in a {@code Mono}
*/
Mono<ClientResponse> exchange(ClientRequest<?> request);
/**
* Create a {@code WebClient} instance, using the {@link ClientHttpConnector}
* implementation given as an argument to drive the underlying
* implementation.
* Register by default the following Encoders and Decoders:
* <ul>
* <li>{@link ByteBufferEncoder} / {@link ByteBufferDecoder}</li>
* <li>{@link CharSequenceEncoder} / {@link StringDecoder}</li>
* <li>{@link Jaxb2XmlEncoder} / {@link Jaxb2XmlDecoder}</li>
* <li>{@link Jackson2JsonEncoder} / {@link Jackson2JsonDecoder}</li>
* </ul>
* @param clientHttpConnector the {@code ClientHttpRequestFactory} to use
* Create a new instance of {@code WebClient} with the given connector. This method uses
* {@linkplain WebClientStrategies#withDefaults() default strategies}.
* @param connector the connector to create connections
* @return the created client
*/
public WebClient(ClientHttpConnector clientHttpConnector) {
this.clientHttpConnector = clientHttpConnector;
this.webClientConfig = new DefaultWebClientConfig();
this.webClientConfig.setResponseErrorHandler(new DefaultResponseErrorHandler());
}
/**
* Add default HTTP message readers.
*/
protected final void addDefaultHttpMessageReaders(List<HttpMessageReader<?>> messageReaders) {
messageReaders.add(new DecoderHttpMessageReader<>(new ByteBufferDecoder()));
messageReaders.add(new DecoderHttpMessageReader<>(new StringDecoder(false)));
messageReaders.add(new DecoderHttpMessageReader<>(new ResourceDecoder()));
if (jaxb2Present) {
messageReaders.add(new DecoderHttpMessageReader<>(new Jaxb2XmlDecoder()));
}
if (jackson2Present) {
messageReaders.add(new DecoderHttpMessageReader<>(new Jackson2JsonDecoder()));
}
static WebClient create(ClientHttpConnector connector) {
return builder(connector).build();
}
/**
* Add default HTTP message writers.
* Return a builder for a {@code WebClient}.
* @param connector the connector to create connections
* @return a web client builder
*/
protected final void addDefaultHttpMessageWriters(List<HttpMessageWriter<?>> messageWriters) {
messageWriters.add(new EncoderHttpMessageWriter<>(new ByteBufferEncoder()));
messageWriters.add(new EncoderHttpMessageWriter<>(new CharSequenceEncoder()));
messageWriters.add(new ResourceHttpMessageWriter());
if (jaxb2Present) {
messageWriters.add(new EncoderHttpMessageWriter<>(new Jaxb2XmlEncoder()));
}
if (jackson2Present) {
messageWriters.add(new EncoderHttpMessageWriter<>(new Jackson2JsonEncoder()));
}
}
/**
* Set the list of {@link HttpMessageReader}s to use for decoding the HTTP
* response body.
*/
public void setMessageReaders(List<HttpMessageReader<?>> messageReaders) {
this.webClientConfig.setMessageReaders(messageReaders);
static Builder builder(ClientHttpConnector connector) {
Assert.notNull(connector, "'connector' must not be null");
return new DefaultWebClientBuilder(connector);
}
/**
* Set the list of {@link HttpMessageWriter}s to use for encoding the HTTP
* request body.
* A mutable builder for a {@link WebClient}.
*/
public void setMessageWriters(List<HttpMessageWriter<?>> messageWrters) {
this.webClientConfig.setMessageWriters(messageWrters);
interface Builder {
/**
* Replaces the default strategies with the ones provided by the given
* {@code WebClientStrategies}.
* @param strategies the strategies to use
* @return this builder
*/
Builder strategies(WebClientStrategies strategies);
/**
* Adds a filter function <strong>before</strong> the currently registered filters (if any).
* @param filter the filter to add
* @return this builder
*/
Builder filter(ExchangeFilterFunction filter);
/**
* Builds the {@code WebClient}.
* @return the built client
*/
WebClient build();
}
/**
* Set the {@link ResponseErrorHandler} to use for handling HTTP response errors
*/
public void setResponseErrorHandler(ResponseErrorHandler responseErrorHandler) {
this.webClientConfig.setResponseErrorHandler(responseErrorHandler);
}
/**
* Set the list of {@link ClientHttpRequestInterceptor} to use
* for intercepting client HTTP requests
*/
public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
this.interceptors = (interceptors != null ?
Collections.unmodifiableList(interceptors) : Collections.emptyList());
}
/**
* Perform the actual HTTP request/response exchange
* <p>Requesting from the exposed {@code Flux} will result in:
* <ul>
* <li>building the actual HTTP request using the provided {@code ClientWebRequestBuilder}</li>
* <li>encoding the HTTP request body with the configured {@code HttpMessageWriter}s</li>
* <li>returning the response with a publisher of the body</li>
* </ul>
*/
public WebResponseActions perform(ClientWebRequestBuilder builder) {
ClientWebRequest clientWebRequest = builder.build();
DefaultClientHttpRequestInterceptionChain interception =
new DefaultClientHttpRequestInterceptionChain(this.clientHttpConnector,
this.interceptors, clientWebRequest);
final Mono<ClientHttpResponse> clientResponse = interception
.intercept(clientWebRequest.getMethod(), clientWebRequest.getUrl(), null)
.log("org.springframework.web.client.reactive", Level.FINE);
return new WebResponseActions() {
@Override
public void doWithStatus(Consumer<HttpStatus> consumer) {
clientResponse.doOnNext(clientHttpResponse -> consumer.accept(clientHttpResponse.getStatusCode()));
}
@Override
public <T> T extract(ResponseExtractor<T> extractor) {
return extractor.extract(clientResponse, webClientConfig);
}
};
}
protected class DefaultWebClientConfig implements WebClientConfig {
private List<HttpMessageReader<?>> messageReaders;
private List<HttpMessageWriter<?>> messageWriters;
private ResponseErrorHandler responseErrorHandler;
public DefaultWebClientConfig() {
this.messageReaders = new ArrayList<>();
addDefaultHttpMessageReaders(this.messageReaders);
this.messageWriters = new ArrayList<>();
addDefaultHttpMessageWriters(this.messageWriters);
}
@Override
public List<HttpMessageReader<?>> getMessageReaders() {
return this.messageReaders;
}
public void setMessageReaders(List<HttpMessageReader<?>> messageReaders) {
this.messageReaders = messageReaders;
}
@Override
public List<HttpMessageWriter<?>> getMessageWriters() {
return this.messageWriters;
}
public void setMessageWriters(List<HttpMessageWriter<?>> messageWriters) {
this.messageWriters = messageWriters;
}
@Override
public ResponseErrorHandler getResponseErrorHandler() {
return responseErrorHandler;
}
public void setResponseErrorHandler(ResponseErrorHandler responseErrorHandler) {
this.responseErrorHandler = responseErrorHandler;
}
}
protected class DefaultRequestCallback implements Function<ClientHttpRequest, Mono<Void>> {
private final ClientWebRequest clientWebRequest;
private final List<Consumer<? super HttpMessage>> requestCustomizers;
public DefaultRequestCallback(ClientWebRequest clientWebRequest,
List<Consumer<? super HttpMessage>> requestCustomizers) {
this.clientWebRequest = clientWebRequest;
this.requestCustomizers = requestCustomizers;
}
@Override
public Mono<Void> apply(ClientHttpRequest clientHttpRequest) {
clientHttpRequest.getHeaders().putAll(this.clientWebRequest.getHttpHeaders());
if (clientHttpRequest.getHeaders().getAccept().isEmpty()) {
clientHttpRequest.getHeaders().setAccept(
Collections.singletonList(MediaType.ALL));
}
this.clientWebRequest.getCookies().values()
.stream().flatMap(cookies -> cookies.stream())
.forEach(cookie -> clientHttpRequest.getCookies().add(cookie.getName(), cookie));
this.requestCustomizers.forEach(customizer -> customizer.accept(clientHttpRequest));
if (this.clientWebRequest.getBody() != null) {
return writeRequestBody(this.clientWebRequest.getBody(),
this.clientWebRequest.getElementType(),
clientHttpRequest, WebClient.this.webClientConfig.getMessageWriters());
}
else {
return clientHttpRequest.setComplete();
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected Mono<Void> writeRequestBody(Publisher<?> content, ResolvableType requestType,
ClientHttpRequest request, List<HttpMessageWriter<?>> messageWriters) {
MediaType contentType = request.getHeaders().getContentType();
Optional<HttpMessageWriter<?>> messageWriter = resolveWriter(messageWriters, requestType, contentType);
if (!messageWriter.isPresent()) {
return Mono.error(new IllegalStateException(
"Could not encode request body of type '" + contentType
+ "' with target type '" + requestType.toString() + "'"));
}
return messageWriter.get().write((Publisher) content, requestType, contentType, request, Collections.emptyMap());
}
protected Optional<HttpMessageWriter<?>> resolveWriter(List<HttpMessageWriter<?>> messageWriters,
ResolvableType type, MediaType mediaType) {
return messageWriters.stream().filter(e -> e.canWrite(type, mediaType)).findFirst();
}
}
protected class DefaultClientHttpRequestInterceptionChain implements ClientHttpRequestInterceptionChain {
private final ClientHttpConnector connector;
private final List<ClientHttpRequestInterceptor> interceptors;
private final ClientWebRequest clientWebRequest;
private final List<Consumer<? super HttpMessage>> requestCustomizers;
private int index;
public DefaultClientHttpRequestInterceptionChain(ClientHttpConnector connector,
List<ClientHttpRequestInterceptor> interceptors, ClientWebRequest clientWebRequest) {
Assert.notNull(connector, "ClientHttpConnector should not be null");
this.connector = connector;
this.interceptors = interceptors;
this.clientWebRequest = clientWebRequest;
this.requestCustomizers = new ArrayList<>();
this.index = 0;
}
@Override
public Mono<ClientHttpResponse> intercept(HttpMethod method, URI uri,
Consumer<? super HttpMessage> requestCustomizer) {
if (requestCustomizer != null) {
this.requestCustomizers.add(requestCustomizer);
}
if (this.interceptors != null && this.index < this.interceptors.size()) {
ClientHttpRequestInterceptor interceptor = this.interceptors.get(this.index++);
return interceptor.intercept(method, uri, this);
}
else {
return this.connector.connect(method, uri,
new DefaultRequestCallback(this.clientWebRequest, this.requestCustomizers));
}
}
}
}

View File

@ -1,47 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.HttpMessageWriter;
/**
* Interface that makes the {@link WebClient} configuration information
* available to downstream infrastructure such as {@link ResponseErrorHandler}s.
*
* @author Brian Clozel
* @since 5.0
*/
public interface WebClientConfig {
/**
* Return the message readers that can help decoding the HTTP response body
*/
List<HttpMessageReader<?>> getMessageReaders();
/**
* Return the message writers that can help encode the HTTP request body
*/
List<HttpMessageWriter<?>> getMessageWriters();
/**
* Return the configured {@link ResponseErrorHandler}
*/
ResponseErrorHandler getResponseErrorHandler();
}

View File

@ -1,50 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.HttpMessageReader;
/**
* Exception thrown when an HTTP 4xx is received.
*
* @author Brian Clozel
* @since 5.0
*/
@SuppressWarnings("serial")
public class WebClientErrorException extends WebClientResponseException {
/**
* Construct a new instance of {@code HttpClientErrorException} based on a
* {@link ClientHttpResponse} and {@link HttpMessageReader}s to optionally
* help decoding the response body
*
* @param response the HTTP response
* @param messageReaders the message converters that may decode the HTTP response body
*/
public WebClientErrorException(ClientHttpResponse response, List<HttpMessageReader<?>> messageReaders) {
super(initMessage(response), response, messageReaders);
}
private static String initMessage(ClientHttpResponse response) {
return response.getStatusCode().value() + " " + response.getStatusCode().getReasonPhrase();
}
}

View File

@ -1,81 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.HttpMessageReader;
/**
* Base class for exceptions associated with specific HTTP client response
* status codes.
*
* @author Brian Clozel
* @since 5.0
*/
@SuppressWarnings("serial")
public class WebClientResponseException extends WebClientException {
private final ClientHttpResponse clientResponse;
private final List<HttpMessageReader<?>> messageReaders;
/**
* Construct a new instance of {@code WebClientResponseException} with the given response data
* @param message the given error message
* @param clientResponse the HTTP response
* @param messageReaders the message converters that maay decode the HTTP response body
*/
public WebClientResponseException(String message, ClientHttpResponse clientResponse,
List<HttpMessageReader<?>> messageReaders) {
super(message);
this.clientResponse = clientResponse;
this.messageReaders = messageReaders;
}
/**
* Return the HTTP status
*/
public HttpStatus getStatus() {
return this.clientResponse.getStatusCode();
}
/**
* Return the HTTP response headers
*/
public HttpHeaders getResponseHeaders() {
return this.clientResponse.getHeaders();
}
/**
* Perform an extraction of the response body into a higher level representation.
*
* <pre class="code">
* static imports: ResponseExtractors.*
*
* String responseBody = clientResponse.getResponseBody(as(String.class));
* </pre>
*/
public <T> T getResponseBody(BodyExtractor<T> extractor) {
return extractor.extract(this.clientResponse, this.messageReaders);
}
}

View File

@ -0,0 +1,189 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.springframework.context.ApplicationContext;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.util.Assert;
/**
* Defines the strategies to be used by the {@link WebClient}. An instance of
* this class is immutable; instances are typically created through the mutable {@link Builder}:
* either through {@link #builder()} to set up default strategies, or {@link #empty()} to start from
* scratch. Alternatively, {@code WebClientStrategies} instances can be created through
* {@link #of(Supplier, Supplier)}.
*
* @author Brian Clozel
* @author Arjen Poutsma
* @since 5.0
*/
public interface WebClientStrategies {
// Instance methods
/**
* Supply a {@linkplain Stream stream} of {@link HttpMessageReader}s to be used for request
* body conversion.
* @return the stream of message readers
*/
Supplier<Stream<HttpMessageReader<?>>> messageReaders();
/**
* Supply a {@linkplain Stream stream} of {@link HttpMessageWriter}s to be used for response
* body conversion.
* @return the stream of message writers
*/
Supplier<Stream<HttpMessageWriter<?>>> messageWriters();
// Static methods
/**
* Return a new {@code WebClientStrategies} with default initialization.
* @return the new {@code WebClientStrategies}
*/
static WebClientStrategies withDefaults() {
return builder().build();
}
/**
* Return a new {@code WebClientStrategies} based on the given
* {@linkplain ApplicationContext application context}.
* The returned supplier will search for all {@link HttpMessageReader}, and
* {@link HttpMessageWriter} instances in the given application context and return them for
* {@link #messageReaders()}, and {@link #messageWriters()} respectively.
* @param applicationContext the application context to base the strategies on
* @return the new {@code WebClientStrategies}
*/
static WebClientStrategies of(ApplicationContext applicationContext) {
return builder(applicationContext).build();
}
/**
* Return a new {@code WebClientStrategies} described by the given supplier functions.
* All provided supplier function parameters can be {@code null} to indicate an empty
* stream is to be returned.
* @param messageReaders the supplier function for {@link HttpMessageReader} instances (can be {@code null})
* @param messageWriters the supplier function for {@link HttpMessageWriter} instances (can be {@code null})
* @return the new {@code WebClientStrategies}
*/
static WebClientStrategies of(Supplier<Stream<HttpMessageReader<?>>> messageReaders,
Supplier<Stream<HttpMessageWriter<?>>> messageWriters) {
return new WebClientStrategies() {
@Override
public Supplier<Stream<HttpMessageReader<?>>> messageReaders() {
return checkForNull(messageReaders);
}
@Override
public Supplier<Stream<HttpMessageWriter<?>>> messageWriters() {
return checkForNull(messageWriters);
}
private <T> Supplier<Stream<T>> checkForNull(Supplier<Stream<T>> supplier) {
return supplier != null ? supplier : Stream::empty;
}
};
}
// Builder methods
/**
* Return a mutable builder for a {@code WebClientStrategies} with default initialization.
* @return the builder
*/
static Builder builder() {
DefaultWebClientStrategiesBuilder builder = new DefaultWebClientStrategiesBuilder();
builder.defaultConfiguration();
return builder;
}
/**
* Return a mutable builder based on the given {@linkplain ApplicationContext application context}.
* The returned builder will search for all {@link HttpMessageReader}, and
* {@link HttpMessageWriter} instances in the given application context and return them for
* {@link #messageReaders()}, and {@link #messageWriters()}.
* @param applicationContext the application context to base the strategies on
* @return the builder
*/
static Builder builder(ApplicationContext applicationContext) {
Assert.notNull(applicationContext, "ApplicationContext must not be null");
DefaultWebClientStrategiesBuilder builder = new DefaultWebClientStrategiesBuilder();
builder.applicationContext(applicationContext);
return builder;
}
/**
* Return a mutable, empty builder for a {@code WebClientStrategies}.
* @return the builder
*/
static Builder empty() {
return new DefaultWebClientStrategiesBuilder();
}
/**
* A mutable builder for a {@link WebClientStrategies}.
*/
interface Builder {
/**
* Add the given message reader to this builder.
* @param messageReader the message reader to add
* @return this builder
*/
Builder messageReader(HttpMessageReader<?> messageReader);
/**
* Add the given decoder to this builder. This is a convenient alternative to adding a
* {@link org.springframework.http.codec.DecoderHttpMessageReader} that wraps the given
* decoder.
* @param decoder the decoder to add
* @return this builder
*/
Builder decoder(Decoder<?> decoder);
/**
* Add the given message writer to this builder.
* @param messageWriter the message writer to add
* @return this builder
*/
Builder messageWriter(HttpMessageWriter<?> messageWriter);
/**
* Add the given encoder to this builder. This is a convenient alternative to adding a
* {@link org.springframework.http.codec.EncoderHttpMessageWriter} that wraps the given
* encoder.
* @param encoder the encoder to add
* @return this builder
*/
Builder encoder(Encoder<?> encoder);
/**
* Builds the {@link WebClientStrategies}.
* @return the built strategies
*/
WebClientStrategies build();
}
}

View File

@ -1,51 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.function.Consumer;
import org.springframework.http.HttpStatus;
/**
* Allows applying actions, such as extractors, on the result of an executed
* {@link WebClient} request.
*
* @author Brian Clozel
* @since 5.0
*/
public interface WebResponseActions {
/**
* Apply synchronous operations once the HTTP response status
* has been received.
*/
void doWithStatus(Consumer<HttpStatus> consumer);
/**
* Perform an extraction of the response body into a higher level representation.
*
* <pre class="code">
* static imports: ClientWebRequestBuilder.*, ResponseExtractors.*
*
* webClient
* .perform(get(url).accept(MediaType.TEXT_PLAIN))
* .extract(body(String.class));
* </pre>
*/
<T> T extract(ResponseExtractor<T> extractor);
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.HttpMessageReader;
/**
* Exception thrown when an HTTP 5xx is received.
*
* @author Brian Clozel
* @since 5.0
*/
@SuppressWarnings("serial")
public class WebServerErrorException extends WebClientResponseException {
/**
* Construct a new instance of {@code HttpServerErrorException} based on a
* {@link ClientHttpResponse} and {@link HttpMessageReader}s to optionally
* help decoding the response body
* @param response the HTTP response
* @param messageReaders the message converters that may decode the HTTP response body
*/
public WebServerErrorException(ClientHttpResponse response, List<HttpMessageReader<?>> messageReaders) {
super(initMessage(response), response, messageReaders);
}
private static String initMessage(ClientHttpResponse response) {
return response.getStatusCode().value() + " " + response.getStatusCode().getReasonPhrase();
}
}

View File

@ -1,6 +1,6 @@
/**
* Provides a reactive {@link org.springframework.web.client.reactive.WebClient}
* that builds on top of the
* {@code org.springframework.http.client.reactive} reactive HTTP adapter} layer.
* {@code org.springframework.http.client.reactive} reactive HTTP adapter layer.
*/
package org.springframework.web.client.reactive;

View File

@ -16,6 +16,8 @@
package org.springframework.http.server.reactive;
import java.time.Duration;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
@ -24,13 +26,11 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.BodyExtractors;
import org.springframework.tests.TestSubscriber;
import org.springframework.web.client.reactive.ClientWebRequestBuilders;
import org.springframework.web.client.reactive.ResponseExtractors;
import org.springframework.web.client.reactive.ClientRequest;
import org.springframework.web.client.reactive.WebClient;
import java.time.Duration;
/**
* @author Sebastien Deleuze
*/
@ -41,17 +41,19 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
@Before
public void setup() throws Exception {
super.setup();
this.webClient = new WebClient(new ReactorClientHttpConnector());
this.webClient = WebClient.create(new ReactorClientHttpConnector());
}
@Test
public void testFlushing() throws Exception {
ClientRequest<Void> request = ClientRequest.GET("http://localhost:" + port).build();
Mono<String> result = this.webClient
.perform(ClientWebRequestBuilders.get("http://localhost:" + port))
.extract(ResponseExtractors.bodyStream(String.class))
.takeUntil(s -> {
return s.endsWith("data1");
})
.exchange(request)
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
.takeUntil(s -> s.endsWith("data1"))
.reduce((s1, s2) -> s1 + s2);
TestSubscriber

View File

@ -1,165 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.springframework.web.client.reactive.ClientWebRequestBuilders.*;
import static org.springframework.web.client.reactive.ResponseExtractors.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.tests.TestSubscriber;
import org.springframework.web.client.reactive.test.MockClientHttpRequest;
import org.springframework.web.client.reactive.test.MockClientHttpResponse;
/**
* @author Brian Clozel
*/
public class ClientHttpRequestInterceptorTests {
private MockClientHttpRequest mockRequest;
private MockClientHttpResponse mockResponse;
private MockClientHttpConnector mockClientHttpConnector;
private WebClient webClient;
@Before
public void setUp() throws Exception {
this.mockClientHttpConnector = new MockClientHttpConnector();
this.webClient = new WebClient(this.mockClientHttpConnector);
this.mockResponse = new MockClientHttpResponse();
this.mockResponse.setStatus(HttpStatus.OK);
this.mockResponse.getHeaders().setContentType(MediaType.TEXT_PLAIN);
this.mockResponse.setBody("Spring Framework");
}
@Test
public void shouldExecuteInterceptors() throws Exception {
List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();
interceptors.add(new NoOpInterceptor());
interceptors.add(new NoOpInterceptor());
interceptors.add(new NoOpInterceptor());
this.webClient.setInterceptors(interceptors);
Mono<String> result = this.webClient.perform(get("http://example.org/resource"))
.extract(body(String.class));
TestSubscriber.subscribe(result)
.assertNoError()
.assertValues("Spring Framework")
.assertComplete();
interceptors.stream().forEach(interceptor -> {
Assert.assertTrue(((NoOpInterceptor) interceptor).invoked);
});
}
@Test
public void shouldChangeRequest() throws Exception {
ClientHttpRequestInterceptor interceptor = new ClientHttpRequestInterceptor() {
@Override
public Mono<ClientHttpResponse> intercept(HttpMethod method, URI uri,
ClientHttpRequestInterceptionChain interception) {
return interception.intercept(HttpMethod.POST, URI.create("http://example.org/other"),
(request) -> {
request.getHeaders().set("X-Custom", "Spring Framework");
});
}
};
this.webClient.setInterceptors(Collections.singletonList(interceptor));
Mono<String> result = this.webClient.perform(get("http://example.org/resource"))
.extract(body(String.class));
TestSubscriber.subscribe(result)
.assertNoError()
.assertValues("Spring Framework")
.assertComplete();
assertThat(this.mockRequest.getMethod(), is(HttpMethod.POST));
assertThat(this.mockRequest.getURI().toString(), is("http://example.org/other"));
assertThat(this.mockRequest.getHeaders().getFirst("X-Custom"), is("Spring Framework"));
}
@Test
public void shouldShortCircuitConnector() throws Exception {
MockClientHttpResponse otherResponse = new MockClientHttpResponse();
otherResponse.setStatus(HttpStatus.OK);
otherResponse.setBody("Other content");
List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();
interceptors.add((method, uri, interception) -> Mono.just(otherResponse));
interceptors.add(new NoOpInterceptor());
this.webClient.setInterceptors(interceptors);
Mono<String> result = this.webClient.perform(get("http://example.org/resource"))
.extract(body(String.class));
TestSubscriber.subscribe(result)
.assertNoError()
.assertValues("Other content")
.assertComplete();
assertFalse(((NoOpInterceptor) interceptors.get(1)).invoked);
}
private class MockClientHttpConnector implements ClientHttpConnector {
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
mockRequest = new MockClientHttpRequest(method, uri);
return requestCallback.apply(mockRequest).then(Mono.just(mockResponse));
}
}
private static class NoOpInterceptor implements ClientHttpRequestInterceptor {
public boolean invoked = false;
@Override
public Mono<ClientHttpResponse> intercept(HttpMethod method, URI uri,
ClientHttpRequestInterceptionChain interception) {
this.invoked = true;
return interception.intercept(method, uri, (request) -> { });
}
}
}

View File

@ -1,71 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.nio.charset.Charset;
import java.util.Base64;
import java.util.Base64.Encoder;
import org.junit.Test;
import org.springframework.http.HttpHeaders;
import static org.junit.Assert.*;
import static org.springframework.web.client.reactive.ClientWebRequestPostProcessors.*;
import static org.springframework.web.client.reactive.ClientWebRequestBuilders.*;
/**
*
* @author Rob Winch
* @since 5.0
*/
public class ClientWebRequestPostProcessorsTests {
@Test
public void httpBasicWhenUsernamePasswordThenHeaderSet() {
ClientWebRequest request = get("/").apply(httpBasic("user", "password")).build();
assertEquals(request.getHttpHeaders().getFirst(HttpHeaders.AUTHORIZATION), basic("user:password"));
}
@Test
public void httpBasicWhenUsernameEmptyThenHeaderSet() {
ClientWebRequest request = get("/").apply(httpBasic("", "password")).build();
assertEquals(request.getHttpHeaders().getFirst(HttpHeaders.AUTHORIZATION), basic(":password"));
}
@Test
public void httpBasicWhenPasswordEmptyThenHeaderSet() {
ClientWebRequest request = get("/").apply(httpBasic("user", "")).build();
assertEquals(request.getHttpHeaders().getFirst(HttpHeaders.AUTHORIZATION), basic("user:"));
}
@Test(expected = IllegalArgumentException.class)
public void httpBasicWhenUsernameNullThenIllegalArgumentException() {
httpBasic(null, "password");
}
@Test(expected = IllegalArgumentException.class)
public void httpBasicWhenPasswordNullThenIllegalArgumentException() {
httpBasic("username", null);
}
private static String basic(String string) {
Encoder encoder = Base64.getEncoder();
byte[] bytes = string.getBytes(Charset.defaultCharset());
return "Basic " + encoder.encodeToString(bytes);
}
}

View File

@ -0,0 +1,215 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.junit.Test;
import reactor.core.publisher.Mono;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.codec.BodyInserter;
import org.springframework.http.codec.EncoderHttpMessageWriter;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.web.client.reactive.test.MockClientHttpRequest;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author Arjen Poutsma
*/
public class DefaultClientRequestBuilderTests {
@Test
public void from() throws Exception {
ClientRequest<Void> other = ClientRequest.GET("http://example.com")
.header("foo", "bar")
.cookie("baz", "qux").build();
ClientRequest<Void> result = ClientRequest.from(other).build();
assertEquals(new URI("http://example.com"), result.url());
assertEquals(HttpMethod.GET, result.method());
assertEquals("bar", result.headers().getFirst("foo"));
assertEquals("qux", result.cookies().getFirst("baz"));
}
@Test
public void method() throws Exception {
URI url = new URI("http://example.com");
ClientRequest<Void> result = ClientRequest.method(HttpMethod.DELETE, url).build();
assertEquals(url, result.url());
assertEquals(HttpMethod.DELETE, result.method());
}
@Test
public void GET() throws Exception {
URI url = new URI("http://example.com");
ClientRequest<Void> result = ClientRequest.GET(url.toString()).build();
assertEquals(url, result.url());
assertEquals(HttpMethod.GET, result.method());
}
@Test
public void HEAD() throws Exception {
URI url = new URI("http://example.com");
ClientRequest<Void> result = ClientRequest.HEAD(url.toString()).build();
assertEquals(url, result.url());
assertEquals(HttpMethod.HEAD, result.method());
}
@Test
public void POST() throws Exception {
URI url = new URI("http://example.com");
ClientRequest<Void> result = ClientRequest.POST(url.toString()).build();
assertEquals(url, result.url());
assertEquals(HttpMethod.POST, result.method());
}
@Test
public void PUT() throws Exception {
URI url = new URI("http://example.com");
ClientRequest<Void> result = ClientRequest.PUT(url.toString()).build();
assertEquals(url, result.url());
assertEquals(HttpMethod.PUT, result.method());
}
@Test
public void PATCH() throws Exception {
URI url = new URI("http://example.com");
ClientRequest<Void> result = ClientRequest.PATCH(url.toString()).build();
assertEquals(url, result.url());
assertEquals(HttpMethod.PATCH, result.method());
}
@Test
public void DELETE() throws Exception {
URI url = new URI("http://example.com");
ClientRequest<Void> result = ClientRequest.DELETE(url.toString()).build();
assertEquals(url, result.url());
assertEquals(HttpMethod.DELETE, result.method());
}
@Test
public void OPTIONS() throws Exception {
URI url = new URI("http://example.com");
ClientRequest<Void> result = ClientRequest.OPTIONS(url.toString()).build();
assertEquals(url, result.url());
assertEquals(HttpMethod.OPTIONS, result.method());
}
@Test
public void accept() throws Exception {
MediaType json = MediaType.APPLICATION_JSON;
ClientRequest<Void> result = ClientRequest.GET("http://example.com").accept(json).build();
assertEquals(Collections.singletonList(json), result.headers().getAccept());
}
@Test
public void acceptCharset() throws Exception {
Charset charset = Charset.defaultCharset();
ClientRequest<Void> result = ClientRequest.GET("http://example.com")
.acceptCharset(charset).build();
assertEquals(Collections.singletonList(charset), result.headers().getAcceptCharset());
}
@Test
public void ifModifiedSince() throws Exception {
ZonedDateTime now = ZonedDateTime.now();
ClientRequest<Void> result = ClientRequest.GET("http://example.com")
.ifModifiedSince(now).build();
assertEquals(now.toInstant().toEpochMilli()/1000, result.headers().getIfModifiedSince()/1000);
}
@Test
public void ifNoneMatch() throws Exception {
ClientRequest<Void> result = ClientRequest.GET("http://example.com")
.ifNoneMatch("\"v2.7\"", "\"v2.8\"").build();
assertEquals(Arrays.asList("\"v2.7\"", "\"v2.8\""), result.headers().getIfNoneMatch());
}
@Test
public void cookie() throws Exception {
ClientRequest<Void> result = ClientRequest.GET("http://example.com")
.cookie("foo", "bar").build();
assertEquals("bar", result.cookies().getFirst("foo"));
}
@Test
public void build() throws Exception {
ClientRequest<Void> result = ClientRequest.GET("http://example.com")
.header("MyKey", "MyValue")
.cookie("foo", "bar")
.build();
MockClientHttpRequest request = new MockClientHttpRequest();
WebClientStrategies strategies = mock(WebClientStrategies.class);
result.writeTo(request, strategies).block();
assertEquals("MyValue", request.getHeaders().getFirst("MyKey"));
assertEquals("bar", request.getCookies().getFirst("foo").getValue());
assertNull(request.getBody());
}
@Test
public void bodyInserter() throws Exception {
String body = "foo";
Supplier<String> supplier = () -> body;
BiFunction<ClientHttpRequest, BodyInserter.Context, Mono<Void>> writer =
(response, strategies) -> {
byte[] bodyBytes = body.getBytes(UTF_8);
ByteBuffer byteBuffer = ByteBuffer.wrap(bodyBytes);
DataBuffer buffer = new DefaultDataBufferFactory().wrap(byteBuffer);
return response.writeWith(Mono.just(buffer));
};
ClientRequest<String> result = ClientRequest.POST("http://example.com")
.body(BodyInserter.of(writer, supplier));
assertEquals(body, result.body());
MockClientHttpRequest request = new MockClientHttpRequest();
List<HttpMessageWriter<?>> messageWriters = new ArrayList<>();
messageWriters.add(new EncoderHttpMessageWriter<CharSequence>(new CharSequenceEncoder()));
WebClientStrategies strategies = mock(WebClientStrategies.class);
when(strategies.messageWriters()).thenReturn(messageWriters::stream);
result.writeTo(request, strategies).block();
assertNotNull(request.getBody());
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRange;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.DecoderHttpMessageReader;
import org.springframework.http.codec.HttpMessageReader;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.springframework.http.codec.BodyExtractors.toMono;
/**
* @author Arjen Poutsma
*/
public class DefaultClientResponseTests {
private ClientHttpResponse mockResponse;
private WebClientStrategies mockWebClientStrategies;
private DefaultClientResponse defaultClientResponse;
@Before
public void createMocks() {
mockResponse = mock(ClientHttpResponse.class);
mockWebClientStrategies = mock(WebClientStrategies.class);
defaultClientResponse = new DefaultClientResponse(mockResponse, mockWebClientStrategies);
}
@Test
public void statusCode() throws Exception {
HttpStatus status = HttpStatus.CONTINUE;
when(mockResponse.getStatusCode()).thenReturn(status);
assertEquals(status, defaultClientResponse.statusCode());
}
@Test
public void header() throws Exception {
HttpHeaders httpHeaders = new HttpHeaders();
long contentLength = 42L;
httpHeaders.setContentLength(contentLength);
MediaType contentType = MediaType.TEXT_PLAIN;
httpHeaders.setContentType(contentType);
InetSocketAddress host = InetSocketAddress.createUnresolved("localhost", 80);
httpHeaders.setHost(host);
List<HttpRange> range = Collections.singletonList(HttpRange.createByteRange(0, 42));
httpHeaders.setRange(range);
when(mockResponse.getHeaders()).thenReturn(httpHeaders);
ClientResponse.Headers headers = defaultClientResponse.headers();
assertEquals(OptionalLong.of(contentLength), headers.contentLength());
assertEquals(Optional.of(contentType), headers.contentType());
assertEquals(httpHeaders, headers.asHttpHeaders());
}
@Test
public void body() throws Exception {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DefaultDataBuffer dataBuffer =
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
Flux<DataBuffer> body = Flux.just(dataBuffer);
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.TEXT_PLAIN);
when(mockResponse.getHeaders()).thenReturn(httpHeaders);
when(mockResponse.getBody()).thenReturn(body);
Set<HttpMessageReader<?>> messageReaders = Collections
.singleton(new DecoderHttpMessageReader<String>(new StringDecoder()));
when(mockWebClientStrategies.messageReaders()).thenReturn(messageReaders::stream);
Mono<String> resultMono = defaultClientResponse.body(toMono(String.class));
assertEquals("foo", resultMono.block());
}
}

View File

@ -1,99 +0,0 @@
package org.springframework.web.client.reactive;
import java.util.Collections;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.DecoderHttpMessageReader;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.tests.TestSubscriber;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.springframework.web.client.reactive.ResponseExtractors.as;
/**
* Unit tests for {@link DefaultResponseErrorHandler}.
*
* @author Brian Clozel
*/
public class DefaultResponseErrorHandlerTests {
private DefaultResponseErrorHandler errorHandler;
private ClientHttpResponse response;
private List<HttpMessageReader<?>> messageReaders;
@Before
public void setUp() throws Exception {
this.errorHandler = new DefaultResponseErrorHandler();
this.response = mock(ClientHttpResponse.class);
this.messageReaders = Collections
.singletonList(new DecoderHttpMessageReader<>(new StringDecoder()));
}
@Test
public void noError() throws Exception {
given(this.response.getStatusCode()).willReturn(HttpStatus.OK);
this.errorHandler.handleError(this.response, this.messageReaders);
}
@Test
public void clientError() throws Exception {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.TEXT_PLAIN);
DataBuffer buffer = new DefaultDataBufferFactory().allocateBuffer();
buffer.write(new String("Page Not Found").getBytes("UTF-8"));
given(this.response.getStatusCode()).willReturn(HttpStatus.NOT_FOUND);
given(this.response.getHeaders()).willReturn(headers);
given(this.response.getBody()).willReturn(Flux.just(buffer));
try {
this.errorHandler.handleError(this.response, this.messageReaders);
fail("expected HttpClientErrorException");
}
catch (WebClientErrorException exc) {
assertThat(exc.getMessage(), is("404 Not Found"));
assertThat(exc.getStatus(), is(HttpStatus.NOT_FOUND));
TestSubscriber.subscribe(exc.getResponseBody(as(String.class)))
.awaitAndAssertNextValues("Page Not Found")
.assertComplete();
}
}
@Test
public void serverError() throws Exception {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.TEXT_PLAIN);
DataBuffer buffer = new DefaultDataBufferFactory().allocateBuffer();
buffer.write(new String("Internal Server Error").getBytes("UTF-8"));
given(this.response.getStatusCode()).willReturn(HttpStatus.INTERNAL_SERVER_ERROR);
given(this.response.getHeaders()).willReturn(headers);
given(this.response.getBody()).willReturn(Flux.just(buffer));
try {
this.errorHandler.handleError(this.response, this.messageReaders);
fail("expected HttpServerErrorException");
}
catch (WebServerErrorException exc) {
assertThat(exc.getMessage(), is("500 Internal Server Error"));
assertThat(exc.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR));
TestSubscriber.subscribe(exc.getResponseBody(as(String.class)))
.awaitAndAssertNextValues("Internal Server Error")
.assertComplete();
}
}
}

View File

@ -1,60 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.http.HttpMethod;
/**
*
* @author Rob Winch
*/
public class DefaultWebRequestBuilderTests {
private DefaultClientWebRequestBuilder builder;
@Before
public void setup() {
builder = new DefaultClientWebRequestBuilder(HttpMethod.GET, "https://example.com/foo");
}
@Test
public void apply() {
ClientWebRequestPostProcessor postProcessor = mock(ClientWebRequestPostProcessor.class);
when(postProcessor.postProcess(any(ClientWebRequest.class))).thenAnswer(new Answer<ClientWebRequest>() {
@Override
public ClientWebRequest answer(InvocationOnMock invocation) throws Throwable {
return (ClientWebRequest) invocation.getArguments()[0];
}
});
ClientWebRequest webRequest = builder.apply(postProcessor).build();
verify(postProcessor).postProcess(webRequest);
}
@Test(expected = IllegalArgumentException.class)
public void applyNullPostProcessorThrowsIllegalArgumentException() {
builder.apply(null);
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import org.junit.Test;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpHeaders;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
* @author Arjen Poutsma
*/
public class ExchangeFilterFunctionsTests {
@Test
public void andThen() throws Exception {
ClientRequest<Void> request = ClientRequest.GET("http://example.com").build();
ClientResponse response = mock(ClientResponse.class);
ExchangeFunction exchange = r -> Mono.just(response);
boolean[] filtersInvoked = new boolean[2];
ExchangeFilterFunction filter1 = (r, n) -> {
assertFalse(filtersInvoked[0]);
assertFalse(filtersInvoked[1]);
filtersInvoked[0] = true;
assertFalse(filtersInvoked[1]);
return n.exchange(r);
};
ExchangeFilterFunction filter2 = (r, n) -> {
assertTrue(filtersInvoked[0]);
assertFalse(filtersInvoked[1]);
filtersInvoked[1] = true;
return n.exchange(r);
};
ExchangeFilterFunction filter = filter1.andThen(filter2);
ClientResponse result = filter.filter(request, exchange).block();
assertEquals(response, result);
assertTrue(filtersInvoked[0]);
assertTrue(filtersInvoked[1]);
}
@Test
public void apply() throws Exception {
ClientRequest<Void> request = ClientRequest.GET("http://example.com").build();
ClientResponse response = mock(ClientResponse.class);
ExchangeFunction exchange = r -> Mono.just(response);
boolean[] filterInvoked = new boolean[1];
ExchangeFilterFunction filter = (r, n) -> {
assertFalse(filterInvoked[0]);
filterInvoked[0] = true;
return n.exchange(r);
};
ExchangeFunction filteredExchange = filter.apply(exchange);
ClientResponse result = filteredExchange.exchange(request).block();
assertEquals(response, result);
assertTrue(filterInvoked[0]);
}
@Test
public void basicAuthentication() throws Exception {
ClientRequest<Void> request = ClientRequest.GET("http://example.com").build();
ClientResponse response = mock(ClientResponse.class);
ExchangeFunction exchange = r -> {
assertTrue(r.headers().containsKey(HttpHeaders.AUTHORIZATION));
assertTrue(r.headers().getFirst(HttpHeaders.AUTHORIZATION).startsWith("Basic "));
return Mono.just(response);
};
ExchangeFilterFunction auth = ExchangeFilterFunctions.basicAuthentication("foo", "bar");
assertFalse(request.headers().containsKey(HttpHeaders.AUTHORIZATION));
ClientResponse result = auth.filter(request, exchange).block();
assertEquals(response, result);
}
}

View File

@ -1,226 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.DecoderHttpMessageReader;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.tests.TestSubscriber;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.mockito.BDDMockito.eq;
import static org.mockito.BDDMockito.*;
import static org.mockito.Mockito.mock;
/**
* Unit tests for {@link ResponseExtractors}.
*
* @author Brian Clozel
*/
public class ResponseExtractorsTests {
private HttpHeaders headers;
private ClientHttpResponse response;
private List<HttpMessageReader<?>> messageReaders;
private WebClientConfig webClientConfig;
private ResponseErrorHandler errorHandler;
@Before
public void setup() throws Exception {
this.headers = new HttpHeaders();
this.response = mock(ClientHttpResponse.class);
given(this.response.getHeaders()).willReturn(headers);
this.messageReaders = Arrays.asList(
new DecoderHttpMessageReader<>(new StringDecoder()),
new DecoderHttpMessageReader<>(new Jackson2JsonDecoder()));
this.webClientConfig = mock(WebClientConfig.class);
this.errorHandler = mock(ResponseErrorHandler.class);
given(this.webClientConfig.getMessageReaders()).willReturn(this.messageReaders);
given(this.webClientConfig.getResponseErrorHandler()).willReturn(this.errorHandler);
}
@Test
public void shouldExtractResponseEntityMono() throws Exception {
this.headers.setContentType(MediaType.TEXT_PLAIN);
given(this.response.getStatusCode()).willReturn(HttpStatus.OK);
given(this.response.getBody()).willReturn(createFluxBody("test content"));
Mono<ResponseEntity<String>> result = ResponseExtractors.response(String.class)
.extract(Mono.just(this.response), this.webClientConfig);
TestSubscriber.subscribe(result)
.awaitAndAssertNextValuesWith(entity -> {
assertThat(entity.getStatusCode(), is(HttpStatus.OK));
assertThat(entity.getHeaders().getContentType(), is(MediaType.TEXT_PLAIN));
assertThat(entity.getBody(), is("test content"));
})
.assertComplete();
}
@Test
public void shouldExtractResponseEntityFlux() throws Exception {
this.headers.setContentType(MediaType.TEXT_PLAIN);
given(this.response.getStatusCode()).willReturn(HttpStatus.OK);
given(this.response.getBody()).willReturn(createFluxBody("test", " content"));
Mono<ResponseEntity<String>> result = ResponseExtractors.response(String.class)
.extract(Mono.just(this.response), this.webClientConfig);
TestSubscriber.subscribe(result)
.awaitAndAssertNextValuesWith(entity -> {
assertThat(entity.getStatusCode(), is(HttpStatus.OK));
assertThat(entity.getHeaders().getContentType(), is(MediaType.TEXT_PLAIN));
assertThat(entity.getBody(), is("test content"));
})
.assertComplete();
}
@Test
public void shouldExtractResponseEntityWithEmptyBody() throws Exception {
given(this.response.getStatusCode()).willReturn(HttpStatus.NO_CONTENT);
given(this.response.getBody()).willReturn(Flux.empty());
Mono<ResponseEntity<String>> result = ResponseExtractors.response(String.class)
.extract(Mono.just(this.response), this.webClientConfig);
TestSubscriber.subscribe(result)
.awaitAndAssertNextValuesWith(entity -> {
assertThat(entity.getStatusCode(), is(HttpStatus.NO_CONTENT));
assertNull(entity.getBody());
})
.assertComplete();
}
@Test
public void shouldExtractResponseEntityAsStream() throws Exception {
this.headers.setContentType(MediaType.TEXT_PLAIN);
given(this.response.getStatusCode()).willReturn(HttpStatus.OK);
given(this.response.getBody()).willReturn(createFluxBody("test", " content"));
Mono<ResponseEntity<Flux<String>>> result = ResponseExtractors.responseStream(String.class)
.extract(Mono.just(this.response), this.webClientConfig);
TestSubscriber.subscribe(result)
.awaitAndAssertNextValuesWith(entity -> {
assertThat(entity.getStatusCode(), is(HttpStatus.OK));
assertThat(entity.getHeaders().getContentType(), is(MediaType.TEXT_PLAIN));
TestSubscriber.subscribe(entity.getBody())
.awaitAndAssertNextValues("test", " content")
.assertComplete();
})
.assertComplete();
}
@Test
public void shouldGetErrorWhenExtractingWithMissingConverter() throws Exception {
this.headers.setContentType(MediaType.APPLICATION_XML);
given(this.response.getStatusCode()).willReturn(HttpStatus.OK);
given(this.response.getBody()).willReturn(createFluxBody("test content"));
Mono<ResponseEntity<SomePojo>> result = ResponseExtractors.response(SomePojo.class)
.extract(Mono.just(this.response), this.webClientConfig);
TestSubscriber.subscribe(result)
.assertErrorWith(t -> {
assertThat(t, instanceOf(WebClientException.class));
WebClientException exc = (WebClientException) t;
assertThat(exc.getMessage(), containsString("Could not decode response body of type 'application/xml'"));
assertThat(exc.getMessage(), containsString("$SomePojo"));
});
}
@Test
public void shouldExtractResponseHeaders() throws Exception {
this.headers.setContentType(MediaType.TEXT_PLAIN);
this.headers.setETag("\"Spring\"");
given(this.response.getStatusCode()).willReturn(HttpStatus.OK);
Mono<HttpHeaders> result = ResponseExtractors.headers()
.extract(Mono.just(this.response), this.webClientConfig);
TestSubscriber.subscribe(result)
.awaitAndAssertNextValuesWith(headers -> {
assertThat(headers.getContentType(), is(MediaType.TEXT_PLAIN));
assertThat(headers.getETag(), is("\"Spring\""));
})
.assertComplete();
}
@Test
public void shouldExecuteResponseHandler() throws Exception {
this.headers.setContentType(MediaType.TEXT_PLAIN);
given(this.response.getStatusCode()).willReturn(HttpStatus.NOT_FOUND);
given(this.response.getBody()).willReturn(createFluxBody("test", " content"));
Mono<String> result = ResponseExtractors.body(String.class)
.extract(Mono.just(this.response), this.webClientConfig);
TestSubscriber.subscribe(result)
.assertValueCount(1)
.assertComplete();
then(this.errorHandler).should().handleError(eq(this.response), eq(this.messageReaders));
}
private Flux<DataBuffer> createFluxBody(String... items) throws Exception {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
return Flux.just(items)
.map(item -> {
DataBuffer buffer = factory.allocateBuffer();
try {
buffer.write(new String(item).getBytes("UTF-8"));
}
catch (UnsupportedEncodingException exc) {
Exceptions.propagate(exc);
}
return buffer;
});
}
protected class SomePojo {
public String foo;
}
}

View File

@ -16,12 +16,7 @@
package org.springframework.web.client.reactive;
import static org.junit.Assert.*;
import static org.springframework.web.client.reactive.ClientWebRequestBuilders.*;
import static org.springframework.web.client.reactive.ResponseExtractors.*;
import java.time.Duration;
import java.util.function.Consumer;
import okhttp3.HttpUrl;
import okhttp3.mockwebserver.MockResponse;
@ -35,12 +30,19 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.BodyExtractors;
import org.springframework.http.codec.BodyInserters;
import org.springframework.http.codec.Pojo;
import org.springframework.tests.TestSubscriber;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.springframework.http.codec.BodyExtractors.toFlux;
import static org.springframework.http.codec.BodyExtractors.toMono;
/**
* {@link WebClient} integration tests with the {@code Flux} and {@code Mono} API.
*
@ -55,18 +57,18 @@ public class WebClientIntegrationTests {
@Before
public void setup() {
this.server = new MockWebServer();
this.webClient = new WebClient(new ReactorClientHttpConnector());
this.webClient = WebClient.create(new ReactorClientHttpConnector());
}
@Test
public void shouldGetHeaders() throws Exception {
public void headers() throws Exception {
HttpUrl baseUrl = server.url("/greeting?name=Spring");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!"));
ClientRequest<Void> request = ClientRequest.GET(baseUrl.toString()).build();
Mono<HttpHeaders> result = this.webClient
.perform(get(baseUrl.toString()))
.extract(headers());
.exchange(request)
.map(response -> response.headers().asHttpHeaders());
TestSubscriber
.subscribe(result)
@ -77,116 +79,101 @@ public class WebClientIntegrationTests {
})
.assertComplete();
RecordedRequest request = server.takeRequest();
RecordedRequest recordedRequest = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", request.getPath());
assertEquals("*/*", recordedRequest.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", recordedRequest.getPath());
}
@Test
public void shouldGetPlainTextResponseAsObject() throws Exception {
public void plainText() throws Exception {
HttpUrl baseUrl = server.url("/greeting?name=Spring");
this.server.enqueue(new MockResponse().setBody("Hello Spring!"));
Mono<String> result = this.webClient
.perform(get(baseUrl.toString())
.header("X-Test-Header", "testvalue"))
.extract(body(String.class));
ClientRequest<Void> request = ClientRequest.GET(baseUrl.toString())
.header("X-Test-Header", "testvalue")
.build();
Mono<String> result = this.webClient
.exchange(request)
.then(response -> response.body(toMono(String.class)));
TestSubscriber
.subscribe(result)
.awaitAndAssertNextValues("Hello Spring!")
.assertComplete();
RecordedRequest request = server.takeRequest();
RecordedRequest recordedRequest = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("testvalue", request.getHeader("X-Test-Header"));
assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", request.getPath());
assertEquals("testvalue", recordedRequest.getHeader("X-Test-Header"));
assertEquals("*/*", recordedRequest.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", recordedRequest.getPath());
}
@Test
public void shouldGetPlainTextResponse() throws Exception {
HttpUrl baseUrl = server.url("/greeting?name=Spring");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!"));
Mono<ResponseEntity<String>> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.TEXT_PLAIN))
.extract(response(String.class));
TestSubscriber
.subscribe(result)
.awaitAndAssertNextValuesWith((Consumer<ResponseEntity<String>>) response -> {
assertEquals(200, response.getStatusCode().value());
assertEquals(MediaType.TEXT_PLAIN, response.getHeaders().getContentType());
assertEquals("Hello Spring!", response.getBody());
});
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/greeting?name=Spring", request.getPath());
assertEquals("text/plain", request.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldGetJsonAsMonoOfString() throws Exception {
public void jsonString() throws Exception {
HttpUrl baseUrl = server.url("/json");
String content = "{\"bar\":\"barbar\",\"foo\":\"foofoo\"}";
this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
.setBody(content));
ClientRequest<Void> request = ClientRequest.GET(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON)
.build();
Mono<String> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON))
.extract(body(String.class));
.exchange(request)
.then(response -> response.body(toMono(String.class)));
TestSubscriber
.subscribe(result)
.awaitAndAssertNextValues(content)
.assertComplete();
RecordedRequest request = server.takeRequest();
RecordedRequest recordedRequest = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/json", request.getPath());
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/json", recordedRequest.getPath());
assertEquals("application/json", recordedRequest.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldGetJsonAsMonoOfPojo() throws Exception {
public void jsonPojoMono() throws Exception {
HttpUrl baseUrl = server.url("/pojo");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
.setBody("{\"bar\":\"barbar\",\"foo\":\"foofoo\"}"));
ClientRequest<Void> request = ClientRequest.GET(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON)
.build();
Mono<Pojo> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON))
.extract(body(Pojo.class));
.exchange(request)
.then(response -> response.body(toMono(Pojo.class)));
TestSubscriber
.subscribe(result)
.awaitAndAssertNextValuesWith(p -> assertEquals("barbar", p.getBar()))
.assertComplete();
RecordedRequest request = server.takeRequest();
RecordedRequest recordedRequest = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/pojo", request.getPath());
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/pojo", recordedRequest.getPath());
assertEquals("application/json", recordedRequest.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldGetJsonAsFluxOfPojos() throws Exception {
public void jsonPojoFlux() throws Exception {
HttpUrl baseUrl = server.url("/pojos");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
.setBody("[{\"bar\":\"bar1\",\"foo\":\"foo1\"},{\"bar\":\"bar2\",\"foo\":\"foo2\"}]"));
ClientRequest<Void> request = ClientRequest.GET(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON)
.build();
Flux<Pojo> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON))
.extract(bodyStream(Pojo.class));
.exchange(request)
.flatMap(response -> response.body(toFlux(Pojo.class)));
TestSubscriber
.subscribe(result)
@ -195,153 +182,124 @@ public class WebClientIntegrationTests {
p -> assertThat(p.getBar(), Matchers.is("bar2")))
.assertValueCount(2)
.assertComplete();
RecordedRequest request = server.takeRequest();
RecordedRequest recordedRequest = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/pojos", request.getPath());
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/pojos", recordedRequest.getPath());
assertEquals("application/json", recordedRequest.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldGetJsonAsResponseOfPojosStream() throws Exception {
HttpUrl baseUrl = server.url("/pojos");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
.setBody("[{\"bar\":\"bar1\",\"foo\":\"foo1\"},{\"bar\":\"bar2\",\"foo\":\"foo2\"}]"));
Mono<ResponseEntity<Flux<Pojo>>> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON))
.extract(responseStream(Pojo.class));
TestSubscriber
.subscribe(result)
.awaitAndAssertNextValuesWith(
response -> {
assertEquals(200, response.getStatusCode().value());
assertEquals(MediaType.APPLICATION_JSON, response.getHeaders().getContentType());
})
.assertComplete();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/pojos", request.getPath());
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldPostPojoAsJson() throws Exception {
public void postJsonPojo() throws Exception {
HttpUrl baseUrl = server.url("/pojo/capitalize");
this.server.enqueue(new MockResponse()
.setHeader("Content-Type", "application/json")
.setBody("{\"bar\":\"BARBAR\",\"foo\":\"FOOFOO\"}"));
Pojo spring = new Pojo("foofoo", "barbar");
ClientRequest<Pojo> request = ClientRequest.POST(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromObject(spring));
Mono<Pojo> result = this.webClient
.perform(post(baseUrl.toString())
.body(spring)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON))
.extract(body(Pojo.class));
.exchange(request)
.then(response -> response.body(BodyExtractors.toMono(Pojo.class)));
TestSubscriber
.subscribe(result)
.awaitAndAssertNextValuesWith(p -> assertEquals("BARBAR", p.getBar()))
.assertComplete();
RecordedRequest request = server.takeRequest();
RecordedRequest recordedRequest = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/pojo/capitalize", request.getPath());
assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", request.getBody().readUtf8());
assertEquals("chunked", request.getHeader(HttpHeaders.TRANSFER_ENCODING));
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("application/json", request.getHeader(HttpHeaders.CONTENT_TYPE));
assertEquals("/pojo/capitalize", recordedRequest.getPath());
assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", recordedRequest.getBody().readUtf8());
assertEquals("chunked", recordedRequest.getHeader(HttpHeaders.TRANSFER_ENCODING));
assertEquals("application/json", recordedRequest.getHeader(HttpHeaders.ACCEPT));
assertEquals("application/json", recordedRequest.getHeader(HttpHeaders.CONTENT_TYPE));
}
@Test
public void shouldSendCookieHeader() throws Exception {
public void cookies() throws Exception {
HttpUrl baseUrl = server.url("/test");
this.server.enqueue(new MockResponse()
.setHeader("Content-Type", "text/plain").setBody("test"));
ClientRequest<Void> request = ClientRequest.GET(baseUrl.toString())
.cookie("testkey", "testvalue")
.build();
Mono<String> result = this.webClient
.perform(get(baseUrl.toString())
.cookie("testkey", "testvalue"))
.extract(body(String.class));
.exchange(request)
.then(response -> response.body(toMono(String.class)));
TestSubscriber
.subscribe(result)
.awaitAndAssertNextValues("test")
.assertComplete();
RecordedRequest request = server.takeRequest();
RecordedRequest recordedRequest = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/test", request.getPath());
assertEquals("testkey=testvalue", request.getHeader(HttpHeaders.COOKIE));
assertEquals("/test", recordedRequest.getPath());
assertEquals("testkey=testvalue", recordedRequest.getHeader(HttpHeaders.COOKIE));
}
@Test
public void shouldGetErrorWhen404() throws Exception {
public void notFound() throws Exception {
HttpUrl baseUrl = server.url("/greeting?name=Spring");
this.server.enqueue(new MockResponse().setResponseCode(404)
.setHeader("Content-Type", "text/plain").setBody("Not Found"));
Mono<String> result = this.webClient
.perform(get(baseUrl.toString()))
.extract(body(String.class));
ClientRequest<Void> request = ClientRequest.GET(baseUrl.toString()).build();
Mono<ClientResponse> result = this.webClient
.exchange(request);
TestSubscriber
.subscribe(result)
.await(Duration.ofSeconds(3))
.assertErrorWith(t -> {
assertThat(t, Matchers.instanceOf(WebClientErrorException.class));
WebClientErrorException exc = (WebClientErrorException) t;
assertEquals(404, exc.getStatus().value());
assertEquals(MediaType.TEXT_PLAIN, exc.getResponseHeaders().getContentType());
Mono<String> body = exc.getResponseBody(as(String.class));
TestSubscriber.subscribe(body)
.awaitAndAssertNextValues("Not Found")
.assertComplete();
.assertValuesWith(response -> {
assertEquals(HttpStatus.NOT_FOUND, response.statusCode());
});
RecordedRequest request = server.takeRequest();
RecordedRequest recordedRequest = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", request.getPath());
assertEquals("*/*", recordedRequest.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", recordedRequest.getPath());
}
@Test
public void shouldGetErrorWhen500() throws Exception {
public void filter() throws Exception {
HttpUrl baseUrl = server.url("/greeting?name=Spring");
this.server.enqueue(new MockResponse().setResponseCode(500)
.setHeader("Content-Type", "text/plain").setBody("Server Error"));
this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!"));
Mono<String> result = this.webClient
.perform(get(baseUrl.toString()))
.extract(body(String.class));
ExchangeFilterFunction filter = (request, next) -> {
ClientRequest<?> filteredRequest = ClientRequest.from(request)
.header("foo", "bar").build();
return next.exchange(filteredRequest);
};
WebClient filteredClient = WebClient.builder(new ReactorClientHttpConnector())
.filter(filter).build();
ClientRequest<Void> request = ClientRequest.GET(baseUrl.toString()).build();
Mono<String> result = filteredClient.exchange(request)
.then(response -> response.body(toMono(String.class)));
TestSubscriber
.subscribe(result)
.await(Duration.ofSeconds(3))
.assertErrorWith(t -> {
assertThat(t, Matchers.instanceOf(WebServerErrorException.class));
WebServerErrorException exc = (WebServerErrorException) t;
assertEquals(500, exc.getStatus().value());
assertEquals(MediaType.TEXT_PLAIN, exc.getResponseHeaders().getContentType());
});
.awaitAndAssertNextValues("Hello Spring!")
.assertComplete();
RecordedRequest request = server.takeRequest();
RecordedRequest recordedRequest = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", request.getPath());
assertEquals("bar", recordedRequest.getHeader("foo"));
}
@After
public void tearDown() throws Exception {
this.server.shutdown();
}
}

View File

@ -0,0 +1,132 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.HttpMessageWriter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* @author Arjen Poutsma
*/
public class WebClientStrategiesTests {
@Test
public void empty() {
WebClientStrategies strategies = WebClientStrategies.empty().build();
assertEquals(Optional.empty(), strategies.messageReaders().get().findFirst());
assertEquals(Optional.empty(), strategies.messageWriters().get().findFirst());
}
@Test
public void ofSuppliers() {
HttpMessageReader<?> messageReader = new DummyMessageReader();
HttpMessageWriter<?> messageWriter = new DummyMessageWriter();
WebClientStrategies strategies = WebClientStrategies.of(
() -> Stream.of(messageReader),
() -> Stream.of(messageWriter));
assertEquals(1L, strategies.messageReaders().get().collect(Collectors.counting()).longValue());
assertEquals(Optional.of(messageReader), strategies.messageReaders().get().findFirst());
assertEquals(1L, strategies.messageWriters().get().collect(Collectors.counting()).longValue());
assertEquals(Optional.of(messageWriter), strategies.messageWriters().get().findFirst());
}
@Test
public void toConfiguration() throws Exception {
StaticApplicationContext applicationContext = new StaticApplicationContext();
applicationContext.registerSingleton("messageWriter", DummyMessageWriter.class);
applicationContext.registerSingleton("messageReader", DummyMessageReader.class);
applicationContext.refresh();
WebClientStrategies strategies = WebClientStrategies.of(applicationContext);
assertTrue(strategies.messageReaders().get()
.allMatch(r -> r instanceof DummyMessageReader));
assertTrue(strategies.messageWriters().get()
.allMatch(r -> r instanceof DummyMessageWriter));
}
private static class DummyMessageWriter implements HttpMessageWriter<Object> {
@Override
public boolean canWrite(ResolvableType type, MediaType mediaType) {
return false;
}
@Override
public List<MediaType> getWritableMediaTypes() {
return Collections.emptyList();
}
@Override
public Mono<Void> write(Publisher<?> inputStream, ResolvableType type,
MediaType contentType,
ReactiveHttpOutputMessage outputMessage,
Map<String, Object> hints) {
return Mono.empty();
}
}
private static class DummyMessageReader implements HttpMessageReader<Object> {
@Override
public boolean canRead(ResolvableType type, MediaType mediaType) {
return false;
}
@Override
public List<MediaType> getReadableMediaTypes() {
return Collections.emptyList();
}
@Override
public Flux<Object> read(ResolvableType type, ReactiveHttpInputMessage inputMessage,
Map<String, Object> hints) {
return Flux.empty();
}
@Override
public Mono<Object> readMono(ResolvableType type, ReactiveHttpInputMessage inputMessage,
Map<String, Object> hints) {
return Mono.empty();
}
}
}