diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java new file mode 100644 index 00000000000..4b128b04748 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java @@ -0,0 +1,90 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.util.concurrent.atomic.AtomicBoolean; + +import reactor.core.publisher.Flux; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.ResponseCookie; +import org.springframework.util.Assert; +import org.springframework.util.MultiValueMap; + +/** + * Base class for {@link ClientHttpResponse} implementations. + * + * @author Arjen Poutsma + * @since 5.3.32 + */ +public abstract class AbstractClientHttpResponse implements ClientHttpResponse { + + private final HttpStatusCode statusCode; + + private final HttpHeaders headers; + + private final MultiValueMap cookies; + + private final Flux body; + + + + protected AbstractClientHttpResponse(HttpStatusCode statusCode, HttpHeaders headers, + MultiValueMap cookies, Flux body) { + + Assert.notNull(statusCode, "StatusCode must not be null"); + Assert.notNull(headers, "Headers must not be null"); + Assert.notNull(body, "Body must not be null"); + + this.statusCode = statusCode; + this.headers = headers; + this.cookies = cookies; + this.body = singleSubscription(body); + } + + private static Flux singleSubscription(Flux body) { + AtomicBoolean subscribed = new AtomicBoolean(); + return body.doOnSubscribe(s -> { + if (!subscribed.compareAndSet(false, true)) { + throw new IllegalStateException("The client response body can only be consumed once"); + } + }); + } + + + @Override + public HttpStatusCode getStatusCode() { + return this.statusCode; + } + + @Override + public HttpHeaders getHeaders() { + return this.headers; + } + + @Override + public MultiValueMap getCookies() { + return this.cookies; + } + + @Override + public Flux getBody() { + return this.body; + } +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java index 45cd63bd790..d64219e195a 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.http.client.reactive; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hc.client5.http.cookie.Cookie; import org.apache.hc.client5.http.protocol.HttpClientContext; @@ -26,7 +25,6 @@ import org.apache.hc.core5.http.Message; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatusCode; @@ -42,40 +40,22 @@ import org.springframework.util.MultiValueMap; * @since 5.3 * @see Apache HttpComponents */ -class HttpComponentsClientHttpResponse implements ClientHttpResponse { - - private final DataBufferFactory dataBufferFactory; - - private final Message> message; - - private final HttpHeaders headers; - - private final HttpClientContext context; - - private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); +class HttpComponentsClientHttpResponse extends AbstractClientHttpResponse { public HttpComponentsClientHttpResponse(DataBufferFactory dataBufferFactory, Message> message, HttpClientContext context) { - this.dataBufferFactory = dataBufferFactory; - this.message = message; - this.context = context; - - MultiValueMap adapter = new HttpComponentsHeadersAdapter(message.getHead()); - this.headers = HttpHeaders.readOnlyHttpHeaders(adapter); + super(HttpStatusCode.valueOf(message.getHead().getCode()), + HttpHeaders.readOnlyHttpHeaders(new HttpComponentsHeadersAdapter(message.getHead())), + adaptCookies(context), + Flux.from(message.getBody()).map(dataBufferFactory::wrap) + ); } - - @Override - public HttpStatusCode getStatusCode() { - return HttpStatusCode.valueOf(this.message.getHead().getCode()); - } - - @Override - public MultiValueMap getCookies() { + private static MultiValueMap adaptCookies(HttpClientContext context) { LinkedMultiValueMap result = new LinkedMultiValueMap<>(); - this.context.getCookieStore().getCookies().forEach(cookie -> + context.getCookieStore().getCookies().forEach(cookie -> result.add(cookie.getName(), ResponseCookie.fromClientResponse(cookie.getName(), cookie.getValue()) .domain(cookie.getDomain()) @@ -88,25 +68,10 @@ class HttpComponentsClientHttpResponse implements ClientHttpResponse { return result; } - private long getMaxAgeSeconds(Cookie cookie) { + private static long getMaxAgeSeconds(Cookie cookie) { String maxAgeAttribute = cookie.getAttribute(Cookie.MAX_AGE_ATTR); return (maxAgeAttribute != null ? Long.parseLong(maxAgeAttribute) : -1); } - @Override - public Flux getBody() { - return Flux.from(this.message.getBody()) - .doOnSubscribe(s -> { - if (!this.rejectSubscribers.compareAndSet(false, true)) { - throw new IllegalStateException("The client response body can only be consumed once."); - } - }) - .map(this.dataBufferFactory::wrap); - } - - @Override - public HttpHeaders getHeaders() { - return this.headers; - } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java index e0872ffd3c9..5574ac42fe1 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,24 +50,20 @@ import org.springframework.util.MultiValueMap; * @author Rossen Stoyanchev * @since 6.0 */ -class JdkClientHttpResponse implements ClientHttpResponse { +class JdkClientHttpResponse extends AbstractClientHttpResponse { private static final Pattern SAME_SITE_PATTERN = Pattern.compile("(?i).*SameSite=(Strict|Lax|None).*"); - private final HttpResponse>> response; - private final DataBufferFactory bufferFactory; + public JdkClientHttpResponse(HttpResponse>> response, + DataBufferFactory bufferFactory) { - private final HttpHeaders headers; - - - public JdkClientHttpResponse( - HttpResponse>> response, DataBufferFactory bufferFactory) { - - this.response = response; - this.bufferFactory = bufferFactory; - this.headers = adaptHeaders(response); + super(HttpStatusCode.valueOf(response.statusCode()), + adaptHeaders(response), + adaptCookies(response), + adaptBody(response, bufferFactory) + ); } private static HttpHeaders adaptHeaders(HttpResponse>> response) { @@ -78,20 +74,8 @@ class JdkClientHttpResponse implements ClientHttpResponse { return HttpHeaders.readOnlyHttpHeaders(multiValueMap); } - - @Override - public HttpStatusCode getStatusCode() { - return HttpStatusCode.valueOf(this.response.statusCode()); - } - - @Override - public HttpHeaders getHeaders() { - return this.headers; - } - - @Override - public MultiValueMap getCookies() { - return this.response.headers().allValues(HttpHeaders.SET_COOKIE).stream() + private static MultiValueMap adaptCookies(HttpResponse>> response) { + return response.headers().allValues(HttpHeaders.SET_COOKIE).stream() .flatMap(header -> { Matcher matcher = SAME_SITE_PATTERN.matcher(header); String sameSite = (matcher.matches() ? matcher.group(1) : null); @@ -102,7 +86,7 @@ class JdkClientHttpResponse implements ClientHttpResponse { LinkedMultiValueMap::addAll); } - private ResponseCookie toResponseCookie(HttpCookie cookie, @Nullable String sameSite) { + private static ResponseCookie toResponseCookie(HttpCookie cookie, @Nullable String sameSite) { return ResponseCookie.from(cookie.getName(), cookie.getValue()) .domain(cookie.getDomain()) .httpOnly(cookie.isHttpOnly()) @@ -113,12 +97,12 @@ class JdkClientHttpResponse implements ClientHttpResponse { .build(); } - @Override - public Flux getBody() { - return JdkFlowAdapter.flowPublisherToFlux(this.response.body()) + private static Flux adaptBody(HttpResponse>> response, DataBufferFactory bufferFactory) { + return JdkFlowAdapter.flowPublisherToFlux(response.body()) .flatMapIterable(Function.identity()) - .map(this.bufferFactory::wrap) - .doOnDiscard(DataBuffer.class, DataBufferUtils::release); + .map(bufferFactory::wrap) + .doOnDiscard(DataBuffer.class, DataBufferUtils::release) + .cache(0); } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java index 7d0e4e73190..89a47fd56a3 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,8 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.reactive.client.ReactiveResponse; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.DataBuffer; @@ -42,49 +42,37 @@ import org.springframework.util.MultiValueMap; * @see * Jetty ReactiveStreams HttpClient */ -class JettyClientHttpResponse implements ClientHttpResponse { +class JettyClientHttpResponse extends AbstractClientHttpResponse { private static final Pattern SAME_SITE_PATTERN = Pattern.compile("(?i).*SameSite=(Strict|Lax|None).*"); - private final ReactiveResponse reactiveResponse; + public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Flux content) { - private final Flux content; - - private final HttpHeaders headers; - - - public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Publisher content) { - this.reactiveResponse = reactiveResponse; - this.content = Flux.from(content); - - MultiValueMap headers = new JettyHeadersAdapter(reactiveResponse.getHeaders()); - this.headers = HttpHeaders.readOnlyHttpHeaders(headers); + super(HttpStatusCode.valueOf(reactiveResponse.getStatus()), + adaptHeaders(reactiveResponse), + adaptCookies(reactiveResponse), + content); } - - @Override - public HttpStatusCode getStatusCode() { - return HttpStatusCode.valueOf(this.reactiveResponse.getStatus()); + private static HttpHeaders adaptHeaders(ReactiveResponse response) { + MultiValueMap headers = new JettyHeadersAdapter(response.getHeaders()); + return HttpHeaders.readOnlyHttpHeaders(headers); } - - @Override - public MultiValueMap getCookies() { + private static MultiValueMap adaptCookies(ReactiveResponse response) { MultiValueMap result = new LinkedMultiValueMap<>(); - List cookieHeader = getHeaders().get(HttpHeaders.SET_COOKIE); - if (cookieHeader != null) { - cookieHeader.forEach(header -> - HttpCookie.parse(header).forEach(cookie -> result.add(cookie.getName(), + List cookieHeaders = response.getHeaders().getFields(HttpHeaders.SET_COOKIE); + cookieHeaders.forEach(header -> + HttpCookie.parse(header.getValue()).forEach(cookie -> result.add(cookie.getName(), ResponseCookie.fromClientResponse(cookie.getName(), cookie.getValue()) .domain(cookie.getDomain()) .path(cookie.getPath()) .maxAge(cookie.getMaxAge()) .secure(cookie.getSecure()) .httpOnly(cookie.isHttpOnly()) - .sameSite(parseSameSite(header)) + .sameSite(parseSameSite(header.getValue())) .build())) ); - } return CollectionUtils.unmodifiableMultiValueMap(result); } @@ -94,15 +82,4 @@ class JettyClientHttpResponse implements ClientHttpResponse { return (matcher.matches() ? matcher.group(1) : null); } - - @Override - public Flux getBody() { - return this.content; - } - - @Override - public HttpHeaders getHeaders() { - return this.headers; - } - } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index a55db81ffa3..2062e097d57 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -481,7 +481,7 @@ class WebClientIntegrationTests { .retrieve() .bodyToMono(Map.class); - StepVerifier.create(result).verifyComplete(); + StepVerifier.create(result).expectComplete().verify(Duration.ofSeconds(3)); } @ParameterizedWebClientTest // SPR-15946 @@ -808,7 +808,7 @@ class WebClientIntegrationTests { MyException error = (MyException) throwable; assertThat(error.getMessage()).isEqualTo("foofoo"); }) - .verify(); + .verify(Duration.ofSeconds(3)); } @ParameterizedWebClientTest @@ -850,7 +850,7 @@ class WebClientIntegrationTests { StepVerifier.create(result) .expectNext("Internal Server error") - .verifyComplete(); + .expectComplete().verify(Duration.ofSeconds(3)); expectRequestCount(1); expectRequest(request -> { @@ -880,7 +880,7 @@ class WebClientIntegrationTests { StepVerifier.create(result) .expectNext("Internal Server error") - .verifyComplete(); + .expectComplete().verify(Duration.ofSeconds(3)); expectRequestCount(1); expectRequest(request -> { @@ -1038,7 +1038,7 @@ class WebClientIntegrationTests { StepVerifier.create(result) .assertNext(r -> assertThat(r.getStatusCode().is2xxSuccessful()).isTrue()) - .verifyComplete(); + .expectComplete().verify(Duration.ofSeconds(3)); } @ParameterizedWebClientTest @@ -1207,7 +1207,7 @@ class WebClientIntegrationTests { WebClientException ex = (WebClientException) throwable; assertThat(ex.getCause()).isInstanceOf(IOException.class); }) - .verify(); + .verify(Duration.ofSeconds(3)); } @ParameterizedWebClientTest @@ -1219,7 +1219,7 @@ class WebClientIntegrationTests { WebClientException ex = (WebClientException) throwable; assertThat(ex.getCause()).isInstanceOf(IOException.class); }) - .verify(); + .verify(Duration.ofSeconds(3)); } @ParameterizedWebClientTest