From 1fd6ded7a01395a3c4a52112a92fcc44f3bb9c29 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Wed, 12 Feb 2025 11:48:06 +0000 Subject: [PATCH] Polishing Closes gh-34081 --- .../reactive/JdkClientHttpConnector.java | 25 ++++---- .../client/reactive/JdkClientHttpRequest.java | 59 ++++++++++--------- .../reactive/JdkClientHttpResponse.java | 7 ++- .../reactive/JettyClientHttpConnector.java | 12 +++- .../reactive/JettyClientHttpRequest.java | 36 +++++------ .../reactive/JettyClientHttpResponse.java | 6 +- 6 files changed, 79 insertions(+), 66 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java index 72f846f681..b08d4bbf0b 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java @@ -37,6 +37,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseCookie; import org.springframework.util.Assert; +import org.springframework.util.MultiValueMap; /** * {@link ClientHttpConnector} for the Java {@link HttpClient}. @@ -99,7 +100,7 @@ public class JdkClientHttpConnector implements ClientHttpConnector { } /** - * Set the underlying {@code HttpClient}'s read timeout as a {@code Duration}. + * Set the underlying {@code HttpClient} read timeout as a {@code Duration}. *

Default is the system's default timeout. * @since 6.2 * @see java.net.http.HttpRequest.Builder#timeout @@ -126,21 +127,23 @@ public class JdkClientHttpConnector implements ClientHttpConnector { public Mono connect( HttpMethod method, URI uri, Function> requestCallback) { - JdkClientHttpRequest jdkClientHttpRequest = new JdkClientHttpRequest(method, uri, this.bufferFactory, - this.readTimeout); + JdkClientHttpRequest request = + new JdkClientHttpRequest(method, uri, this.bufferFactory, this.readTimeout); - return requestCallback.apply(jdkClientHttpRequest).then(Mono.defer(() -> { - HttpRequest httpRequest = jdkClientHttpRequest.getNativeRequest(); + return requestCallback.apply(request).then(Mono.defer(() -> { + HttpRequest nativeRequest = request.getNativeRequest(); CompletableFuture>>> future = - this.httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofPublisher()); + this.httpClient.sendAsync(nativeRequest, HttpResponse.BodyHandlers.ofPublisher()); - return Mono.fromCompletionStage(future) - .map(response -> { - List headers = response.headers().allValues(HttpHeaders.SET_COOKIE); - return new JdkClientHttpResponse(response, this.bufferFactory, this.cookieParser.parse(headers)); - }); + return Mono.fromCompletionStage(future).map(response -> + new JdkClientHttpResponse(response, this.bufferFactory, parseCookies(response))); })); } + private MultiValueMap parseCookies(HttpResponse response) { + List headers = response.headers().allValues(HttpHeaders.SET_COOKIE); + return this.cookieParser.parse(headers); + } + } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java index 678a2dcf19..4862425fc5 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,8 +59,9 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { private final HttpRequest.Builder builder; - public JdkClientHttpRequest(HttpMethod httpMethod, URI uri, DataBufferFactory bufferFactory, - @Nullable Duration readTimeout) { + public JdkClientHttpRequest( + HttpMethod httpMethod, URI uri, DataBufferFactory bufferFactory, @Nullable Duration readTimeout) { + Assert.notNull(httpMethod, "HttpMethod is required"); Assert.notNull(uri, "URI is required"); Assert.notNull(bufferFactory, "DataBufferFactory is required"); @@ -97,32 +98,6 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { } - @Override - protected void applyHeaders() { - for (Map.Entry> entry : getHeaders().headerSet()) { - if (entry.getKey().equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) { - // content-length is specified when writing - continue; - } - for (String value : entry.getValue()) { - this.builder.header(entry.getKey(), value); - } - } - if (!getHeaders().containsHeader(HttpHeaders.ACCEPT)) { - this.builder.header(HttpHeaders.ACCEPT, "*/*"); - } - } - - @Override - protected void applyCookies() { - MultiValueMap cookies = getCookies(); - if (cookies.isEmpty()) { - return; - } - this.builder.header(HttpHeaders.COOKIE, cookies.values().stream() - .flatMap(List::stream).map(HttpCookie::toString).collect(Collectors.joining(";"))); - } - @Override public Mono writeWith(Publisher body) { return doCommit(() -> { @@ -162,4 +137,30 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { }); } + @Override + protected void applyHeaders() { + for (Map.Entry> entry : getHeaders().headerSet()) { + if (entry.getKey().equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) { + // content-length is specified when writing + continue; + } + for (String value : entry.getValue()) { + this.builder.header(entry.getKey(), value); + } + } + if (!getHeaders().containsHeader(HttpHeaders.ACCEPT)) { + this.builder.header(HttpHeaders.ACCEPT, "*/*"); + } + } + + @Override + protected void applyCookies() { + MultiValueMap cookies = getCookies(); + if (cookies.isEmpty()) { + return; + } + this.builder.header(HttpHeaders.COOKIE, cookies.values().stream() + .flatMap(List::stream).map(HttpCookie::toString).collect(Collectors.joining(";"))); + } + } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java index b32d132c6c..356abf4fee 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java @@ -47,7 +47,8 @@ import org.springframework.util.MultiValueMap; */ class JdkClientHttpResponse extends AbstractClientHttpResponse { - public JdkClientHttpResponse(HttpResponse>> response, + public JdkClientHttpResponse( + HttpResponse>> response, DataBufferFactory bufferFactory, MultiValueMap cookies) { super(HttpStatusCode.valueOf(response.statusCode()), @@ -63,7 +64,9 @@ class JdkClientHttpResponse extends AbstractClientHttpResponse { return HttpHeaders.readOnlyHttpHeaders(multiValueMap); } - private static Flux adaptBody(HttpResponse>> response, DataBufferFactory bufferFactory) { + private static Flux adaptBody( + HttpResponse>> response, DataBufferFactory bufferFactory) { + return JdkFlowAdapter.flowPublisherToFlux(response.body()) .flatMapIterable(Function.identity()) .map(bufferFactory::wrap) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java index dc1c6a7e34..e8a58c0cca 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java @@ -22,6 +22,7 @@ import java.util.function.Function; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.reactive.client.ReactiveResponse; import org.jspecify.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -32,6 +33,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseCookie; import org.springframework.util.Assert; +import org.springframework.util.MultiValueMap; /** * {@link ClientHttpConnector} for the Jetty Reactive Streams HttpClient. @@ -126,11 +128,15 @@ public class JettyClientHttpConnector implements ClientHttpConnector { private Mono execute(JettyClientHttpRequest request) { return Mono.fromDirect(request.toReactiveRequest() - .response((reactiveResponse, chunkPublisher) -> { + .response((response, chunkPublisher) -> { Flux content = Flux.from(chunkPublisher).map(this.bufferFactory::wrap); - List headers = reactiveResponse.getHeaders().getValuesList(HttpHeaders.SET_COOKIE); - return Mono.just(new JettyClientHttpResponse(reactiveResponse, content, this.cookieParser.parse(headers))); + return Mono.just(new JettyClientHttpResponse(response, content, parseCookies(response))); })); } + private MultiValueMap parseCookies(ReactiveResponse response) { + List headers = response.getHeaders().getValuesList(HttpHeaders.SET_COOKIE); + return this.cookieParser.parse(headers); + } + } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java index 89b8cef4ab..34282d993a 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,11 +72,6 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { return this.jettyRequest.getURI(); } - @Override - public Mono setComplete() { - return doCommit(); - } - @Override public DataBufferFactory bufferFactory() { return this.bufferFactory; @@ -88,6 +83,12 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { return (T) this.jettyRequest; } + @Override + protected HttpHeaders initReadOnlyHeaders() { + return HttpHeaders.readOnlyHttpHeaders(new JettyHeadersAdapter(this.jettyRequest.getHeaders())); + } + + @Override public Mono writeWith(Publisher body) { return Mono.create(sink -> { @@ -108,13 +109,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { .doOnDiscard(DataBuffer.class, DataBufferUtils::release)); } - private String getContentType() { - MediaType contentType = getHeaders().getContentType(); - return contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE; - } - private List toContentChunks(DataBuffer dataBuffer) { - List result = new ArrayList<>(1); DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers(); while (iterator.hasNext()) { @@ -131,11 +126,14 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { return result; } + private String getContentType() { + MediaType contentType = getHeaders().getContentType(); + return (contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE); + } + @Override - protected void applyCookies() { - getCookies().values().stream().flatMap(Collection::stream) - .map(cookie -> HttpCookie.build(cookie.getName(), cookie.getValue()).build()) - .forEach(this.jettyRequest::cookie); + public Mono setComplete() { + return doCommit(); } @Override @@ -150,8 +148,10 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { } @Override - protected HttpHeaders initReadOnlyHeaders() { - return HttpHeaders.readOnlyHttpHeaders(new JettyHeadersAdapter(this.jettyRequest.getHeaders())); + protected void applyCookies() { + getCookies().values().stream().flatMap(Collection::stream) + .map(cookie -> HttpCookie.build(cookie.getName(), cookie.getValue()).build()) + .forEach(this.jettyRequest::cookie); } @Override diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java index f4be0618e0..bca01f4d5e 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,10 +37,10 @@ import org.springframework.util.MultiValueMap; class JettyClientHttpResponse extends AbstractClientHttpResponse { public JettyClientHttpResponse( - ReactiveResponse reactiveResponse, Flux content, + ReactiveResponse response, Flux content, MultiValueMap cookies) { - super(HttpStatusCode.valueOf(reactiveResponse.getStatus()), adaptHeaders(reactiveResponse), cookies, content); + super(HttpStatusCode.valueOf(response.getStatus()), adaptHeaders(response), cookies, content); } private static HttpHeaders adaptHeaders(ReactiveResponse response) {