diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequest.java b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequest.java new file mode 100644 index 00000000000..a3738d775ea --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequest.java @@ -0,0 +1,164 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.reactivestreams.FlowAdapters; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.netty.NettyOutbound; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientRequest; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.lang.Nullable; +import org.springframework.util.StreamUtils; + +/** + * {@link ClientHttpRequest} implementation for the Reactor-Netty HTTP client. + * Created via the {@link ReactorNettyClientRequestFactory}. + * + * @author Arjen Poutsma + * @since 6.1 + */ +final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest { + + private final HttpClient httpClient; + + private final HttpMethod method; + + private final URI uri; + + private final Duration exchangeTimeout; + + private final Duration readTimeout; + + + public ReactorNettyClientRequest(HttpClient httpClient, URI uri, HttpMethod method, + Duration exchangeTimeout, Duration readTimeout) { + + this.httpClient = httpClient; + this.method = method; + this.uri = uri; + this.exchangeTimeout = exchangeTimeout; + this.readTimeout = readTimeout; + } + + + @Override + public HttpMethod getMethod() { + return this.method; + } + + @Override + public URI getURI() { + return this.uri; + } + + + @Override + protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body body) throws IOException { + HttpClient.RequestSender requestSender = this.httpClient + .request(io.netty.handler.codec.http.HttpMethod.valueOf(this.method.name())); + + requestSender = (this.uri.isAbsolute() ? requestSender.uri(this.uri) : requestSender.uri(this.uri.toString())); + + try { + ReactorNettyClientResponse result = requestSender.send((reactorRequest, nettyOutbound) -> + send(headers, body, reactorRequest, nettyOutbound)) + .responseConnection((reactorResponse, connection) -> + Mono.just(new ReactorNettyClientResponse(reactorResponse, connection, this.readTimeout))) + .next() + .block(this.exchangeTimeout); + + if (result == null) { + throw new IOException("HTTP exchange resulted in no result"); + } + else { + return result; + } + } + catch (RuntimeException ex) { // Exceptions.ReactiveException is package private + Throwable cause = ex.getCause(); + + if (cause instanceof UncheckedIOException uioEx) { + throw uioEx.getCause(); + } + else if (cause instanceof IOException ioEx) { + throw ioEx; + } + else { + throw ex; + } + } + } + + private Publisher send(HttpHeaders headers, @Nullable Body body, + HttpClientRequest reactorRequest, NettyOutbound nettyOutbound) { + + headers.forEach((key, value) -> reactorRequest.requestHeaders().set(key, value)); + + if (body != null) { + AtomicReference executor = new AtomicReference<>(); + + return nettyOutbound + .withConnection(connection -> executor.set(connection.channel().eventLoop())) + .send(FlowAdapters.toPublisher(OutputStreamPublisher.create( + outputStream -> body.writeTo(StreamUtils.nonClosing(outputStream)), + new ByteBufMapper(nettyOutbound.alloc()), + executor.getAndSet(null)))); + } + else { + return nettyOutbound; + } + } + + + private static final class ByteBufMapper implements OutputStreamPublisher.ByteMapper { + + private final ByteBufAllocator allocator; + + + public ByteBufMapper(ByteBufAllocator allocator) { + this.allocator = allocator; + } + + + @Override + public ByteBuf map(int b) { + ByteBuf byteBuf = this.allocator.buffer(1); + byteBuf.writeByte(b); + return byteBuf; + } + + @Override + public ByteBuf map(byte[] b, int off, int len) { + ByteBuf byteBuf = this.allocator.buffer(len); + byteBuf.writeBytes(b, off, len); + return byteBuf; + } + } +} diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java new file mode 100644 index 00000000000..0c3172dab08 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java @@ -0,0 +1,133 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; + +import io.netty.channel.ChannelOption; +import reactor.netty.http.client.HttpClient; + +import org.springframework.http.HttpMethod; +import org.springframework.util.Assert; + +/** + * Reactor-Netty implementation of {@link ClientHttpRequestFactory}. + * + * @author Arjen Poutsma + * @since 6.1 + */ +public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactory { + + private final HttpClient httpClient; + + + private Duration exchangeTimeout = Duration.ofSeconds(5); + + private Duration readTimeout = Duration.ofSeconds(10); + + + + /** + * Create a new instance of the {@code ReactorNettyClientRequestFactory} + * with a default {@link HttpClient} that has compression enabled. + */ + public ReactorNettyClientRequestFactory() { + this(HttpClient.create().compress(true)); + } + + /** + * Create a new instance of the {@code ReactorNettyClientRequestFactory} + * based on the given {@link HttpClient}. + * @param httpClient the client to base on + */ + public ReactorNettyClientRequestFactory(HttpClient httpClient) { + Assert.notNull(httpClient, "HttpClient must not be null"); + this.httpClient = httpClient; + } + + /** + * Set the underlying connect timeout in milliseconds. + * A value of 0 specifies an infinite timeout. + *

Default is 30 seconds. + * @see HttpClient#option(ChannelOption, Object) + * @see ChannelOption#CONNECT_TIMEOUT_MILLIS + */ + public void setConnectTimeout(int connectTimeout) { + Assert.isTrue(connectTimeout >= 0, "Timeout must be a non-negative value"); + this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); + } + + /** + * Set the underlying connect timeout in milliseconds. + * A value of 0 specifies an infinite timeout. + *

Default is 30 seconds. + * @see HttpClient#option(ChannelOption, Object) + * @see ChannelOption#CONNECT_TIMEOUT_MILLIS + */ + public void setConnectTimeout(Duration connectTimeout) { + Assert.notNull(connectTimeout, "ConnectTimeout must not be null"); + Assert.isTrue(!connectTimeout.isNegative(), "Timeout must be a non-negative value"); + this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)connectTimeout.toMillis()); + } + + /** + * Set the underlying read timeout in milliseconds. + *

Default is 10 seconds. + */ + public void setReadTimeout(long readTimeout) { + Assert.isTrue(readTimeout > 0, "Timeout must be a positive value"); + this.readTimeout = Duration.ofMillis(readTimeout); + } + + /** + * Set the underlying read timeout as {@code Duration}. + *

Default is 10 seconds. + */ + public void setReadTimeout(Duration readTimeout) { + Assert.notNull(readTimeout, "ReadTimeout must not be null"); + Assert.isTrue(!readTimeout.isNegative(), "Timeout must be a non-negative value"); + this.readTimeout = readTimeout; + } + + /** + * Set the timeout for the HTTP exchange in milliseconds. + *

Default is 30 seconds. + */ + public void setExchangeTimeout(long exchangeTimeout) { + Assert.isTrue(exchangeTimeout > 0, "Timeout must be a positive value"); + this.exchangeTimeout = Duration.ofMillis(exchangeTimeout); + } + + /** + * Set the timeout for the HTTP exchange. + *

Default is 30 seconds. + */ + public void setExchangeTimeout(Duration exchangeTimeout) { + Assert.notNull(exchangeTimeout, "ExchangeTimeout must not be null"); + Assert.isTrue(!exchangeTimeout.isNegative(), "Timeout must be a non-negative value"); + this.exchangeTimeout = exchangeTimeout; + } + + + + @Override + public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException { + return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout); + } +} diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientResponse.java b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientResponse.java new file mode 100644 index 00000000000..fb4694d9e3c --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientResponse.java @@ -0,0 +1,93 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; + +import reactor.netty.Connection; +import reactor.netty.http.client.HttpClientResponse; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.support.Netty4HeadersAdapter; +import org.springframework.lang.Nullable; + +/** + * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client. + * + * @author Arjen Poutsma + * @since 6.1 + */ +final class ReactorNettyClientResponse implements ClientHttpResponse { + + private final HttpClientResponse response; + + private final Connection connection; + + private final HttpHeaders headers; + + private final Duration readTimeout; + + @Nullable + private volatile InputStream body; + + + + public ReactorNettyClientResponse(HttpClientResponse response, Connection connection, Duration readTimeout) { + this.response = response; + this.connection = connection; + this.readTimeout = readTimeout; + this.headers = HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(response.responseHeaders())); + } + + @Override + public HttpStatusCode getStatusCode() { + return HttpStatusCode.valueOf(this.response.status().code()); + } + + @Override + public String getStatusText() { + return this.response.status().reasonPhrase(); + } + + @Override + public HttpHeaders getHeaders() { + return this.headers; + } + + @Override + public InputStream getBody() throws IOException { + if (this.body == null) { + InputStream body = this.connection.inbound().receive() + .aggregate().asInputStream().block(this.readTimeout); + if (body != null) { + this.body = body; + } + else { + throw new IOException("Could not receive body"); + } + } + return this.body; + } + + @Override + public void close() { + this.connection.dispose(); + } +} diff --git a/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java new file mode 100644 index 00000000000..bcc7fd25e32 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client; + +import org.junit.jupiter.api.Test; + +import org.springframework.http.HttpMethod; + +/** + * @author Arjen Poutsma + */ +public class ReactorNettyClientHttpRequestFactoryTests extends AbstractHttpRequestFactoryTests { + + @Override + protected ClientHttpRequestFactory createRequestFactory() { + return new ReactorNettyClientRequestFactory(); + } + + @Override + @Test + public void httpMethods() throws Exception { + super.httpMethods(); + assertHttpMethod("patch", HttpMethod.PATCH); + } + +} diff --git a/spring-web/src/test/java/org/springframework/web/client/RestClientIntegrationTests.java b/spring-web/src/test/java/org/springframework/web/client/RestClientIntegrationTests.java index f681c8617b7..625dcd7466a 100644 --- a/spring-web/src/test/java/org/springframework/web/client/RestClientIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/web/client/RestClientIntegrationTests.java @@ -48,6 +48,7 @@ import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.http.client.JdkClientHttpRequestFactory; import org.springframework.http.client.JettyClientHttpRequestFactory; import org.springframework.http.client.OkHttp3ClientHttpRequestFactory; +import org.springframework.http.client.ReactorNettyClientRequestFactory; import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.util.CollectionUtils; import org.springframework.web.testfixture.xml.Pojo; @@ -77,7 +78,8 @@ class RestClientIntegrationTests { named("HttpComponents", new HttpComponentsClientHttpRequestFactory()), named("OkHttp", new OkHttp3ClientHttpRequestFactory()), named("Jetty", new JettyClientHttpRequestFactory()), - named("JDK HttpClient", new JdkClientHttpRequestFactory()) + named("JDK HttpClient", new JdkClientHttpRequestFactory()), + named("Reactor Netty", new ReactorNettyClientRequestFactory()) ); } diff --git a/spring-web/src/test/java/org/springframework/web/client/RestTemplateIntegrationTests.java b/spring-web/src/test/java/org/springframework/web/client/RestTemplateIntegrationTests.java index 88b498af01b..e76dea513b1 100644 --- a/spring-web/src/test/java/org/springframework/web/client/RestTemplateIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/web/client/RestTemplateIntegrationTests.java @@ -51,6 +51,7 @@ import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.http.client.JdkClientHttpRequestFactory; import org.springframework.http.client.JettyClientHttpRequestFactory; import org.springframework.http.client.OkHttp3ClientHttpRequestFactory; +import org.springframework.http.client.ReactorNettyClientRequestFactory; import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.http.converter.FormHttpMessageConverter; import org.springframework.http.converter.json.MappingJacksonValue; @@ -96,7 +97,8 @@ class RestTemplateIntegrationTests extends AbstractMockWebServerTests { named("HttpComponents", new HttpComponentsClientHttpRequestFactory()), named("OkHttp", new OkHttp3ClientHttpRequestFactory()), named("Jetty", new JettyClientHttpRequestFactory()), - named("JDK HttpClient", new JdkClientHttpRequestFactory()) + named("JDK HttpClient", new JdkClientHttpRequestFactory()), + named("Reactor Netty", new ReactorNettyClientRequestFactory()) ); }