Refactoring in the JDK HttpClient support

See gh-23432
This commit is contained in:
Rossen Stoyanchev 2021-11-19 07:28:59 +00:00
parent dcc7154641
commit b3b50f8f4b
3 changed files with 82 additions and 84 deletions

View File

@ -18,6 +18,12 @@ package org.springframework.http.client.reactive;
import java.net.URI; import java.net.URI;
import java.net.http.HttpClient; import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Function; import java.util.function.Function;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -27,9 +33,10 @@ import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
/** /**
* {@link ClientHttpConnector} for Java's {@link HttpClient}. * {@link ClientHttpConnector} for the Java {@link HttpClient}.
* *
* @author Julien Eyraud * @author Julien Eyraud
* @author Rossen Stoyanchev
* @since 6.0 * @since 6.0
* @see <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpClient.html">HttpClient</a> * @see <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpClient.html">HttpClient</a>
*/ */
@ -60,8 +67,17 @@ public class JdkClientHttpConnector implements ClientHttpConnector {
public Mono<ClientHttpResponse> connect( public Mono<ClientHttpResponse> connect(
HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
JdkClientHttpRequest request = new JdkClientHttpRequest(this.httpClient, method, uri, this.bufferFactory); JdkClientHttpRequest jdkClientHttpRequest = new JdkClientHttpRequest(method, uri, this.bufferFactory);
return requestCallback.apply(request).then(Mono.defer(request::getResponse));
return requestCallback.apply(jdkClientHttpRequest).then(Mono.defer(() -> {
HttpRequest httpRequest = jdkClientHttpRequest.getNativeRequest();
CompletableFuture<HttpResponse<Flow.Publisher<List<ByteBuffer>>>> future =
this.httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofPublisher());
return Mono.fromCompletionStage(future)
.map(response -> new JdkClientHttpResponse(response, this.bufferFactory));
}));
} }
} }

View File

@ -19,11 +19,9 @@ package org.springframework.http.client.reactive;
import java.net.URI; import java.net.URI;
import java.net.http.HttpClient; import java.net.http.HttpClient;
import java.net.http.HttpRequest; import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -35,50 +33,38 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* {@link ClientHttpRequest} implementation for Java's {@link HttpClient}. * {@link ClientHttpRequest} for the Java {@link HttpClient}.
* *
* @author Julien Eyraud * @author Julien Eyraud
* @author Rossen Stoyanchev
* @since 6.0 * @since 6.0
*/ */
class JdkClientHttpRequest extends AbstractClientHttpRequest { class JdkClientHttpRequest extends AbstractClientHttpRequest {
private static final Set<String> DISALLOWED_HEADERS =
Set.of("connection", "content-length", "date", "expect", "from", "host", "upgrade", "via", "warning");
private final HttpClient httpClient;
private final HttpMethod method; private final HttpMethod method;
private final URI uri; private final URI uri;
private final HttpRequest.Builder builder;
private final DataBufferFactory bufferFactory; private final DataBufferFactory bufferFactory;
@Nullable private final HttpRequest.Builder builder;
private Mono<ClientHttpResponse> response;
public JdkClientHttpRequest( public JdkClientHttpRequest(HttpMethod httpMethod, URI uri, DataBufferFactory bufferFactory) {
HttpClient httpClient, HttpMethod httpMethod, URI uri, DataBufferFactory bufferFactory) { Assert.notNull(httpMethod, "HttpMethod is required");
Assert.notNull(uri, "URI is required");
Assert.notNull(bufferFactory, "DataBufferFactory is required");
Assert.notNull(httpClient, "HttpClient should not be null");
Assert.notNull(httpMethod, "HttpMethod should not be null");
Assert.notNull(uri, "URI should not be null");
Assert.notNull(bufferFactory, "DataBufferFactory should not be null");
this.httpClient = httpClient;
this.method = httpMethod; this.method = httpMethod;
this.uri = uri; this.uri = uri;
this.builder = HttpRequest.newBuilder(uri);
this.bufferFactory = bufferFactory; this.bufferFactory = bufferFactory;
this.builder = HttpRequest.newBuilder(uri);
} }
@ -103,20 +89,16 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest {
return (T) this.builder.build(); return (T) this.builder.build();
} }
Mono<ClientHttpResponse> getResponse() {
Assert.notNull(this.response, "Response is not set");
return this.response;
}
@Override @Override
protected void applyHeaders() { protected void applyHeaders() {
for (Map.Entry<String, List<String>> header : getHeaders().entrySet()) { for (Map.Entry<String, List<String>> entry : getHeaders().entrySet()) {
if (DISALLOWED_HEADERS.contains(header.getKey().toLowerCase())) { if (entry.getKey().equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) {
// content-length is specified when writing
continue; continue;
} }
for (String value : header.getValue()) { for (String value : entry.getValue()) {
this.builder.header(header.getKey(), value); this.builder.header(entry.getKey(), value);
} }
} }
if (!getHeaders().containsKey(HttpHeaders.ACCEPT)) { if (!getHeaders().containsKey(HttpHeaders.ACCEPT)) {
@ -126,33 +108,30 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest {
@Override @Override
protected void applyCookies() { protected void applyCookies() {
this.builder.header(HttpHeaders.COOKIE, this.builder.header(HttpHeaders.COOKIE, getCookies().values().stream()
getCookies().values().stream() .flatMap(List::stream).map(HttpCookie::toString).collect(Collectors.joining(";")));
.flatMap(List::stream)
.map(cookie -> cookie.getName() + "=" + cookie.getValue())
.collect(Collectors.joining("; ")));
} }
@Override @Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> { return doCommit(() -> {
Flow.Publisher<ByteBuffer> flow = this.builder.method(this.method.name(), toBodyPublisher(body));
JdkFlowAdapter.publisherToFlowPublisher(Flux.from(body).map(DataBuffer::asByteBuffer));
HttpRequest.BodyPublisher bodyPublisher = (getHeaders().getContentLength() >= 0 ?
HttpRequest.BodyPublishers.fromPublisher(flow, getHeaders().getContentLength()) :
HttpRequest.BodyPublishers.fromPublisher(flow));
this.response = Mono.fromCompletionStage(() -> {
HttpRequest request = this.builder.method(this.method.name(), bodyPublisher).build();
return this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher());
})
.map(response -> new JdkClientHttpResponse(response, this.bufferFactory));
return Mono.empty(); return Mono.empty();
}); });
} }
private HttpRequest.BodyPublisher toBodyPublisher(Publisher<? extends DataBuffer> body) {
Publisher<ByteBuffer> byteBufferBody = (body instanceof Mono ?
Mono.from(body).map(DataBuffer::asByteBuffer) :
Flux.from(body).map(DataBuffer::asByteBuffer));
Flow.Publisher<ByteBuffer> bodyFlow = JdkFlowAdapter.publisherToFlowPublisher(byteBufferBody);
return (getHeaders().getContentLength() > 0 ?
HttpRequest.BodyPublishers.fromPublisher(bodyFlow, getHeaders().getContentLength()) :
HttpRequest.BodyPublishers.fromPublisher(bodyFlow));
}
@Override @Override
public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) { public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMap(Function.identity())); return writeWith(Flux.from(body).flatMap(Function.identity()));
@ -160,18 +139,8 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest {
@Override @Override
public Mono<Void> setComplete() { public Mono<Void> setComplete() {
if (isCommitted()) {
return Mono.empty();
}
return doCommit(() -> { return doCommit(() -> {
this.response = Mono.fromCompletionStage(() -> { this.builder.method(this.method.name(), HttpRequest.BodyPublishers.noBody());
HttpRequest.BodyPublisher bodyPublisher = HttpRequest.BodyPublishers.noBody();
HttpRequest request = this.builder.method(this.method.name(), bodyPublisher).build();
return this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher());
})
.map(response -> new JdkClientHttpResponse(response, this.bufferFactory));
return Mono.empty(); return Mono.empty();
}); });
} }

View File

@ -21,6 +21,8 @@ import java.net.http.HttpClient;
import java.net.http.HttpResponse; import java.net.http.HttpResponse;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Matcher; import java.util.regex.Matcher;
@ -36,13 +38,16 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie; import org.springframework.http.ResponseCookie;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedCaseInsensitiveMap;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
/** /**
* {@link ClientHttpResponse} implementation for Java's {@link HttpClient}. * {@link ClientHttpResponse} for the Java {@link HttpClient}.
* *
* @author Julien Eyraud * @author Julien Eyraud
* @author Rossen Stoyanchev
* @since 6.0 * @since 6.0
*/ */
class JdkClientHttpResponse implements ClientHttpResponse { class JdkClientHttpResponse implements ClientHttpResponse {
@ -54,12 +59,23 @@ class JdkClientHttpResponse implements ClientHttpResponse {
private final DataBufferFactory bufferFactory; private final DataBufferFactory bufferFactory;
private final HttpHeaders headers;
public JdkClientHttpResponse( public JdkClientHttpResponse(
HttpResponse<Flow.Publisher<List<ByteBuffer>>> response, DataBufferFactory bufferFactory) { HttpResponse<Flow.Publisher<List<ByteBuffer>>> response, DataBufferFactory bufferFactory) {
this.response = response; this.response = response;
this.bufferFactory = bufferFactory; this.bufferFactory = bufferFactory;
this.headers = adaptHeaders(response);
}
private static HttpHeaders adaptHeaders(HttpResponse<Flow.Publisher<List<ByteBuffer>>> response) {
Map<String, List<String>> rawHeaders = response.headers().map();
Map<String, List<String>> map = new LinkedCaseInsensitiveMap<>(rawHeaders.size(), Locale.ENGLISH);
MultiValueMap<String, String> multiValueMap = CollectionUtils.toMultiValueMap(map);
multiValueMap.putAll(rawHeaders);
return HttpHeaders.readOnlyHttpHeaders(multiValueMap);
} }
@ -75,34 +91,31 @@ class JdkClientHttpResponse implements ClientHttpResponse {
@Override @Override
public HttpHeaders getHeaders() { public HttpHeaders getHeaders() {
return this.response.headers().map().entrySet().stream() return this.headers;
.collect(HttpHeaders::new,
(headers, entry) -> headers.addAll(entry.getKey(), entry.getValue()),
HttpHeaders::addAll);
} }
@Override @Override
public MultiValueMap<String, ResponseCookie> getCookies() { public MultiValueMap<String, ResponseCookie> getCookies() {
return this.response.headers().allValues(HttpHeaders.SET_COOKIE).stream() return this.response.headers().allValues(HttpHeaders.SET_COOKIE).stream()
.flatMap(header -> .flatMap(header -> {
HttpCookie.parse(header).stream().map(cookie -> Matcher matcher = SAME_SITE_PATTERN.matcher(header);
ResponseCookie.from(cookie.getName(), cookie.getValue()) String sameSite = (matcher.matches() ? matcher.group(1) : null);
return HttpCookie.parse(header).stream().map(cookie -> toResponseCookie(cookie, sameSite));
})
.collect(LinkedMultiValueMap::new,
(cookies, cookie) -> cookies.add(cookie.getName(), cookie),
LinkedMultiValueMap::addAll);
}
private ResponseCookie toResponseCookie(HttpCookie cookie, @Nullable String sameSite) {
return ResponseCookie.from(cookie.getName(), cookie.getValue())
.domain(cookie.getDomain()) .domain(cookie.getDomain())
.httpOnly(cookie.isHttpOnly()) .httpOnly(cookie.isHttpOnly())
.maxAge(cookie.getMaxAge()) .maxAge(cookie.getMaxAge())
.path(cookie.getPath()) .path(cookie.getPath())
.secure(cookie.getSecure()) .secure(cookie.getSecure())
.sameSite(parseSameSite(header)) .sameSite(sameSite)
.build())) .build();
.collect(LinkedMultiValueMap::new,
(valueMap, cookie) -> valueMap.add(cookie.getName(), cookie),
LinkedMultiValueMap::addAll);
}
@Nullable
private static String parseSameSite(String headerValue) {
Matcher matcher = SAME_SITE_PATTERN.matcher(headerValue);
return (matcher.matches() ? matcher.group(1) : null);
} }
@Override @Override