Polishing

Closes gh-34081
This commit is contained in:
rstoyanchev 2025-02-12 11:48:06 +00:00
parent 667004e5fa
commit 1fd6ded7a0
6 changed files with 79 additions and 66 deletions

View File

@ -37,6 +37,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseCookie; import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
/** /**
* {@link ClientHttpConnector} for the Java {@link HttpClient}. * {@link ClientHttpConnector} for the Java {@link HttpClient}.
@ -99,7 +100,7 @@ public class JdkClientHttpConnector implements ClientHttpConnector {
} }
/** /**
* Set the underlying {@code HttpClient}'s read timeout as a {@code Duration}. * Set the underlying {@code HttpClient} read timeout as a {@code Duration}.
* <p>Default is the system's default timeout. * <p>Default is the system's default timeout.
* @since 6.2 * @since 6.2
* @see java.net.http.HttpRequest.Builder#timeout * @see java.net.http.HttpRequest.Builder#timeout
@ -126,21 +127,23 @@ public class JdkClientHttpConnector implements ClientHttpConnector {
public Mono<ClientHttpResponse> connect( public Mono<ClientHttpResponse> connect(
HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
JdkClientHttpRequest jdkClientHttpRequest = new JdkClientHttpRequest(method, uri, this.bufferFactory, JdkClientHttpRequest request =
this.readTimeout); new JdkClientHttpRequest(method, uri, this.bufferFactory, this.readTimeout);
return requestCallback.apply(jdkClientHttpRequest).then(Mono.defer(() -> { return requestCallback.apply(request).then(Mono.defer(() -> {
HttpRequest httpRequest = jdkClientHttpRequest.getNativeRequest(); HttpRequest nativeRequest = request.getNativeRequest();
CompletableFuture<HttpResponse<Flow.Publisher<List<ByteBuffer>>>> future = CompletableFuture<HttpResponse<Flow.Publisher<List<ByteBuffer>>>> future =
this.httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofPublisher()); this.httpClient.sendAsync(nativeRequest, HttpResponse.BodyHandlers.ofPublisher());
return Mono.fromCompletionStage(future) return Mono.fromCompletionStage(future).map(response ->
.map(response -> { new JdkClientHttpResponse(response, this.bufferFactory, parseCookies(response)));
List<String> headers = response.headers().allValues(HttpHeaders.SET_COOKIE);
return new JdkClientHttpResponse(response, this.bufferFactory, this.cookieParser.parse(headers));
});
})); }));
} }
private MultiValueMap<String, ResponseCookie> parseCookies(HttpResponse<?> response) {
List<String> headers = response.headers().allValues(HttpHeaders.SET_COOKIE);
return this.cookieParser.parse(headers);
}
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2024 the original author or authors. * Copyright 2002-2025 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -59,8 +59,9 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest {
private final HttpRequest.Builder builder; private final HttpRequest.Builder builder;
public JdkClientHttpRequest(HttpMethod httpMethod, URI uri, DataBufferFactory bufferFactory, public JdkClientHttpRequest(
@Nullable Duration readTimeout) { HttpMethod httpMethod, URI uri, DataBufferFactory bufferFactory, @Nullable Duration readTimeout) {
Assert.notNull(httpMethod, "HttpMethod is required"); Assert.notNull(httpMethod, "HttpMethod is required");
Assert.notNull(uri, "URI is required"); Assert.notNull(uri, "URI is required");
Assert.notNull(bufferFactory, "DataBufferFactory is required"); Assert.notNull(bufferFactory, "DataBufferFactory is required");
@ -97,32 +98,6 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest {
} }
@Override
protected void applyHeaders() {
for (Map.Entry<String, List<String>> entry : getHeaders().headerSet()) {
if (entry.getKey().equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) {
// content-length is specified when writing
continue;
}
for (String value : entry.getValue()) {
this.builder.header(entry.getKey(), value);
}
}
if (!getHeaders().containsHeader(HttpHeaders.ACCEPT)) {
this.builder.header(HttpHeaders.ACCEPT, "*/*");
}
}
@Override
protected void applyCookies() {
MultiValueMap<String, HttpCookie> cookies = getCookies();
if (cookies.isEmpty()) {
return;
}
this.builder.header(HttpHeaders.COOKIE, cookies.values().stream()
.flatMap(List::stream).map(HttpCookie::toString).collect(Collectors.joining(";")));
}
@Override @Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> { return doCommit(() -> {
@ -162,4 +137,30 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest {
}); });
} }
@Override
protected void applyHeaders() {
for (Map.Entry<String, List<String>> entry : getHeaders().headerSet()) {
if (entry.getKey().equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) {
// content-length is specified when writing
continue;
}
for (String value : entry.getValue()) {
this.builder.header(entry.getKey(), value);
}
}
if (!getHeaders().containsHeader(HttpHeaders.ACCEPT)) {
this.builder.header(HttpHeaders.ACCEPT, "*/*");
}
}
@Override
protected void applyCookies() {
MultiValueMap<String, HttpCookie> cookies = getCookies();
if (cookies.isEmpty()) {
return;
}
this.builder.header(HttpHeaders.COOKIE, cookies.values().stream()
.flatMap(List::stream).map(HttpCookie::toString).collect(Collectors.joining(";")));
}
} }

View File

@ -47,7 +47,8 @@ import org.springframework.util.MultiValueMap;
*/ */
class JdkClientHttpResponse extends AbstractClientHttpResponse { class JdkClientHttpResponse extends AbstractClientHttpResponse {
public JdkClientHttpResponse(HttpResponse<Flow.Publisher<List<ByteBuffer>>> response, public JdkClientHttpResponse(
HttpResponse<Flow.Publisher<List<ByteBuffer>>> response,
DataBufferFactory bufferFactory, MultiValueMap<String, ResponseCookie> cookies) { DataBufferFactory bufferFactory, MultiValueMap<String, ResponseCookie> cookies) {
super(HttpStatusCode.valueOf(response.statusCode()), super(HttpStatusCode.valueOf(response.statusCode()),
@ -63,7 +64,9 @@ class JdkClientHttpResponse extends AbstractClientHttpResponse {
return HttpHeaders.readOnlyHttpHeaders(multiValueMap); return HttpHeaders.readOnlyHttpHeaders(multiValueMap);
} }
private static Flux<DataBuffer> adaptBody(HttpResponse<Flow.Publisher<List<ByteBuffer>>> response, DataBufferFactory bufferFactory) { private static Flux<DataBuffer> adaptBody(
HttpResponse<Flow.Publisher<List<ByteBuffer>>> response, DataBufferFactory bufferFactory) {
return JdkFlowAdapter.flowPublisherToFlux(response.body()) return JdkFlowAdapter.flowPublisherToFlux(response.body())
.flatMapIterable(Function.identity()) .flatMapIterable(Function.identity())
.map(bufferFactory::wrap) .map(bufferFactory::wrap)

View File

@ -22,6 +22,7 @@ import java.util.function.Function;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Request; import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.reactive.client.ReactiveResponse;
import org.jspecify.annotations.Nullable; import org.jspecify.annotations.Nullable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -32,6 +33,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseCookie; import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
/** /**
* {@link ClientHttpConnector} for the Jetty Reactive Streams HttpClient. * {@link ClientHttpConnector} for the Jetty Reactive Streams HttpClient.
@ -126,11 +128,15 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
private Mono<ClientHttpResponse> execute(JettyClientHttpRequest request) { private Mono<ClientHttpResponse> execute(JettyClientHttpRequest request) {
return Mono.fromDirect(request.toReactiveRequest() return Mono.fromDirect(request.toReactiveRequest()
.response((reactiveResponse, chunkPublisher) -> { .response((response, chunkPublisher) -> {
Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this.bufferFactory::wrap); Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this.bufferFactory::wrap);
List<String> headers = reactiveResponse.getHeaders().getValuesList(HttpHeaders.SET_COOKIE); return Mono.just(new JettyClientHttpResponse(response, content, parseCookies(response)));
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content, this.cookieParser.parse(headers)));
})); }));
} }
private MultiValueMap<String, ResponseCookie> parseCookies(ReactiveResponse response) {
List<String> headers = response.getHeaders().getValuesList(HttpHeaders.SET_COOKIE);
return this.cookieParser.parse(headers);
}
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2024 the original author or authors. * Copyright 2002-2025 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -72,11 +72,6 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
return this.jettyRequest.getURI(); return this.jettyRequest.getURI();
} }
@Override
public Mono<Void> setComplete() {
return doCommit();
}
@Override @Override
public DataBufferFactory bufferFactory() { public DataBufferFactory bufferFactory() {
return this.bufferFactory; return this.bufferFactory;
@ -88,6 +83,12 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
return (T) this.jettyRequest; return (T) this.jettyRequest;
} }
@Override
protected HttpHeaders initReadOnlyHeaders() {
return HttpHeaders.readOnlyHttpHeaders(new JettyHeadersAdapter(this.jettyRequest.getHeaders()));
}
@Override @Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return Mono.<Void>create(sink -> { return Mono.<Void>create(sink -> {
@ -108,13 +109,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
.doOnDiscard(DataBuffer.class, DataBufferUtils::release)); .doOnDiscard(DataBuffer.class, DataBufferUtils::release));
} }
private String getContentType() {
MediaType contentType = getHeaders().getContentType();
return contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE;
}
private List<Content.Chunk> toContentChunks(DataBuffer dataBuffer) { private List<Content.Chunk> toContentChunks(DataBuffer dataBuffer) {
List<Content.Chunk> result = new ArrayList<>(1); List<Content.Chunk> result = new ArrayList<>(1);
DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers(); DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers();
while (iterator.hasNext()) { while (iterator.hasNext()) {
@ -131,11 +126,14 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
return result; return result;
} }
private String getContentType() {
MediaType contentType = getHeaders().getContentType();
return (contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE);
}
@Override @Override
protected void applyCookies() { public Mono<Void> setComplete() {
getCookies().values().stream().flatMap(Collection::stream) return doCommit();
.map(cookie -> HttpCookie.build(cookie.getName(), cookie.getValue()).build())
.forEach(this.jettyRequest::cookie);
} }
@Override @Override
@ -150,8 +148,10 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
} }
@Override @Override
protected HttpHeaders initReadOnlyHeaders() { protected void applyCookies() {
return HttpHeaders.readOnlyHttpHeaders(new JettyHeadersAdapter(this.jettyRequest.getHeaders())); getCookies().values().stream().flatMap(Collection::stream)
.map(cookie -> HttpCookie.build(cookie.getName(), cookie.getValue()).build())
.forEach(this.jettyRequest::cookie);
} }
@Override @Override

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2024 the original author or authors. * Copyright 2002-2025 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -37,10 +37,10 @@ import org.springframework.util.MultiValueMap;
class JettyClientHttpResponse extends AbstractClientHttpResponse { class JettyClientHttpResponse extends AbstractClientHttpResponse {
public JettyClientHttpResponse( public JettyClientHttpResponse(
ReactiveResponse reactiveResponse, Flux<DataBuffer> content, ReactiveResponse response, Flux<DataBuffer> content,
MultiValueMap<String, ResponseCookie> cookies) { MultiValueMap<String, ResponseCookie> cookies) {
super(HttpStatusCode.valueOf(reactiveResponse.getStatus()), adaptHeaders(reactiveResponse), cookies, content); super(HttpStatusCode.valueOf(response.getStatus()), adaptHeaders(response), cookies, content);
} }
private static HttpHeaders adaptHeaders(ReactiveResponse response) { private static HttpHeaders adaptHeaders(ReactiveResponse response) {