Refactor HTTP client contracts

This commit refactors the `ClientHttpRequestFactory` into an
`ClientHttpConnector` abstraction, in order to reflect that
`ClientHttpRequest`s only "exist" once the client is connected
to the origin server.

This is why the HTTP client is now callback-based, containing all
interactions with the request within a
`Function<ClientHttpRequest,Mono<Void>>` that signals when it's done
writing to the request.

The `ClientHttpRequest` contract also adopts `setComplete()`
and promotes that method to the `ReactiveHttpOutputMessage` contract.

This commit also adapts all other APIs to that change and fixes a few
issues, including:

* use `HttpMessageConverter`s instead of `Encoders`/`Decoders`
* better handle type information about request content publishers
* support client cookies in HTTP requests
* temporarily remove the RxNetty client support
This commit is contained in:
Brian Clozel 2016-07-08 11:40:12 +02:00
parent b5bce1f017
commit 4892436efe
30 changed files with 942 additions and 1221 deletions

View File

@ -62,4 +62,15 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
*/
DataBufferFactory bufferFactory();
/**
* Indicate that message handling is complete, allowing for any cleanup or
* end-of-processing tasks to be performed such as applying header changes
* made via {@link #getHeaders()} to the underlying HTTP message (if not
* applied already).
* <p>This method should be automatically invoked at the end of message
* processing so typically applications should not have to invoke it.
* If invoked multiple times it should have no side effects.
*/
Mono<Void> setComplete();
}

View File

@ -46,13 +46,13 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
private final List<Supplier<? extends Mono<Void>>> beforeCommitActions = new ArrayList<>(4);
public AbstractClientHttpRequest(HttpHeaders httpHeaders) {
if (httpHeaders == null) {
this.headers = new HttpHeaders();
}
else {
this.headers = httpHeaders;
}
public AbstractClientHttpRequest() {
this(new HttpHeaders());
}
public AbstractClientHttpRequest(HttpHeaders headers) {
Assert.notNull(headers);
this.headers = headers;
this.cookies = new LinkedMultiValueMap<>();
}
@ -85,8 +85,8 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
})
.then(() -> {
this.state.set(State.COMITTED);
//writeHeaders();
//writeCookies();
writeHeaders();
writeCookies();
return Mono.empty();
});
}
@ -99,5 +99,9 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
this.beforeCommitActions.add(action);
}
protected abstract void writeHeaders();
protected abstract void writeCookies();
private enum State {NEW, COMMITTING, COMITTED}
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpMethod;
/**
* Client abstraction for HTTP client runtimes.
* {@link ClientHttpConnector} drives the underlying HTTP client implementation
* so as to connect to the origin server and provide all the necessary infrastructure
* to send the actual {@link ClientHttpRequest} and receive the {@link ClientHttpResponse}
*
* @author Brian Clozel
*/
public interface ClientHttpConnector {
/**
* Connect to the origin server using the given {@code HttpMethod} and {@code URI},
* then apply the given {@code requestCallback} on the {@link ClientHttpRequest}
* once the connection has been established.
* <p>Return a publisher of the {@link ClientHttpResponse}.
*
* @param method the HTTP request method
* @param uri the HTTP request URI
* @param requestCallback a function that prepares and writes the request,
* returning a publisher that signals when it's done interacting with the request.
* Implementations should return a {@code Mono<Void>} by calling
* {@link ClientHttpRequest#writeWith} or {@link ClientHttpRequest#setComplete}.
* @return a publisher of the {@link ClientHttpResponse}
*/
Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback);
}

View File

@ -18,8 +18,6 @@ package org.springframework.http.client.reactive;
import java.net.URI;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpMethod;
import org.springframework.http.ReactiveHttpOutputMessage;
@ -48,14 +46,4 @@ public interface ClientHttpRequest extends ReactiveHttpOutputMessage {
*/
MultiValueMap<String, HttpCookie> getCookies();
/**
* Execute this request, resulting in a reactive stream of a single
* {@link org.springframework.http.client.ClientHttpResponse}.
*
* @return a {@code Mono<ClientHttpResponse>} that signals when the the response
* status and headers have been received. The response body is made available with
* a separate Publisher within the {@code ClientHttpResponse}.
*/
Mono<ClientHttpResponse> execute();
}
}

View File

@ -13,31 +13,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import reactor.core.publisher.Mono;
/**
* Factory for {@link ClientHttpRequest} objects.
* Reactor-Netty implementation of {@link ClientHttpConnector}
*
* @author Brian Clozel
*/
public interface ClientHttpRequestFactory {
public class ReactorClientHttpConnector implements ClientHttpConnector {
/**
* Create a new {@link ClientHttpRequest} for the specified HTTP method, URI and headers
* <p>The returned request can be {@link ClientHttpRequest#writeWith(Publisher) written to},
* and then executed by calling {@link ClientHttpRequest#execute()}
*
* @param httpMethod the HTTP method to execute
* @param uri the URI to create a request for
* @param headers the HTTP request headers
*/
ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers);
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
}
return reactor.io.netty.http.HttpClient.create(uri.getHost(), uri.getPort())
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()),
uri.toString(),
httpOutbound -> requestCallback.apply(new ReactorClientHttpRequest(method, uri, httpOutbound)))
.map(httpInbound -> new ReactorClientHttpResponse(httpInbound));
}
}

View File

@ -17,7 +17,6 @@
package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.Collection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -26,45 +25,40 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.netty.http.HttpClient;
import reactor.io.netty.http.HttpClientRequest;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpMethod;
/**
* {@link ClientHttpRequest} implementation for the Reactor Net HTTP client
* {@link ClientHttpRequest} implementation for the Reactor-Netty HTTP client
*
* @author Brian Clozel
* @see HttpClient
* @see reactor.io.netty.http.HttpClient
*/
public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
private final DataBufferFactory dataBufferFactory;
private final HttpMethod httpMethod;
private final URI uri;
private final HttpClient httpClient;
private final HttpClientRequest httpRequest;
private Flux<ByteBuf> body;
private final NettyDataBufferFactory bufferFactory;
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers) {
super(headers);
//FIXME use Netty factory
this.dataBufferFactory = new DefaultDataBufferFactory();
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClientRequest httpRequest) {
this.httpMethod = httpMethod;
this.uri = uri;
this.httpClient = httpClient;
this.httpRequest = httpRequest;
this.bufferFactory = new NettyDataBufferFactory(httpRequest.delegate().alloc());
}
@Override
public DataBufferFactory bufferFactory() {
return this.dataBufferFactory;
return this.bufferFactory;
}
@Override
@ -77,51 +71,15 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
return this.uri;
}
/**
* Set the body of the message to the given {@link Publisher}.
*
* <p>Since the HTTP channel is not yet created when this method
* is called, the {@code Mono<Void>} return value completes immediately.
* For an event that signals that we're done writing the request, check the
* {@link #execute()} method.
*
* @return a publisher that completes immediately.
* @see #execute()
*/
@Override
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
this.body = Flux.from(body).map(this::toByteBuf);
return Mono.empty();
return applyBeforeCommit()
.then(httpRequest.send(Flux.from(body).map(this::toByteBuf)));
}
@Override
public Mono<ClientHttpResponse> execute() {
return this.httpClient.request(new io.netty.handler.codec.http.HttpMethod(httpMethod.toString()), uri.toString(),
channel -> {
// see https://github.com/reactor/reactor-io/pull/8
if (body == null) {
channel.removeTransferEncodingChunked();
}
return applyBeforeCommit()
.then(() -> {
getHeaders().entrySet().stream().forEach(e ->
channel.headers().set(e.getKey(), e.getValue()));
getCookies().values().stream().flatMap(Collection::stream).forEach(cookie ->
channel.addCookie(new DefaultCookie(cookie.getName(), cookie.getValue())));
return Mono.empty();
})
.then(() -> {
if (body != null) {
return channel.send(body);
}
else {
return channel.sendHeaders();
}
});
}).map(httpChannel -> new ReactorClientHttpResponse(httpChannel,
dataBufferFactory));
public Mono<Void> setComplete() {
return applyBeforeCommit().then(httpRequest.sendHeaders());
}
private ByteBuf toByteBuf(DataBuffer buffer) {
@ -133,5 +91,18 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
}
}
}
@Override
protected void writeHeaders() {
getHeaders().entrySet().stream()
.forEach(e -> this.httpRequest.headers().set(e.getKey(), e.getValue()));
}
@Override
protected void writeCookies() {
getCookies().values()
.stream().flatMap(cookies -> cookies.stream())
.map(cookie -> new DefaultCookie(cookie.getName(), cookie.getValue()))
.forEach(cookie -> this.httpRequest.addCookie(cookie));
}
}

View File

@ -17,12 +17,15 @@
package org.springframework.http.client.reactive;
import java.util.Collection;
import java.util.function.Function;
import io.netty.buffer.ByteBuf;
import reactor.core.publisher.Flux;
import reactor.io.netty.http.HttpInbound;
import reactor.io.netty.http.HttpClientResponse;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
@ -31,44 +34,47 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* {@link ClientHttpResponse} implementation for the Reactor Net HTTP client
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client
*
* @author Brian Clozel
* @see reactor.io.netty.http.HttpClient
*/
public class ReactorClientHttpResponse implements ClientHttpResponse {
private final DataBufferFactory dataBufferFactory;
private final NettyDataBufferFactory dataBufferFactory;
private final HttpInbound channel;
private final HttpClientResponse response;
public ReactorClientHttpResponse(HttpInbound channel,
DataBufferFactory dataBufferFactory) {
this.dataBufferFactory = dataBufferFactory;
this.channel = channel;
public ReactorClientHttpResponse(HttpClientResponse response) {
this.response = response;
this.dataBufferFactory = new NettyDataBufferFactory(response.delegate().alloc());
}
@Override
public Flux<DataBuffer> getBody() {
return channel.receiveByteBuffer().map(dataBufferFactory::wrap);
return response.receive()
.map(buf -> {
buf.retain();
return dataBufferFactory.wrap(buf);
});
}
@Override
public HttpHeaders getHeaders() {
HttpHeaders headers = new HttpHeaders();
this.channel.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue()));
this.response.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue()));
return headers;
}
@Override
public HttpStatus getStatusCode() {
return HttpStatus.valueOf(this.channel.status().code());
return HttpStatus.valueOf(this.response.status().code());
}
@Override
public MultiValueMap<String, ResponseCookie> getCookies() {
MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
this.channel.cookies().values().stream().flatMap(Collection::stream)
this.response.cookies().values().stream().flatMap(Collection::stream)
.forEach(cookie -> {
ResponseCookie responseCookie = ResponseCookie.from(cookie.name(), cookie.value())
.domain(cookie.domain())
@ -85,8 +91,8 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
@Override
public String toString() {
return "ReactorClientHttpResponse{" +
"request=" + this.channel.method().name() + " " + this.channel.uri() + "," +
"request=" + this.response.method().name() + " " + this.response.uri() + "," +
"status=" + getStatusCode() +
'}';
}
}
}

View File

@ -1,53 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import reactor.io.netty.http.HttpClient;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
* Create a {@link ClientHttpRequest} for the Reactor Net HTTP client
*
* @author Brian Clozel
*/
public class ReactorHttpClientRequestFactory implements ClientHttpRequestFactory {
private final HttpClient httpClient;
public ReactorHttpClientRequestFactory() {
this(reactor.io.netty.http.HttpClient.create());
}
protected ReactorHttpClientRequestFactory(HttpClient httpClient) {
this.httpClient = httpClient;
}
@Override
public ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers) {
Assert.notNull(httpMethod, "HTTP method is required");
Assert.notNull(uri, "request URI is required");
Assert.notNull(headers, "request headers are required");
return new ReactorClientHttpRequest(httpMethod, uri, this.httpClient, headers);
}
}

View File

@ -1,137 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.cookie.DefaultCookie;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
/**
* {@link ClientHttpRequest} implementation for the RxNetty HTTP client
*
* @author Brian Clozel
*/
public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
private final NettyDataBufferFactory dataBufferFactory;
private final HttpMethod httpMethod;
private final URI uri;
private Observable<ByteBuf> body;
public RxNettyClientHttpRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers,
NettyDataBufferFactory dataBufferFactory) {
super(headers);
this.httpMethod = httpMethod;
this.uri = uri;
this.dataBufferFactory = dataBufferFactory;
}
@Override
public DataBufferFactory bufferFactory() {
return this.dataBufferFactory;
}
/**
* Set the body of the message to the given {@link Publisher}.
*
* <p>Since the HTTP channel is not yet created when this method
* is called, the {@code Mono<Void>} return value completes immediately.
* For an event that signals that we're done writing the request, check the
* {@link #execute()} method.
*
* @return a publisher that completes immediately.
* @see #execute()
*/
@Override
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
this.body = RxJava1ObservableConverter.fromPublisher(Flux.from(body)
.map(b -> dataBufferFactory.wrap(b.asByteBuffer()).getNativeBuffer()));
return Mono.empty();
}
@Override
public HttpMethod getMethod() {
return this.httpMethod;
}
@Override
public URI getURI() {
return this.uri;
}
@Override
public Mono<ClientHttpResponse> execute() {
try {
HttpClientRequest<ByteBuf, ByteBuf> request = HttpClient
.newClient(this.uri.getHost(), this.uri.getPort())
.createRequest(io.netty.handler.codec.http.HttpMethod.valueOf(this.httpMethod.name()), uri.getRawPath());
return applyBeforeCommit()
.then(() -> Mono.just(request))
.map(req -> {
for (Map.Entry<String, List<String>> entry : getHeaders().entrySet()) {
for (String value : entry.getValue()) {
req = req.addHeader(entry.getKey(), value);
}
}
for (Map.Entry<String, List<HttpCookie>> entry : getCookies().entrySet()) {
for (HttpCookie cookie : entry.getValue()) {
req.addCookie(new DefaultCookie(cookie.getName(), cookie.getValue()));
}
}
return req;
})
.map(req -> {
if (this.body != null) {
return RxJava1ObservableConverter.toPublisher(req.writeContent(this.body));
}
else {
return RxJava1ObservableConverter.toPublisher(req);
}
})
.flatMap(resp -> resp)
.next().map(response -> new RxNettyClientHttpResponse(response,
this.dataBufferFactory));
}
catch (IllegalArgumentException exc) {
return Mono.error(exc);
}
}
}

View File

@ -1,101 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.util.Collection;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* {@link ClientHttpResponse} implementation for the RxNetty HTTP client
*
* @author Brian Clozel
*/
public class RxNettyClientHttpResponse implements ClientHttpResponse {
private final HttpClientResponse<ByteBuf> response;
private final HttpHeaders headers;
private final MultiValueMap<String, ResponseCookie> cookies;
private final NettyDataBufferFactory dataBufferFactory;
public RxNettyClientHttpResponse(HttpClientResponse<ByteBuf> response,
NettyDataBufferFactory dataBufferFactory) {
Assert.notNull("'request', request must not be null");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
this.dataBufferFactory = dataBufferFactory;
this.response = response;
this.headers = new HttpHeaders();
this.response.headerIterator().forEachRemaining(e -> this.headers.set(e.getKey().toString(), e.getValue().toString()));
this.cookies = initCookies(response);
}
private static MultiValueMap<String, ResponseCookie> initCookies(HttpClientResponse<ByteBuf> response) {
MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
response.getCookies().values().stream().flatMap(Collection::stream)
.forEach(cookie -> {
ResponseCookie responseCookie = ResponseCookie.from(cookie.name(), cookie.value())
.domain(cookie.domain())
.path(cookie.path())
.maxAge(cookie.maxAge())
.secure(cookie.isSecure())
.httpOnly(cookie.isHttpOnly())
.build();
result.add(cookie.name(), responseCookie);
});
return CollectionUtils.unmodifiableMultiValueMap(result);
}
@Override
public HttpStatus getStatusCode() {
return HttpStatus.valueOf(this.response.getStatus().code());
}
@Override
public Flux<DataBuffer> getBody() {
return RxJava1ObservableConverter
.toPublisher(this.response.getContent().map(dataBufferFactory::wrap));
}
@Override
public HttpHeaders getHeaders() {
return this.headers;
}
@Override
public MultiValueMap<String, ResponseCookie> getCookies() {
return this.cookies;
}
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
* Create a {@link ClientHttpRequestFactory} for the RxNetty HTTP client
*
* @author Brian Clozel
*/
public class RxNettyHttpClientRequestFactory implements ClientHttpRequestFactory {
private final NettyDataBufferFactory dataBufferFactory;
public RxNettyHttpClientRequestFactory(NettyDataBufferFactory dataBufferFactory) {
this.dataBufferFactory = dataBufferFactory;
}
@Override
public ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers) {
Assert.notNull(httpMethod, "HTTP method is required");
Assert.notNull(uri, "request URI is required");
Assert.notNull(headers, "request headers are required");
return new RxNettyClientHttpRequest(httpMethod, uri, headers,
this.dataBufferFactory);
}
}

View File

@ -0,0 +1,96 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.MultiValueMap;
/**
* Holds all the application information required to build an actual HTTP client request.
* <p>The request body is materialized by a {@code Publisher} of Objects and their type
* by a {@code ResolvableType} instance; it should be later converted to a
* {@code Publisher<DataBuffer>} to be written to the actual HTTP client request.
*
* @author Brian Clozel
*/
public class ClientWebRequest {
protected final HttpMethod httpMethod;
protected final URI url;
protected HttpHeaders httpHeaders;
private MultiValueMap<String, HttpCookie> cookies;
protected Publisher body;
protected ResolvableType elementType;
public ClientWebRequest(HttpMethod httpMethod, URI url) {
this.httpMethod = httpMethod;
this.url = url;
}
public HttpMethod getMethod() {
return httpMethod;
}
public URI getUrl() {
return url;
}
public HttpHeaders getHttpHeaders() {
return httpHeaders;
}
public void setHttpHeaders(HttpHeaders httpHeaders) {
this.httpHeaders = httpHeaders;
}
public MultiValueMap<String, HttpCookie> getCookies() {
return cookies;
}
public void setCookies(MultiValueMap<String, HttpCookie> cookies) {
this.cookies = cookies;
}
public Publisher getBody() {
return body;
}
public void setBody(Publisher body) {
this.body = body;
}
public ResolvableType getElementType() {
return elementType;
}
public void setElementType(ResolvableType elementType) {
this.elementType = elementType;
}
}

View File

@ -17,15 +17,12 @@
package org.springframework.web.client.reactive;
/**
* A {@code WebResponseExtractor} extracts the relevant part of a
* raw {@link org.springframework.http.client.reactive.ClientHttpResponse},
* optionally decoding the response body and using a target composition API.
*
* <p>See static factory methods in {@link WebResponseExtractors}.
* Build {@link ClientWebRequest}s
*
* @author Brian Clozel
*/
public interface WebResponseExtractor<T> {
public interface ClientWebRequestBuilder {
T extract(WebResponse webResponse);
}
ClientWebRequest build();
}

View File

@ -0,0 +1,111 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import org.springframework.http.HttpMethod;
/**
* Static factory methods for {@link DefaultClientWebRequestBuilder ClientWebRequestBuilders}
*
* @author Brian Clozel
*/
public abstract class ClientWebRequestBuilders {
/**
* Create a {@link DefaultClientWebRequestBuilder} for a GET request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder get(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.GET, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a POST request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder post(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.POST, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a PUT request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder put(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.PUT, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a PATCH request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder patch(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.PATCH, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a DELETE request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder delete(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.DELETE, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for an OPTIONS request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder options(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.OPTIONS, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a HEAD request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder head(String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(HttpMethod.HEAD, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultClientWebRequestBuilder} for a request with the given HTTP method.
*
* @param httpMethod the HTTP method
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultClientWebRequestBuilder request(HttpMethod httpMethod, String urlTemplate, Object... urlVariables) {
return new DefaultClientWebRequestBuilder(httpMethod, urlTemplate, urlVariables);
}
}

View File

@ -17,20 +17,20 @@
package org.springframework.web.client.reactive;
/**
* Allows post processing the {@link DefaultHttpRequestBuilder} for strategy for
* performing more complex operations.
* Allow post processing and/or wrapping the {@link ClientWebRequest} before
* it's sent to the origin server.
*
* @author Rob Winch
* @see DefaultHttpRequestBuilder#apply(RequestPostProcessor)
* @author Brian Clozel
* @see DefaultClientWebRequestBuilder#apply(ClientWebRequestPostProcessor)
*/
public interface RequestPostProcessor {
public interface ClientWebRequestPostProcessor {
/**
* Implementations can modify the {@link DefaultHttpRequestBuilder} passed
* in.
* Implementations can modify and/or wrap the {@link ClientWebRequest} passed in
* and return it
*
* @param toPostProcess
* the {@link DefaultHttpRequestBuilder} to be modified.
* @param request the {@link ClientWebRequest} to be modified and/or wrapped.
*/
void postProcess(DefaultHttpRequestBuilder toPostProcess);
ClientWebRequest postProcess(ClientWebRequest request);
}

View File

@ -0,0 +1,196 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.DefaultUriTemplateHandler;
import org.springframework.web.util.UriTemplateHandler;
/**
* Builds a {@link ClientHttpRequest} using a {@link Publisher}
* as request body.
*
* <p>See static factory methods in {@link ClientWebRequestBuilders}
*
* @author Brian Clozel
* @see ClientWebRequestBuilders
*/
public class DefaultClientWebRequestBuilder implements ClientWebRequestBuilder {
private final UriTemplateHandler uriTemplateHandler = new DefaultUriTemplateHandler();
private HttpMethod httpMethod;
private HttpHeaders httpHeaders;
private URI url;
private final MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
private Publisher body;
private ResolvableType elementType;
private List<ClientWebRequestPostProcessor> postProcessors = new ArrayList<>();
protected DefaultClientWebRequestBuilder() {
}
public DefaultClientWebRequestBuilder(HttpMethod httpMethod, String urlTemplate,
Object... urlVariables) {
this.httpMethod = httpMethod;
this.httpHeaders = new HttpHeaders();
this.url = this.uriTemplateHandler.expand(urlTemplate, urlVariables);
}
public DefaultClientWebRequestBuilder(HttpMethod httpMethod, URI url) {
this.httpMethod = httpMethod;
this.httpHeaders = new HttpHeaders();
this.url = url;
}
/**
* Add an HTTP request header
*/
public DefaultClientWebRequestBuilder header(String name, String... values) {
Arrays.stream(values).forEach(value -> this.httpHeaders.add(name, value));
return this;
}
/**
* Add all provided HTTP request headers
*/
public DefaultClientWebRequestBuilder headers(HttpHeaders httpHeaders) {
this.httpHeaders = httpHeaders;
return this;
}
/**
* Set the Content-Type request header to the given {@link MediaType}
*/
public DefaultClientWebRequestBuilder contentType(MediaType contentType) {
this.httpHeaders.setContentType(contentType);
return this;
}
/**
* Set the Content-Type request header to the given media type
*/
public DefaultClientWebRequestBuilder contentType(String contentType) {
this.httpHeaders.setContentType(MediaType.parseMediaType(contentType));
return this;
}
/**
* Set the Accept request header to the given {@link MediaType}s
*/
public DefaultClientWebRequestBuilder accept(MediaType... mediaTypes) {
this.httpHeaders.setAccept(Arrays.asList(mediaTypes));
return this;
}
/**
* Set the Accept request header to the given media types
*/
public DefaultClientWebRequestBuilder accept(String... mediaTypes) {
this.httpHeaders.setAccept(
Arrays.stream(mediaTypes).map(type -> MediaType.parseMediaType(type))
.collect(Collectors.toList()));
return this;
}
/**
* Add a Cookie to the HTTP request
*/
public DefaultClientWebRequestBuilder cookie(String name, String value) {
return cookie(new HttpCookie(name, value));
}
/**
* Add a Cookie to the HTTP request
*/
public DefaultClientWebRequestBuilder cookie(HttpCookie cookie) {
this.cookies.add(cookie.getName(), cookie);
return this;
}
/**
* Allows performing more complex operations with a strategy. For example, a
* {@link ClientWebRequestPostProcessor} implementation might accept the arguments of username
* and password and set an HTTP Basic authentication header.
*
* @param postProcessor the {@link ClientWebRequestPostProcessor} to use. Cannot be null.
*
* @return this instance for further modifications.
*/
public DefaultClientWebRequestBuilder apply(ClientWebRequestPostProcessor postProcessor) {
Assert.notNull(postProcessor, "`postProcessor` is required");
this.postProcessors.add(postProcessor);
return this;
}
/**
* Use the given object as the request body
*/
public DefaultClientWebRequestBuilder body(Object content) {
this.body = Mono.just(content);
this.elementType = ResolvableType.forInstance(content);
return this;
}
/**
* Use the given {@link Publisher} as the request body and use its {@link ResolvableType}
* as type information for the element published by this reactive stream
*/
public DefaultClientWebRequestBuilder body(Publisher<?> content, ResolvableType publisherType) {
this.body = content;
this.elementType = publisherType;
return this;
}
@Override
public ClientWebRequest build() {
ClientWebRequest clientWebRequest = new ClientWebRequest(this.httpMethod, this.url);
clientWebRequest.setHttpHeaders(this.httpHeaders);
clientWebRequest.setCookies(this.cookies);
clientWebRequest.setBody(this.body);
clientWebRequest.setElementType(this.elementType);
for (ClientWebRequestPostProcessor postProcessor : this.postProcessors) {
clientWebRequest = postProcessor.postProcess(clientWebRequest);
}
return clientWebRequest;
}
}

View File

@ -1,167 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Encoder;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpRequestFactory;
import org.springframework.util.Assert;
import org.springframework.web.client.RestClientException;
import org.springframework.web.util.DefaultUriTemplateHandler;
import org.springframework.web.util.UriTemplateHandler;
/**
* Builds a {@link ClientHttpRequest}
*
* <p>See static factory methods in {@link HttpRequestBuilders}
*
* @author Brian Clozel
* @see HttpRequestBuilders
*/
public class DefaultHttpRequestBuilder implements HttpRequestBuilder {
private final UriTemplateHandler uriTemplateHandler = new DefaultUriTemplateHandler();
protected HttpMethod httpMethod;
protected HttpHeaders httpHeaders;
protected URI url;
protected Publisher contentPublisher;
protected ResolvableType contentType;
protected final List<HttpCookie> cookies = new ArrayList<HttpCookie>();
protected DefaultHttpRequestBuilder() {
}
public DefaultHttpRequestBuilder(HttpMethod httpMethod, String urlTemplate, Object... urlVariables) throws RestClientException {
this.httpMethod = httpMethod;
this.httpHeaders = new HttpHeaders();
this.url = this.uriTemplateHandler.expand(urlTemplate, urlVariables);
}
public DefaultHttpRequestBuilder(HttpMethod httpMethod, URI url) {
this.httpMethod = httpMethod;
this.httpHeaders = new HttpHeaders();
this.url = url;
}
public DefaultHttpRequestBuilder header(String name, String... values) {
Arrays.stream(values).forEach(value -> this.httpHeaders.add(name, value));
return this;
}
public DefaultHttpRequestBuilder headers(HttpHeaders httpHeaders) {
this.httpHeaders = httpHeaders;
return this;
}
public DefaultHttpRequestBuilder contentType(MediaType contentType) {
this.httpHeaders.setContentType(contentType);
return this;
}
public DefaultHttpRequestBuilder contentType(String contentType) {
this.httpHeaders.setContentType(MediaType.parseMediaType(contentType));
return this;
}
public DefaultHttpRequestBuilder accept(MediaType... mediaTypes) {
this.httpHeaders.setAccept(Arrays.asList(mediaTypes));
return this;
}
public DefaultHttpRequestBuilder accept(String... mediaTypes) {
this.httpHeaders.setAccept(Arrays.stream(mediaTypes)
.map(type -> MediaType.parseMediaType(type))
.collect(Collectors.toList()));
return this;
}
public DefaultHttpRequestBuilder content(Object content) {
this.contentPublisher = Mono.just(content);
this.contentType = ResolvableType.forInstance(content);
return this;
}
public DefaultHttpRequestBuilder contentStream(Publisher<?> content, ResolvableType type) {
this.contentPublisher = Flux.from(content);
this.contentType = type;
return this;
}
/**
* Allows performing more complex operations with a strategy. For example, a
* {@link RequestPostProcessor} implementation might accept the arguments of
* username and password and set an HTTP Basic authentication header.
*
* @param postProcessor the {@link RequestPostProcessor} to use. Cannot be null.
*
* @return this instance for further modifications.
*/
public DefaultHttpRequestBuilder apply(RequestPostProcessor postProcessor) {
Assert.notNull(postProcessor, "`postProcessor` is required");
postProcessor.postProcess(this);
return this;
}
public ClientHttpRequest build(ClientHttpRequestFactory factory, List<Encoder<?>> messageEncoders) {
ClientHttpRequest request = factory.createRequest(this.httpMethod, this.url, this.httpHeaders);
request.getHeaders().putAll(this.httpHeaders);
if (this.contentPublisher != null) {
MediaType mediaType = request.getHeaders().getContentType();
Optional<Encoder<?>> messageEncoder = messageEncoders
.stream()
.filter(e -> e.canEncode(this.contentType, mediaType))
.findFirst();
if (messageEncoder.isPresent()) {
request.writeWith(messageEncoder.get()
.encode(this.contentPublisher, request.bufferFactory(),
this.contentType, mediaType));
}
else {
throw new WebClientException("Can't write request body " +
"of type '" + this.contentType.toString() +
"' for content-type '" + mediaType.toString() + "'");
}
}
return request;
}
}

View File

@ -1,52 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import reactor.core.publisher.Mono;
import org.springframework.core.codec.Decoder;
import org.springframework.http.client.reactive.ClientHttpResponse;
/**
* Default implementation of the {@link WebResponse} interface
*
* @author Brian Clozel
*/
public class DefaultWebResponse implements WebResponse {
private final Mono<ClientHttpResponse> clientResponse;
private final List<Decoder<?>> messageDecoders;
public DefaultWebResponse(Mono<ClientHttpResponse> clientResponse, List<Decoder<?>> messageDecoders) {
this.clientResponse = clientResponse;
this.messageDecoders = messageDecoders;
}
@Override
public Mono<ClientHttpResponse> getClientResponse() {
return this.clientResponse;
}
@Override
public List<Decoder<?>> getMessageDecoders() {
return this.messageDecoders;
}
}

View File

@ -1,40 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import org.springframework.core.codec.Encoder;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpRequestFactory;
/**
* Build {@link ClientHttpRequest} using a {@link ClientHttpRequestFactory}
* which wraps an HTTP client implementation.
*
* @author Brian Clozel
*/
public interface HttpRequestBuilder {
/**
* Build a {@link ClientHttpRequest}
*
* @param factory the factory that creates the actual {@link ClientHttpRequest}
* @param messageEncoders the {@link Encoder}s to use for encoding the request body
*/
ClientHttpRequest build(ClientHttpRequestFactory factory, List<Encoder<?>> messageEncoders);
}

View File

@ -1,110 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import org.springframework.http.HttpMethod;
/**
* Static factory methods for {@link DefaultHttpRequestBuilder RequestBuilders}.
*
* @author Brian Clozel
*/
public abstract class HttpRequestBuilders {
/**
* Create a {@link DefaultHttpRequestBuilder} for a GET request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultHttpRequestBuilder get(String urlTemplate, Object... urlVariables) {
return new DefaultHttpRequestBuilder(HttpMethod.GET, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultHttpRequestBuilder} for a POST request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultHttpRequestBuilder post(String urlTemplate, Object... urlVariables) {
return new DefaultHttpRequestBuilder(HttpMethod.POST, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultHttpRequestBuilder} for a PUT request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultHttpRequestBuilder put(String urlTemplate, Object... urlVariables) {
return new DefaultHttpRequestBuilder(HttpMethod.PUT, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultHttpRequestBuilder} for a PATCH request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultHttpRequestBuilder patch(String urlTemplate, Object... urlVariables) {
return new DefaultHttpRequestBuilder(HttpMethod.PATCH, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultHttpRequestBuilder} for a DELETE request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultHttpRequestBuilder delete(String urlTemplate, Object... urlVariables) {
return new DefaultHttpRequestBuilder(HttpMethod.DELETE, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultHttpRequestBuilder} for an OPTIONS request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultHttpRequestBuilder options(String urlTemplate, Object... urlVariables) {
return new DefaultHttpRequestBuilder(HttpMethod.OPTIONS, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultHttpRequestBuilder} for a HEAD request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultHttpRequestBuilder head(String urlTemplate, Object... urlVariables) {
return new DefaultHttpRequestBuilder(HttpMethod.HEAD, urlTemplate, urlVariables);
}
/**
* Create a {@link DefaultHttpRequestBuilder} for a request with the given HTTP method.
*
* @param httpMethod the HTTP method
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static DefaultHttpRequestBuilder request(HttpMethod httpMethod, String urlTemplate, Object... urlVariables) {
return new DefaultHttpRequestBuilder(httpMethod, urlTemplate, urlVariables);
}
}

View File

@ -20,25 +20,19 @@ import java.util.List;
import reactor.core.publisher.Mono;
import org.springframework.core.codec.Decoder;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.converter.reactive.HttpMessageConverter;
/**
* Result of a {@code ClientHttpRequest} sent to a remote server by the {@code WebClient}
* A {@code ResponseExtractor} extracts the relevant part of a
* raw {@link org.springframework.http.client.reactive.ClientHttpResponse},
* optionally decoding the response body and using a target composition API.
*
* <p>Contains all the required information to extract relevant information from the raw response.
* <p>See static factory methods in {@link ResponseExtractors}.
*
* @author Brian Clozel
*/
public interface WebResponse {
public interface ResponseExtractor<T> {
/**
* Return the raw response received by the {@code WebClient}
*/
Mono<ClientHttpResponse> getClientResponse();
/**
* Return the configured list of {@link Decoder}s that can be used to decode the raw response body
*/
List<Decoder<?>> getMessageDecoders();
T extract(Mono<ClientHttpResponse> clientResponse, List<HttpMessageConverter<?>> messageConverters);
}

View File

@ -0,0 +1,163 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.util.List;
import java.util.Optional;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.converter.reactive.HttpMessageConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Static factory methods for {@link ResponseExtractor} based on the {@link Flux} and
* {@link Mono} API.
*
* @author Brian Clozel
*/
public class ResponseExtractors {
private static final Object EMPTY_BODY = new Object();
/**
* Extract the response body and decode it, returning it as a {@code Mono<T>}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> ResponseExtractor<Mono<T>> body(ResolvableType bodyType) {
// noinspection unchecked
return (clientResponse, messageConverters) -> (Mono<T>) clientResponse
.flatMap(resp -> decodeResponseBody(resp, bodyType,
messageConverters))
.next();
}
/**
* Extract the response body and decode it, returning it as a {@code Mono<T>}
*/
public static <T> ResponseExtractor<Mono<T>> body(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return body(bodyType);
}
/**
* Extract the response body and decode it, returning it as a {@code Flux<T>}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> ResponseExtractor<Flux<T>> bodyStream(ResolvableType bodyType) {
return (clientResponse, messageConverters) -> clientResponse
.flatMap(resp -> decodeResponseBody(resp, bodyType, messageConverters));
}
/**
* Extract the response body and decode it, returning it as a {@code Flux<T>}
*/
public static <T> ResponseExtractor<Flux<T>> bodyStream(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return bodyStream(bodyType);
}
/**
* Extract the full response body as a {@code ResponseEntity} with its body decoded as
* a single type {@code T}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> ResponseExtractor<Mono<ResponseEntity<T>>> response(
ResolvableType bodyType) {
return (clientResponse, messageConverters) -> clientResponse.then(response -> {
return Mono.when(
decodeResponseBody(response, bodyType,
messageConverters).next().defaultIfEmpty(
EMPTY_BODY),
Mono.just(response.getHeaders()),
Mono.just(response.getStatusCode()));
}).map(tuple -> {
Object body = (tuple.getT1() != EMPTY_BODY ? tuple.getT1() : null);
// noinspection unchecked
return new ResponseEntity<>((T) body, tuple.getT2(), tuple.getT3());
});
}
/**
* Extract the full response body as a {@code ResponseEntity} with its body decoded as
* a single type {@code T}
*/
public static <T> ResponseExtractor<Mono<ResponseEntity<T>>> response(
Class<T> bodyClass) {
ResolvableType bodyType = ResolvableType.forClass(bodyClass);
return response(bodyType);
}
/**
* Extract the full response body as a {@code ResponseEntity} with its body decoded as
* a {@code Flux<T>}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> ResponseExtractor<Mono<ResponseEntity<Flux<T>>>> responseStream(
ResolvableType type) {
return (clientResponse, messageConverters) -> clientResponse
.map(response -> new ResponseEntity<>(
decodeResponseBody(response, type,
messageConverters),
response.getHeaders(), response.getStatusCode()));
}
/**
* Extract the full response body as a {@code ResponseEntity} with its body decoded as
* a {@code Flux<T>}
*/
public static <T> ResponseExtractor<Mono<ResponseEntity<Flux<T>>>> responseStream(
Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return responseStream(resolvableType);
}
/**
* Extract the response headers as an {@code HttpHeaders} instance
*/
public static ResponseExtractor<Mono<HttpHeaders>> headers() {
return (clientResponse, messageConverters) -> clientResponse.map(resp -> resp.getHeaders());
}
protected static <T> Flux<T> decodeResponseBody(ClientHttpResponse response,
ResolvableType responseType,
List<HttpMessageConverter<?>> messageConverters) {
MediaType contentType = response.getHeaders().getContentType();
Optional<HttpMessageConverter<?>> converter = resolveConverter(messageConverters,
responseType, contentType);
if (!converter.isPresent()) {
return Flux.error(new IllegalStateException(
"Could not decode response body of type '" + contentType
+ "' with target type '" + responseType.toString() + "'"));
}
// noinspection unchecked
return (Flux<T>) converter.get().read(responseType, response);
}
protected static Optional<HttpMessageConverter<?>> resolveConverter(
List<HttpMessageConverter<?>> messageConverters, ResolvableType type,
MediaType mediaType) {
return messageConverters.stream().filter(e -> e.canRead(type, mediaType))
.findFirst();
}
}

View File

@ -1,131 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Optional;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
/**
* Static factory methods for {@link WebResponseExtractor}
* based on the {@link Observable} and {@link Single} API.
*
* @author Brian Clozel
*/
public class RxJava1WebResponseExtractors {
private static final Charset UTF_8 = Charset.forName("UTF-8");
private static final Object[] HINTS = new Object[] {UTF_8};
/**
* Extract the response body and decode it, returning it as a {@code Single<T>}
*/
public static <T> WebResponseExtractor<Single<T>> body(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
//noinspection unchecked
return webResponse -> (Single<T>) RxJava1SingleConverter.fromPublisher(webResponse.getClientResponse()
.flatMap(resp -> decodeResponseBody(resp, resolvableType, webResponse.getMessageDecoders()))
.next());
}
/**
* Extract the response body and decode it, returning it as an {@code Observable<T>}
*/
public static <T> WebResponseExtractor<Observable<T>> bodyStream(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return webResponse -> RxJava1ObservableConverter.fromPublisher(webResponse.getClientResponse()
.flatMap(resp -> decodeResponseBody(resp, resolvableType, webResponse.getMessageDecoders())));
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as a single type {@code T}
*/
public static <T> WebResponseExtractor<Single<ResponseEntity<T>>> response(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return webResponse -> (Single<ResponseEntity<T>>)
RxJava1SingleConverter.fromPublisher(webResponse.getClientResponse()
.then(response ->
Mono.when(
decodeResponseBody(response, resolvableType, webResponse.getMessageDecoders()).next(),
Mono.just(response.getHeaders()),
Mono.just(response.getStatusCode())))
.map(tuple -> {
//noinspection unchecked
return new ResponseEntity<>((T) tuple.getT1(), tuple.getT2(), tuple.getT3());
}));
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as an {@code Observable<T>}
*/
public static <T> WebResponseExtractor<Single<ResponseEntity<Observable<T>>>> responseStream(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return webResponse -> RxJava1SingleConverter.fromPublisher(webResponse.getClientResponse()
.map(response -> new ResponseEntity<>(
RxJava1ObservableConverter
.fromPublisher(decodeResponseBody(response, resolvableType, webResponse.getMessageDecoders())),
response.getHeaders(),
response.getStatusCode())));
}
/**
* Extract the response headers as an {@code HttpHeaders} instance
*/
public static WebResponseExtractor<Single<HttpHeaders>> headers() {
return webResponse -> RxJava1SingleConverter
.fromPublisher(webResponse.getClientResponse().map(resp -> resp.getHeaders()));
}
protected static <T> Flux<T> decodeResponseBody(ClientHttpResponse response, ResolvableType responseType,
List<Decoder<?>> messageDecoders) {
MediaType contentType = response.getHeaders().getContentType();
Optional<Decoder<?>> decoder = resolveDecoder(messageDecoders, responseType, contentType);
if (!decoder.isPresent()) {
return Flux.error(new IllegalStateException("Could not decode response body of type '" + contentType +
"' with target type '" + responseType.toString() + "'"));
}
//noinspection unchecked
return (Flux<T>) decoder.get().decode(response.getBody(), responseType, contentType, HINTS);
}
protected static Optional<Decoder<?>> resolveDecoder(List<Decoder<?>> messageDecoders, ResolvableType type,
MediaType mediaType) {
return messageDecoders.stream().filter(e -> e.canDecode(type, mediaType)).findFirst();
}
}

View File

@ -16,12 +16,17 @@
package org.springframework.web.client.reactive;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import reactor.core.publisher.Mono;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.ByteBufferDecoder;
@ -31,105 +36,194 @@ import org.springframework.http.codec.json.JacksonJsonEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.codec.StringEncoder;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpRequestFactory;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.codec.xml.Jaxb2Decoder;
import org.springframework.http.codec.xml.Jaxb2Encoder;
import org.springframework.http.converter.reactive.CodecHttpMessageConverter;
import org.springframework.http.converter.reactive.HttpMessageConverter;
import org.springframework.http.converter.reactive.ResourceHttpMessageConverter;
import org.springframework.util.ClassUtils;
import reactor.core.publisher.Mono;
/**
* Reactive Web client supporting the HTTP/1.1 protocol
*
* <p>Here is a simple example of a GET request:
*
* <pre class="code">
* WebClient client = new WebClient(new ReactorHttpClientRequestFactory());
* // should be shared between HTTP calls
* WebClient client = new WebClient(new ReactorHttpClient());
*
* Mono&lt;String&gt; result = client
* .perform(HttpRequestBuilders.get("http://example.org/resource")
* .accept(MediaType.TEXT_PLAIN))
* .extract(WebResponseExtractors.body(String.class));
* .perform(ClientWebRequestBuilders.get("http://example.org/resource")
* .accept(MediaType.TEXT_PLAIN))
* .extract(ResponseExtractors.body(String.class));
* </pre>
*
* <p>This Web client relies on
* <ul>
* <li>a {@link ClientHttpRequestFactory} that drives the underlying library (e.g. Reactor-Net, RxNetty...)</li>
* <li>an {@link HttpRequestBuilder} which create a Web request with a builder API (see {@link HttpRequestBuilders})</li>
* <li>an {@link WebResponseExtractor} which extracts the relevant part of the server response
* with the composition API of choice (see {@link WebResponseExtractors}</li>
* <li>an {@link ClientHttpConnector} implementation that drives the underlying library (e.g. Reactor-Netty)</li>
* <li>a {@link ClientWebRequestBuilder} which creates a Web request with a builder API (see
* {@link ClientWebRequestBuilders})</li>
* <li>an {@link ResponseExtractor} which extracts the relevant part of the server
* response with the composition API of choice (see {@link ResponseExtractors}</li>
* </ul>
*
* @author Brian Clozel
* @see HttpRequestBuilders
* @see WebResponseExtractors
* @see ClientWebRequestBuilders
* @see ResponseExtractors
*/
public final class WebClient {
private ClientHttpRequestFactory requestFactory;
private static final ClassLoader classLoader = WebClient.class.getClassLoader();
private List<Encoder<?>> messageEncoders;
private static final boolean jackson2Present = ClassUtils
.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader)
&& ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator",
classLoader);
private List<Decoder<?>> messageDecoders;
private static final boolean jaxb2Present = ClassUtils
.isPresent("javax.xml.bind.Binder", classLoader);
private ClientHttpConnector clientHttpConnector;
private List<HttpMessageConverter<?>> messageConverters;
/**
* Create a {@code ReactiveRestClient} instance, using the {@link ClientHttpRequestFactory}
* implementation given as an argument to drive the underlying HTTP client implementation.
* Create a {@code WebClient} instance, using the {@link ClientHttpConnector}
* implementation given as an argument to drive the underlying
* implementation.
*
* Register by default the following Encoders and Decoders:
* <ul>
* <li>{@link ByteBufferEncoder} / {@link ByteBufferDecoder}</li>
* <li>{@link StringEncoder} / {@link StringDecoder}</li>
* <li>{@link JacksonJsonEncoder} / {@link JacksonJsonDecoder}</li>
* <li>{@link ByteBufferEncoder} / {@link ByteBufferDecoder}</li>
* <li>{@link StringEncoder} / {@link StringDecoder}</li>
* <li>{@link Jaxb2Encoder} / {@link Jaxb2Decoder}</li>
* <li>{@link JacksonJsonEncoder} / {@link JacksonJsonDecoder}</li>
* </ul>
*
* @param requestFactory the {@code ClientHttpRequestFactory} to use
* @param clientHttpConnector the {@code ClientHttpRequestFactory} to use
*/
public WebClient(ClientHttpRequestFactory requestFactory) {
this.requestFactory = requestFactory;
this.messageEncoders = Arrays.asList(new ByteBufferEncoder(), new StringEncoder(),
new JacksonJsonEncoder());
this.messageDecoders = Arrays.asList(new ByteBufferDecoder(), new StringDecoder(),
new JacksonJsonDecoder());
public WebClient(ClientHttpConnector clientHttpConnector) {
this.clientHttpConnector = clientHttpConnector;
this.messageConverters = new ArrayList<>();
addDefaultHttpMessageConverters(this.messageConverters);
}
/**
* Set the list of {@link Encoder}s to use for encoding messages
* Adds default HTTP message converters
*/
public void setMessageEncoders(List<Encoder<?>> messageEncoders) {
this.messageEncoders = messageEncoders;
protected final void addDefaultHttpMessageConverters(
List<HttpMessageConverter<?>> converters) {
converters.add(converter(new ByteBufferEncoder(), new ByteBufferDecoder()));
converters.add(converter(new StringEncoder(), new StringDecoder()));
converters.add(new ResourceHttpMessageConverter());
if (jaxb2Present) {
converters.add(converter(new Jaxb2Encoder(), new Jaxb2Decoder()));
}
if (jackson2Present) {
converters.add(converter(new JacksonJsonEncoder(), new JacksonJsonDecoder()));
}
}
private static <T> HttpMessageConverter<T> converter(Encoder<T> encoder,
Decoder<T> decoder) {
return new CodecHttpMessageConverter<>(encoder, decoder);
}
/**
* Set the list of {@link Decoder}s to use for decoding messages
* Set the list of {@link HttpMessageConverter}s to use for encoding and decoding HTTP
* messages
*/
public void setMessageDecoders(List<Decoder<?>> messageDecoders) {
this.messageDecoders = messageDecoders;
public void setMessageConverters(List<HttpMessageConverter<?>> messageConverters) {
this.messageConverters = messageConverters;
}
/**
* Perform the actual HTTP request/response exchange
*
* <p>Pulling demand from the exposed {@code Flux} will result in:
* <p>
* Requesting from the exposed {@code Flux} will result in:
* <ul>
* <li>building the actual HTTP request using the provided {@code RequestBuilder}</li>
* <li>encoding the HTTP request body with the configured {@code Encoder}s</li>
* <li>returning the response with a publisher of the body</li>
* <li>building the actual HTTP request using the provided {@code ClientWebRequestBuilder}</li>
* <li>encoding the HTTP request body with the configured {@code HttpMessageConverter}s</li>
* <li>returning the response with a publisher of the body</li>
* </ul>
*/
public WebResponseActions perform(HttpRequestBuilder builder) {
public WebResponseActions perform(ClientWebRequestBuilder builder) {
ClientHttpRequest request = builder.build(this.requestFactory, this.messageEncoders);
final Mono<ClientHttpResponse> clientResponse = request.execute()
.log("org.springframework.http.client.reactive");
ClientWebRequest clientWebRequest = builder.build();
final Mono<ClientHttpResponse> clientResponse = this.clientHttpConnector
.connect(clientWebRequest.getMethod(), clientWebRequest.getUrl(),
new DefaultRequestCallback(clientWebRequest))
.log("org.springframework.web.client.reactive", Level.FINE);
return new WebResponseActions() {
@Override
public void doWithStatus(Consumer<HttpStatus> consumer) {
// TODO: implement
clientResponse.doOnNext(clientHttpResponse ->
consumer.accept(clientHttpResponse.getStatusCode()));
}
@Override
public <T> T extract(WebResponseExtractor<T> extractor) {
return extractor.extract(new DefaultWebResponse(clientResponse, messageDecoders));
public <T> T extract(ResponseExtractor<T> extractor) {
return extractor.extract(clientResponse, messageConverters);
}
};
}
}
protected class DefaultRequestCallback implements Function<ClientHttpRequest, Mono<Void>> {
private final ClientWebRequest clientWebRequest;
public DefaultRequestCallback(ClientWebRequest clientWebRequest) {
this.clientWebRequest = clientWebRequest;
}
@Override
public Mono<Void> apply(ClientHttpRequest clientHttpRequest) {
clientHttpRequest.getHeaders().putAll(this.clientWebRequest.getHttpHeaders());
if (clientHttpRequest.getHeaders().getAccept().isEmpty()) {
clientHttpRequest.getHeaders().setAccept(
Collections.singletonList(MediaType.ALL));
}
clientWebRequest.getCookies().values()
.stream().flatMap(cookies -> cookies.stream())
.forEach(cookie -> clientHttpRequest.getCookies().add(cookie.getName(), cookie));
if (this.clientWebRequest.getBody() != null) {
return writeRequestBody(this.clientWebRequest.getBody(),
this.clientWebRequest.getElementType(), clientHttpRequest, messageConverters);
}
else {
return clientHttpRequest.setComplete();
}
}
protected Mono<Void> writeRequestBody(Publisher<?> content,
ResolvableType requestType, ClientHttpRequest request,
List<HttpMessageConverter<?>> messageConverters) {
MediaType contentType = request.getHeaders().getContentType();
Optional<HttpMessageConverter<?>> converter = resolveConverter(messageConverters, requestType, contentType);
if (!converter.isPresent()) {
return Mono.error(new IllegalStateException(
"Could not encode request body of type '" + contentType
+ "' with target type '" + requestType.toString() + "'"));
}
// noinspection unchecked
return converter.get().write((Publisher) content, requestType, contentType, request);
}
protected Optional<HttpMessageConverter<?>> resolveConverter(
List<HttpMessageConverter<?>> messageConverters, ResolvableType type,
MediaType mediaType) {
return messageConverters.stream().filter(e -> e.canWrite(type, mediaType)).findFirst();
}
}
}

View File

@ -45,5 +45,5 @@ public interface WebResponseActions {
* .extract(response(String.class));
* </pre>
*/
<T> T extract(WebResponseExtractor<T> extractor);
<T> T extract(ResponseExtractor<T> extractor);
}

View File

@ -1,161 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
/**
* Static factory methods for {@link WebResponseExtractor}
* based on the {@link Flux} and {@link Mono} API.
*
* @author Brian Clozel
*/
public class WebResponseExtractors {
private static final Charset UTF_8 = Charset.forName("UTF-8");
private static final Object[] HINTS = new Object[] {UTF_8};
private static final Object EMPTY_BODY = new Object();
/**
* Extract the response body and decode it, returning it as a {@code Mono<T>}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> WebResponseExtractor<Mono<T>> body(ResolvableType bodyType) {
//noinspection unchecked
return webResponse -> (Mono<T>) webResponse.getClientResponse()
.flatMap(resp -> decodeResponseBody(resp, bodyType, webResponse.getMessageDecoders()))
.next();
}
/**
* Extract the response body and decode it, returning it as a {@code Mono<T>}
*/
public static <T> WebResponseExtractor<Mono<T>> body(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return body(bodyType);
}
/**
* Extract the response body and decode it, returning it as a {@code Flux<T>}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> WebResponseExtractor<Flux<T>> bodyStream(ResolvableType bodyType) {
return webResponse -> webResponse.getClientResponse()
.flatMap(resp -> decodeResponseBody(resp, bodyType, webResponse.getMessageDecoders()));
}
/**
* Extract the response body and decode it, returning it as a {@code Flux<T>}
*/
public static <T> WebResponseExtractor<Flux<T>> bodyStream(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return bodyStream(bodyType);
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as a single type {@code T}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> WebResponseExtractor<Mono<ResponseEntity<T>>> response(ResolvableType bodyType) {
return webResponse -> webResponse.getClientResponse()
.then(response -> {
List<Decoder<?>> decoders = webResponse.getMessageDecoders();
return Mono.when(
decodeResponseBody(response, bodyType, decoders).next().defaultIfEmpty(EMPTY_BODY),
Mono.just(response.getHeaders()),
Mono.just(response.getStatusCode()));
})
.map(tuple -> {
Object body = (tuple.getT1() != EMPTY_BODY ? tuple.getT1() : null);
//noinspection unchecked
return new ResponseEntity<>((T) body, tuple.getT2(), tuple.getT3());
});
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as a single type {@code T}
*/
public static <T> WebResponseExtractor<Mono<ResponseEntity<T>>> response(Class<T> bodyClass) {
ResolvableType bodyType = ResolvableType.forClass(bodyClass);
return response(bodyType);
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as a {@code Flux<T>}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> WebResponseExtractor<Mono<ResponseEntity<Flux<T>>>> responseStream(ResolvableType type) {
return webResponse -> webResponse.getClientResponse()
.map(response -> new ResponseEntity<>(
decodeResponseBody(response, type, webResponse.getMessageDecoders()),
response.getHeaders(), response.getStatusCode()));
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as a {@code Flux<T>}
*/
public static <T> WebResponseExtractor<Mono<ResponseEntity<Flux<T>>>> responseStream(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return responseStream(resolvableType);
}
/**
* Extract the response headers as an {@code HttpHeaders} instance
*/
public static WebResponseExtractor<Mono<HttpHeaders>> headers() {
return webResponse -> webResponse.getClientResponse().map(resp -> resp.getHeaders());
}
protected static <T> Flux<T> decodeResponseBody(ClientHttpResponse response, ResolvableType responseType,
List<Decoder<?>> messageDecoders) {
MediaType contentType = response.getHeaders().getContentType();
Optional<Decoder<?>> decoder = resolveDecoder(messageDecoders, responseType, contentType);
if (!decoder.isPresent()) {
return Flux.error(new IllegalStateException("Could not decode response body of type '" + contentType +
"' with target type '" + responseType.toString() + "'"));
}
//noinspection unchecked
return (Flux<T>) decoder.get().decode(response.getBody(), responseType, contentType, HINTS);
}
protected static Optional<Decoder<?>> resolveDecoder(List<Decoder<?>> messageDecoders, ResolvableType type,
MediaType mediaType) {
return messageDecoders.stream().filter(e -> e.canDecode(type, mediaType)).findFirst();
}
}

View File

@ -18,15 +18,16 @@ package org.springframework.http.server.reactive;
import org.junit.Before;
import org.junit.Test;
import static org.springframework.web.client.reactive.HttpRequestBuilders.get;
import static org.springframework.web.client.reactive.WebResponseExtractors.bodyStream;
import static org.springframework.web.client.reactive.ClientWebRequestBuilders.get;
import static org.springframework.web.client.reactive.ResponseExtractors.bodyStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.FlushingDataBuffer;
import org.springframework.http.client.reactive.ReactorHttpClientRequestFactory;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.client.reactive.WebClient;
/**
@ -39,7 +40,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
@Before
public void setup() throws Exception {
super.setup();
this.webClient = new WebClient(new ReactorHttpClientRequestFactory());
this.webClient = new WebClient(new ReactorClientHttpConnector());
}
@Test

View File

@ -16,33 +16,41 @@
package org.springframework.web.client.reactive;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.http.HttpMethod;
/**
*
* @author Rob Winch
*
*/
public class DefaultHttpRequestBuilderTests {
private DefaultHttpRequestBuilder builder;
public class DefaultWebRequestBuilderTests {
private DefaultClientWebRequestBuilder builder;
@Before
public void setup() {
builder = new DefaultHttpRequestBuilder(HttpMethod.GET, "https://example.com/foo");
builder = new DefaultClientWebRequestBuilder(HttpMethod.GET, "https://example.com/foo");
}
@Test
public void apply() {
RequestPostProcessor postProcessor = mock(RequestPostProcessor.class);
ClientWebRequestPostProcessor postProcessor = mock(ClientWebRequestPostProcessor.class);
when(postProcessor.postProcess(any(ClientWebRequest.class))).thenAnswer(new Answer<ClientWebRequest>() {
@Override
public ClientWebRequest answer(InvocationOnMock invocation) throws Throwable {
return (ClientWebRequest) invocation.getArguments()[0];
}
});
builder.apply(postProcessor);
ClientWebRequest webRequest = builder.apply(postProcessor).build();
verify(postProcessor).postProcess(builder);
verify(postProcessor).postProcess(webRequest);
}
@Test(expected = IllegalArgumentException.class)

View File

@ -17,8 +17,8 @@
package org.springframework.web.client.reactive;
import static org.junit.Assert.*;
import static org.springframework.web.client.reactive.HttpRequestBuilders.*;
import static org.springframework.web.client.reactive.WebResponseExtractors.*;
import static org.springframework.web.client.reactive.ClientWebRequestBuilders.*;
import static org.springframework.web.client.reactive.ResponseExtractors.*;
import java.util.function.Consumer;
@ -38,9 +38,11 @@ import org.springframework.http.codec.Pojo;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ReactorHttpClientRequestFactory;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
/**
* {@link WebClient} integration tests with the {@code Flux} and {@code Mono} API.
*
* @author Brian Clozel
*/
public class WebClientIntegrationTests {
@ -52,7 +54,7 @@ public class WebClientIntegrationTests {
@Before
public void setup() {
this.server = new MockWebServer();
this.webClient = new WebClient(new ReactorHttpClientRequestFactory());
this.webClient = new WebClient(new ReactorClientHttpConnector());
}
@Test
@ -228,12 +230,14 @@ public class WebClientIntegrationTests {
public void shouldPostPojoAsJson() throws Exception {
HttpUrl baseUrl = server.url("/pojo/capitalize");
this.server.enqueue(new MockResponse().setBody("{\"bar\":\"BARBAR\",\"foo\":\"FOOFOO\"}"));
this.server.enqueue(new MockResponse()
.setHeader("Content-Type", "application/json")
.setBody("{\"bar\":\"BARBAR\",\"foo\":\"FOOFOO\"}"));
Pojo spring = new Pojo("foofoo", "barbar");
Mono<Pojo> result = this.webClient
.perform(post(baseUrl.toString())
.content(spring)
.body(spring)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON))
.extract(body(Pojo.class));
@ -252,6 +256,28 @@ public class WebClientIntegrationTests {
assertEquals("application/json", request.getHeader(HttpHeaders.CONTENT_TYPE));
}
@Test
public void shouldSendCookieHeader() throws Exception {
HttpUrl baseUrl = server.url("/test");
this.server.enqueue(new MockResponse()
.setHeader("Content-Type", "text/plain").setBody("test"));
Mono<String> result = this.webClient
.perform(get(baseUrl.toString())
.cookie("testkey", "testvalue"))
.extract(body(String.class));
TestSubscriber
.subscribe(result)
.awaitAndAssertNextValues("test")
.assertComplete();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/test", request.getPath());
assertEquals("testkey=testvalue", request.getHeader(HttpHeaders.COOKIE));
}
@Test
public void shouldGetErrorWhen404() throws Exception {
@ -262,7 +288,6 @@ public class WebClientIntegrationTests {
.perform(get(baseUrl.toString()))
.extract(body(String.class));
// TODO: error message should be converted to a ClientException
TestSubscriber
.subscribe(result)
.await()

View File

@ -16,7 +16,11 @@
package org.springframework.web.reactive.result.method.annotation;
import static org.springframework.web.client.reactive.ClientWebRequestBuilders.*;
import static org.springframework.web.client.reactive.ResponseExtractors.*;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -29,14 +33,16 @@ import reactor.core.test.TestSubscriber;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.codec.StringEncoder;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.SseEventEncoder;
import org.springframework.http.codec.json.JacksonJsonDecoder;
import org.springframework.http.codec.json.JacksonJsonEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorHttpClientRequestFactory;
import org.springframework.http.codec.SseEventEncoder;
import org.springframework.http.converter.reactive.CodecHttpMessageConverter;
import org.springframework.http.converter.reactive.HttpMessageConverter;
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
@ -49,9 +55,6 @@ import org.springframework.web.reactive.config.WebReactiveConfiguration;
import org.springframework.web.reactive.sse.SseEvent;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import static org.springframework.web.client.reactive.HttpRequestBuilders.get;
import static org.springframework.web.client.reactive.WebResponseExtractors.bodyStream;
/**
* @author Sebastien Deleuze
*/
@ -64,11 +67,12 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Before
public void setup() throws Exception {
super.setup();
this.webClient = new WebClient(new ReactorHttpClientRequestFactory());
this.webClient.setMessageDecoders(Arrays.asList(
new ByteBufferDecoder(),
new StringDecoder(false),
new JacksonJsonDecoder()));
this.webClient = new WebClient(new ReactorClientHttpConnector());
List<HttpMessageConverter<?>> converters = new ArrayList<>();
converters.add(new CodecHttpMessageConverter<>(new ByteBufferEncoder(), new ByteBufferDecoder()));
converters.add(new CodecHttpMessageConverter<>(new StringEncoder(), new StringDecoder(false)));
converters.add(new CodecHttpMessageConverter<>(new JacksonJsonEncoder(), new JacksonJsonDecoder()));
this.webClient.setMessageConverters(converters);
}
@Override