Support for WebFlux on Reactor Netty 2 with Netty 5

See gh-28847
This commit is contained in:
rstoyanchev 2022-09-08 21:53:44 +01:00
parent d6c49eec5a
commit d373435856
23 changed files with 2659 additions and 10 deletions

View File

@ -9,7 +9,7 @@ javaPlatform {
dependencies {
api(platform("com.fasterxml.jackson:jackson-bom:2.13.3"))
api(platform("io.netty:netty-bom:4.1.80.Final"))
api(platform("io.netty:netty5-bom:5.0.0.Alpha4"))
api(platform("io.netty:netty5-bom:5.0.0.Alpha3"))
api(platform("io.projectreactor:reactor-bom:2022.0.0-M5"))
api(platform("io.rsocket:rsocket-bom:1.1.2"))
api(platform("org.apache.groovy:groovy-bom:4.0.4"))

View File

@ -20,10 +20,14 @@ dependencies {
optional("io.reactivex.rxjava3:rxjava")
optional("io.netty:netty-buffer")
optional("io.netty:netty-handler")
optional("io.netty:netty-codec-http") // Until Netty4ClientHttpRequest is removed
optional("io.netty:netty-transport") // Until Netty4ClientHttpRequest is removed
optional("io.netty:netty-codec-http")
optional("io.netty:netty-transport")
optional("io.projectreactor.netty:reactor-netty-http")
optional("io.netty:netty5-buffer")
optional("io.netty:netty5-handler")
optional("io.netty:netty5-codec-http")
optional("io.netty:netty5-transport")
optional("io.projectreactor.netty:reactor-netty5-http:2.0.0-M1")
optional("io.undertow:undertow-core")
optional("org.apache.tomcat.embed:tomcat-embed-core")
optional("org.eclipse.jetty:jetty-server") {

View File

@ -0,0 +1,283 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.util.AbstractSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import io.netty5.handler.codec.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MultiValueMap;
/**
* {@code MultiValueMap} implementation for wrapping Netty HTTP headers.
*
* <p>There is a duplicate of this class in the server package!
*
* This class is based on {@link NettyHeadersAdapter}.
*
* @author Violeta Georgieva
* @since 6.0
*/
class Netty5HeadersAdapter implements MultiValueMap<String, String> {
private final HttpHeaders headers;
Netty5HeadersAdapter(HttpHeaders headers) {
this.headers = headers;
}
@Override
@Nullable
public String getFirst(String key) {
return this.headers.get(key);
}
@Override
public void add(String key, @Nullable String value) {
if (value != null) {
this.headers.add(key, value);
}
}
@Override
public void addAll(String key, List<? extends String> values) {
this.headers.add(key, values);
}
@Override
public void addAll(MultiValueMap<String, String> values) {
values.forEach(this.headers::add);
}
@Override
public void set(String key, @Nullable String value) {
if (value != null) {
this.headers.set(key, value);
}
}
@Override
public void setAll(Map<String, String> values) {
values.forEach(this.headers::set);
}
@Override
public Map<String, String> toSingleValueMap() {
Map<String, String> singleValueMap = CollectionUtils.newLinkedHashMap(this.headers.size());
this.headers.entries()
.forEach(entry -> {
if (!singleValueMap.containsKey(entry.getKey())) {
singleValueMap.put(entry.getKey(), entry.getValue());
}
});
return singleValueMap;
}
@Override
public int size() {
return this.headers.names().size();
}
@Override
public boolean isEmpty() {
return this.headers.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return (key instanceof String headerName && this.headers.contains(headerName));
}
@Override
public boolean containsValue(Object value) {
return (value instanceof String &&
this.headers.entries().stream()
.anyMatch(entry -> value.equals(entry.getValue())));
}
@Override
@Nullable
public List<String> get(Object key) {
if (containsKey(key)) {
return this.headers.getAll((String) key);
}
return null;
}
@Nullable
@Override
public List<String> put(String key, @Nullable List<String> value) {
List<String> previousValues = this.headers.getAll(key);
this.headers.set(key, value);
return previousValues;
}
@Nullable
@Override
public List<String> remove(Object key) {
if (key instanceof String headerName) {
List<String> previousValues = this.headers.getAll(headerName);
this.headers.remove(headerName);
return previousValues;
}
return null;
}
@Override
public void putAll(Map<? extends String, ? extends List<String>> map) {
map.forEach(this.headers::set);
}
@Override
public void clear() {
this.headers.clear();
}
@Override
public Set<String> keySet() {
return new HeaderNames();
}
@Override
public Collection<List<String>> values() {
return this.headers.names().stream()
.map(this.headers::getAll).collect(Collectors.toList());
}
@Override
public Set<Entry<String, List<String>>> entrySet() {
return new AbstractSet<>() {
@Override
public Iterator<Entry<String, List<String>>> iterator() {
return new EntryIterator();
}
@Override
public int size() {
return headers.size();
}
};
}
@Override
public String toString() {
return org.springframework.http.HttpHeaders.formatHeaders(this);
}
private class EntryIterator implements Iterator<Entry<String, List<String>>> {
private final Iterator<String> names = headers.names().iterator();
@Override
public boolean hasNext() {
return this.names.hasNext();
}
@Override
public Entry<String, List<String>> next() {
return new HeaderEntry(this.names.next());
}
}
private class HeaderEntry implements Entry<String, List<String>> {
private final String key;
HeaderEntry(String key) {
this.key = key;
}
@Override
public String getKey() {
return this.key;
}
@Override
public List<String> getValue() {
return headers.getAll(this.key);
}
@Override
public List<String> setValue(List<String> value) {
List<String> previousValues = headers.getAll(this.key);
headers.set(this.key, value);
return previousValues;
}
}
private class HeaderNames extends AbstractSet<String> {
@Override
public Iterator<String> iterator() {
return new HeaderNamesIterator(headers.names().iterator());
}
@Override
public int size() {
return headers.names().size();
}
}
private final class HeaderNamesIterator implements Iterator<String> {
private final Iterator<String> iterator;
@Nullable
private String currentName;
private HeaderNamesIterator(Iterator<String> iterator) {
this.iterator = iterator;
}
@Override
public boolean hasNext() {
return this.iterator.hasNext();
}
@Override
public String next() {
this.currentName = this.iterator.next();
return this.currentName;
}
@Override
public void remove() {
if (this.currentName == null) {
throw new IllegalStateException("No current Header in iterator");
}
if (!headers.contains(this.currentName)) {
throw new IllegalStateException("Header not present: " + this.currentName);
}
headers.remove(this.currentName);
}
}
}

View File

@ -0,0 +1,132 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.netty5.NettyOutbound;
import reactor.netty5.http.client.HttpClient;
import reactor.netty5.http.client.HttpClientRequest;
import reactor.netty5.resources.ConnectionProvider;
import reactor.netty5.resources.LoopResources;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
* Reactor Netty 2 (Netty 5) implementation of {@link ClientHttpConnector}.
*
* <p>This class is based on {@link ReactorClientHttpConnector}.
*
* @author Violeta Georgieva
* @since 6.0
* @see HttpClient
*/
public class ReactorNetty2ClientHttpConnector implements ClientHttpConnector {
private final static Function<HttpClient, HttpClient> defaultInitializer = client -> client.compress(true);
private final HttpClient httpClient;
/**
* Default constructor. Initializes {@link HttpClient} via:
* <pre class="code">
* HttpClient.create().compress()
* </pre>
*/
public ReactorNetty2ClientHttpConnector() {
this.httpClient = defaultInitializer.apply(HttpClient.create().wiretap(true));
}
/**
* Constructor with externally managed Reactor Netty resources, including
* {@link LoopResources} for event loop threads, and {@link ConnectionProvider}
* for the connection pool.
* <p>This constructor should be used only when you don't want the client
* to participate in the Reactor Netty global resources. By default, the
* client participates in the Reactor Netty global resources held in
* {@link reactor.netty5.http.HttpResources}, which is recommended since
* fixed, shared resources are favored for event loop concurrency. However,
* consider declaring a {@link ReactorNetty2ResourceFactory} bean with
* {@code globalResources=true} in order to ensure the Reactor Netty global
* resources are shut down when the Spring ApplicationContext is closed.
* @param factory the resource factory to obtain the resources from
* @param mapper a mapper for further initialization of the created client
* @since 5.1
*/
public ReactorNetty2ClientHttpConnector(ReactorNetty2ResourceFactory factory, Function<HttpClient, HttpClient> mapper) {
ConnectionProvider provider = factory.getConnectionProvider();
Assert.notNull(provider, "No ConnectionProvider: is ReactorNetty2ResourceFactory not initialized yet?");
this.httpClient = defaultInitializer.andThen(mapper).andThen(applyLoopResources(factory))
.apply(HttpClient.create(provider));
}
private static Function<HttpClient, HttpClient> applyLoopResources(ReactorNetty2ResourceFactory factory) {
return httpClient -> {
LoopResources resources = factory.getLoopResources();
Assert.notNull(resources, "No LoopResources: is ReactorNetty2ResourceFactory not initialized yet?");
return httpClient.runOn(resources);
};
}
/**
* Constructor with a pre-configured {@code HttpClient} instance.
* @param httpClient the client to use
* @since 5.1
*/
public ReactorNetty2ClientHttpConnector(HttpClient httpClient) {
Assert.notNull(httpClient, "HttpClient is required");
this.httpClient = httpClient;
}
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
AtomicReference<ReactorNetty2ClientHttpResponse> responseRef = new AtomicReference<>();
return this.httpClient
.request(io.netty5.handler.codec.http.HttpMethod.valueOf(method.name()))
.uri(uri.toString())
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((response, connection) -> {
responseRef.set(new ReactorNetty2ClientHttpResponse(response, connection));
return Mono.just((ClientHttpResponse) responseRef.get());
})
.next()
.doOnCancel(() -> {
ReactorNetty2ClientHttpResponse response = responseRef.get();
if (response != null) {
response.releaseAfterCancel(method);
}
});
}
private ReactorNetty2ClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request,
NettyOutbound nettyOutbound) {
return new ReactorNetty2ClientHttpRequest(method, uri, request, nettyOutbound);
}
}

View File

@ -0,0 +1,143 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import java.nio.file.Path;
import java.util.Collection;
import io.netty5.buffer.api.Buffer;
import io.netty5.handler.codec.http.cookie.DefaultCookie;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty5.NettyOutbound;
import reactor.netty5.http.client.HttpClientRequest;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ZeroCopyHttpOutputMessage;
/**
* {@link ClientHttpRequest} implementation for the Reactor Netty 2 (Netty 5) HTTP client.
*
* <p>This class is based on {@link ReactorClientHttpRequest}.
*
* @author Violeta Georgieva
* @since 6.0
* @see reactor.netty5.http.client.HttpClient
*/
class ReactorNetty2ClientHttpRequest extends AbstractClientHttpRequest implements ZeroCopyHttpOutputMessage {
private final HttpMethod httpMethod;
private final URI uri;
private final HttpClientRequest request;
private final NettyOutbound outbound;
private final Netty5DataBufferFactory bufferFactory;
public ReactorNetty2ClientHttpRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) {
this.httpMethod = method;
this.uri = uri;
this.request = request;
this.outbound = outbound;
this.bufferFactory = new Netty5DataBufferFactory(outbound.alloc());
}
@Override
public HttpMethod getMethod() {
return this.httpMethod;
}
@Override
public URI getURI() {
return this.uri;
}
@Override
public DataBufferFactory bufferFactory() {
return this.bufferFactory;
}
@Override
@SuppressWarnings("unchecked")
public <T> T getNativeRequest() {
return (T) this.request;
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> {
// Send as Mono if possible as an optimization hint to Reactor Netty
if (body instanceof Mono) {
Mono<Buffer> bufferMono = Mono.from(body).map(Netty5DataBufferFactory::toBuffer);
return this.outbound.send(bufferMono).then();
}
else {
Flux<Buffer> bufferFlux = Flux.from(body).map(Netty5DataBufferFactory::toBuffer);
return this.outbound.send(bufferFlux).then();
}
});
}
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Publisher<Publisher<Buffer>> buffers = Flux.from(body).map(ReactorNetty2ClientHttpRequest::toBuffers);
return doCommit(() -> this.outbound.sendGroups(buffers).then());
}
private static Publisher<Buffer> toBuffers(Publisher<? extends DataBuffer> dataBuffers) {
return Flux.from(dataBuffers).map(Netty5DataBufferFactory::toBuffer);
}
@Override
public Mono<Void> writeWith(Path file, long position, long count) {
return doCommit(() -> this.outbound.sendFile(file, position, count).then());
}
@Override
public Mono<Void> setComplete() {
return doCommit(this.outbound::then);
}
@Override
protected void applyHeaders() {
getHeaders().forEach((key, value) -> this.request.requestHeaders().set(key, value));
}
@Override
protected void applyCookies() {
getCookies().values().stream().flatMap(Collection::stream)
.map(cookie -> new DefaultCookie(cookie.getName(), cookie.getValue()))
.forEach(this.request::addCookie);
}
@Override
protected HttpHeaders initReadOnlyHeaders() {
return HttpHeaders.readOnlyHttpHeaders(new Netty5HeadersAdapter(this.request.requestHeaders()));
}
}

View File

@ -0,0 +1,214 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.handler.codec.http.cookie.Cookie;
import io.netty5.handler.codec.http.cookie.DefaultCookie;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.netty5.Connection;
import reactor.netty5.NettyInbound;
import reactor.netty5.http.client.HttpClientResponse;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.ResponseCookie;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
/**
* {@link ClientHttpResponse} implementation for the Reactor Netty 2 (Netty 5) HTTP client.
*
* <p>This class is based on {@link ReactorClientHttpResponse}.
*
* @author Violeta Georgieva
* @since 6.0
* @see reactor.netty5.http.client.HttpClient
*/
class ReactorNetty2ClientHttpResponse implements ClientHttpResponse {
/** Reactor Netty 1.0.5+. */
static final boolean reactorNettyRequestChannelOperationsIdPresent = ClassUtils.isPresent(
"reactor.netty5.ChannelOperationsId", ReactorNetty2ClientHttpResponse.class.getClassLoader());
private static final Log logger = LogFactory.getLog(ReactorNetty2ClientHttpResponse.class);
private final HttpClientResponse response;
private final HttpHeaders headers;
private final NettyInbound inbound;
private final Netty5DataBufferFactory bufferFactory;
// 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe)
private final AtomicInteger state = new AtomicInteger();
/**
* Constructor that matches the inputs from
* {@link reactor.netty5.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}.
* @since 5.2.8
*/
public ReactorNetty2ClientHttpResponse(HttpClientResponse response, Connection connection) {
this.response = response;
MultiValueMap<String, String> adapter = new Netty5HeadersAdapter(response.responseHeaders());
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
this.inbound = connection.inbound();
this.bufferFactory = new Netty5DataBufferFactory(connection.outbound().alloc());
}
/**
* Constructor with inputs extracted from a {@link Connection}.
* @deprecated as of 5.2.8, in favor of {@link #ReactorNetty2ClientHttpResponse(HttpClientResponse, Connection)}
*/
@Deprecated
public ReactorNetty2ClientHttpResponse(HttpClientResponse response, NettyInbound inbound, BufferAllocator alloc) {
this.response = response;
MultiValueMap<String, String> adapter = new Netty5HeadersAdapter(response.responseHeaders());
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
this.inbound = inbound;
this.bufferFactory = new Netty5DataBufferFactory(alloc);
}
@Override
public String getId() {
String id = null;
if (reactorNettyRequestChannelOperationsIdPresent) {
id = ChannelOperationsIdHelper.getId(this.response);
}
if (id == null && this.response instanceof Connection connection) {
id = connection.channel().id().asShortText();
}
return (id != null ? id : ObjectUtils.getIdentityHexString(this));
}
@Override
public Flux<DataBuffer> getBody() {
return this.inbound.receive()
.doOnSubscribe(s -> {
if (this.state.compareAndSet(0, 1)) {
return;
}
if (this.state.get() == 2) {
throw new IllegalStateException(
"The client response body has been released already due to cancellation.");
}
})
.map(buffer -> this.bufferFactory.wrap(buffer.split()));
}
@Override
public HttpHeaders getHeaders() {
return this.headers;
}
@Override
public HttpStatusCode getStatusCode() {
return HttpStatusCode.valueOf(this.response.status().code());
}
@Override
@Deprecated
public int getRawStatusCode() {
return this.response.status().code();
}
@Override
public MultiValueMap<String, ResponseCookie> getCookies() {
MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
this.response.cookies().values().stream()
.flatMap(Collection::stream)
.forEach(cookie -> result.add(cookie.name(),
ResponseCookie.fromClientResponse(cookie.name(), cookie.value())
.domain(cookie.domain())
.path(cookie.path())
.maxAge(cookie.maxAge())
.secure(cookie.isSecure())
.httpOnly(cookie.isHttpOnly())
.sameSite(getSameSite(cookie))
.build()));
return CollectionUtils.unmodifiableMultiValueMap(result);
}
@Nullable
private static String getSameSite(Cookie cookie) {
if (cookie instanceof DefaultCookie defaultCookie) {
if (defaultCookie.sameSite() != null) {
return defaultCookie.sameSite().name();
}
}
return null;
}
/**
* Called by {@link ReactorNetty2ClientHttpConnector} when a cancellation is detected
* but the content has not been subscribed to. If the subscription never
* materializes then the content will remain not drained. Or it could still
* materialize if the cancellation happened very early, or the response
* reading was delayed for some reason.
*/
void releaseAfterCancel(HttpMethod method) {
if (mayHaveBody(method) && this.state.compareAndSet(0, 2)) {
if (logger.isDebugEnabled()) {
logger.debug("[" + getId() + "]" + "Releasing body, not yet subscribed.");
}
this.inbound.receive().doOnNext(buffer -> {}).subscribe(buffer -> {}, ex -> {});
}
}
private boolean mayHaveBody(HttpMethod method) {
int code = this.getRawStatusCode();
return !((code >= 100 && code < 200) || code == 204 || code == 205 ||
method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0);
}
@Override
public String toString() {
return "ReactorNetty2ClientHttpResponse{" +
"request=[" + this.response.method().name() + " " + this.response.uri() + "]," +
"status=" + getRawStatusCode() + '}';
}
private static class ChannelOperationsIdHelper {
@Nullable
public static String getId(HttpClientResponse response) {
if (response instanceof reactor.netty5.ChannelOperationsId id) {
return (logger.isDebugEnabled() ? id.asLongText() : id.asShortText());
}
return null;
}
}
}

View File

@ -0,0 +1,250 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.netty5.http.HttpResources;
import reactor.netty5.resources.ConnectionProvider;
import reactor.netty5.resources.LoopResources;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Factory to manage Reactor Netty resources, i.e. {@link LoopResources} for
* event loop threads, and {@link ConnectionProvider} for the connection pool,
* within the lifecycle of a Spring {@code ApplicationContext}.
*
* <p>This factory implements {@link InitializingBean} and {@link DisposableBean}
* and is expected typically to be declared as a Spring-managed bean.
*
* <p>This class is based on {@link ReactorResourceFactory}.
*
* @author Violeta Georgieva
* @since 6.0
*/
public class ReactorNetty2ResourceFactory implements InitializingBean, DisposableBean {
private boolean useGlobalResources = true;
@Nullable
private Consumer<HttpResources> globalResourcesConsumer;
private Supplier<ConnectionProvider> connectionProviderSupplier = () -> ConnectionProvider.create("webflux", 500);
@Nullable
private ConnectionProvider connectionProvider;
private Supplier<LoopResources> loopResourcesSupplier = () -> LoopResources.create("webflux-http");
@Nullable
private LoopResources loopResources;
private boolean manageConnectionProvider = false;
private boolean manageLoopResources = false;
private Duration shutdownQuietPeriod = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD);
private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT);
/**
* Whether to use global Reactor Netty resources via {@link HttpResources}.
* <p>Default is "true" in which case this factory initializes and stops the
* global Reactor Netty resources within Spring's {@code ApplicationContext}
* lifecycle. If set to "false" the factory manages its resources independent
* of the global ones.
* @param useGlobalResources whether to expose and manage the global resources
* @see #addGlobalResourcesConsumer(Consumer)
*/
public void setUseGlobalResources(boolean useGlobalResources) {
this.useGlobalResources = useGlobalResources;
}
/**
* Whether this factory exposes the global
* {@link HttpResources HttpResources} holder.
*/
public boolean isUseGlobalResources() {
return this.useGlobalResources;
}
/**
* Add a Consumer for configuring the global Reactor Netty resources on
* startup. When this option is used, {@link #setUseGlobalResources} is also
* enabled.
* @param consumer the consumer to apply
* @see #setUseGlobalResources(boolean)
*/
public void addGlobalResourcesConsumer(Consumer<HttpResources> consumer) {
this.useGlobalResources = true;
this.globalResourcesConsumer = (this.globalResourcesConsumer != null ?
this.globalResourcesConsumer.andThen(consumer) : consumer);
}
/**
* Use this when you don't want to participate in global resources and
* you want to customize the creation of the managed {@code ConnectionProvider}.
* <p>By default, {@code ConnectionProvider.elastic("http")} is used.
* <p>Note that this option is ignored if {@code userGlobalResources=false} or
* {@link #setConnectionProvider(ConnectionProvider)} is set.
* @param supplier the supplier to use
*/
public void setConnectionProviderSupplier(Supplier<ConnectionProvider> supplier) {
this.connectionProviderSupplier = supplier;
}
/**
* Use this when you want to provide an externally managed
* {@link ConnectionProvider} instance.
* @param connectionProvider the connection provider to use as is
*/
public void setConnectionProvider(ConnectionProvider connectionProvider) {
this.connectionProvider = connectionProvider;
}
/**
* Return the configured {@link ConnectionProvider}.
*/
public ConnectionProvider getConnectionProvider() {
Assert.state(this.connectionProvider != null, "ConnectionProvider not initialized yet");
return this.connectionProvider;
}
/**
* Use this when you don't want to participate in global resources and
* you want to customize the creation of the managed {@code LoopResources}.
* <p>By default, {@code LoopResources.create("reactor-http")} is used.
* <p>Note that this option is ignored if {@code userGlobalResources=false} or
* {@link #setLoopResources(LoopResources)} is set.
* @param supplier the supplier to use
*/
public void setLoopResourcesSupplier(Supplier<LoopResources> supplier) {
this.loopResourcesSupplier = supplier;
}
/**
* Use this option when you want to provide an externally managed
* {@link LoopResources} instance.
* @param loopResources the loop resources to use as is
*/
public void setLoopResources(LoopResources loopResources) {
this.loopResources = loopResources;
}
/**
* Return the configured {@link LoopResources}.
*/
public LoopResources getLoopResources() {
Assert.state(this.loopResources != null, "LoopResources not initialized yet");
return this.loopResources;
}
/**
* Configure the amount of time we'll wait before shutting down resources.
* If a task is submitted during the {@code shutdownQuietPeriod}, it is guaranteed
* to be accepted and the {@code shutdownQuietPeriod} will start over.
* <p>By default, this is set to
* {@link LoopResources#DEFAULT_SHUTDOWN_QUIET_PERIOD} which is 2 seconds but
* can also be overridden with the system property
* {@link reactor.netty5.ReactorNetty#SHUTDOWN_QUIET_PERIOD
* ReactorNetty.SHUTDOWN_QUIET_PERIOD}.
* @since 5.2.4
* @see #setShutdownTimeout(Duration)
*/
public void setShutdownQuietPeriod(Duration shutdownQuietPeriod) {
Assert.notNull(shutdownQuietPeriod, "shutdownQuietPeriod should not be null");
this.shutdownQuietPeriod = shutdownQuietPeriod;
}
/**
* Configure the maximum amount of time to wait until the disposal of the
* underlying resources regardless if a task was submitted during the
* {@code shutdownQuietPeriod}.
* <p>By default, this is set to
* {@link LoopResources#DEFAULT_SHUTDOWN_TIMEOUT} which is 15 seconds but
* can also be overridden with the system property
* {@link reactor.netty5.ReactorNetty#SHUTDOWN_TIMEOUT
* ReactorNetty.SHUTDOWN_TIMEOUT}.
* @since 5.2.4
* @see #setShutdownQuietPeriod(Duration)
*/
public void setShutdownTimeout(Duration shutdownTimeout) {
Assert.notNull(shutdownTimeout, "shutdownTimeout should not be null");
this.shutdownTimeout = shutdownTimeout;
}
@Override
public void afterPropertiesSet() {
if (this.useGlobalResources) {
Assert.isTrue(this.loopResources == null && this.connectionProvider == null,
"'useGlobalResources' is mutually exclusive with explicitly configured resources");
HttpResources httpResources = HttpResources.get();
if (this.globalResourcesConsumer != null) {
this.globalResourcesConsumer.accept(httpResources);
}
this.connectionProvider = httpResources;
this.loopResources = httpResources;
}
else {
if (this.loopResources == null) {
this.manageLoopResources = true;
this.loopResources = this.loopResourcesSupplier.get();
}
if (this.connectionProvider == null) {
this.manageConnectionProvider = true;
this.connectionProvider = this.connectionProviderSupplier.get();
}
}
}
@Override
public void destroy() {
if (this.useGlobalResources) {
HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
}
else {
try {
ConnectionProvider provider = this.connectionProvider;
if (provider != null && this.manageConnectionProvider) {
provider.disposeLater().block();
}
}
catch (Throwable ex) {
// ignore
}
try {
LoopResources resources = this.loopResources;
if (resources != null && this.manageLoopResources) {
resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
}
}
catch (Throwable ex) {
// ignore
}
}
}
}

View File

@ -0,0 +1,280 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.util.AbstractSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import io.netty5.handler.codec.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MultiValueMap;
/**
* {@code MultiValueMap} implementation for wrapping Netty HTTP headers.
*
* <p>This class is based on {@link NettyHeadersAdapter}.
*
* @author Violeta Georgieva
* @since 6.0
*/
final class Netty5HeadersAdapter implements MultiValueMap<String, String> {
private final HttpHeaders headers;
Netty5HeadersAdapter(HttpHeaders headers) {
this.headers = headers;
}
@Override
@Nullable
public String getFirst(String key) {
return this.headers.get(key);
}
@Override
public void add(String key, @Nullable String value) {
if (value != null) {
this.headers.add(key, value);
}
}
@Override
public void addAll(String key, List<? extends String> values) {
this.headers.add(key, values);
}
@Override
public void addAll(MultiValueMap<String, String> values) {
values.forEach(this.headers::add);
}
@Override
public void set(String key, @Nullable String value) {
if (value != null) {
this.headers.set(key, value);
}
}
@Override
public void setAll(Map<String, String> values) {
values.forEach(this.headers::set);
}
@Override
public Map<String, String> toSingleValueMap() {
Map<String, String> singleValueMap = CollectionUtils.newLinkedHashMap(this.headers.size());
this.headers.entries()
.forEach(entry -> {
if (!singleValueMap.containsKey(entry.getKey())) {
singleValueMap.put(entry.getKey(), entry.getValue());
}
});
return singleValueMap;
}
@Override
public int size() {
return this.headers.names().size();
}
@Override
public boolean isEmpty() {
return this.headers.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return (key instanceof String headerName && this.headers.contains(headerName));
}
@Override
public boolean containsValue(Object value) {
return (value instanceof String &&
this.headers.entries().stream()
.anyMatch(entry -> value.equals(entry.getValue())));
}
@Override
@Nullable
public List<String> get(Object key) {
if (containsKey(key)) {
return this.headers.getAll((String) key);
}
return null;
}
@Nullable
@Override
public List<String> put(String key, @Nullable List<String> value) {
List<String> previousValues = this.headers.getAll(key);
this.headers.set(key, value);
return previousValues;
}
@Nullable
@Override
public List<String> remove(Object key) {
if (key instanceof String headerName) {
List<String> previousValues = this.headers.getAll(headerName);
this.headers.remove(headerName);
return previousValues;
}
return null;
}
@Override
public void putAll(Map<? extends String, ? extends List<String>> map) {
map.forEach(this.headers::set);
}
@Override
public void clear() {
this.headers.clear();
}
@Override
public Set<String> keySet() {
return new HeaderNames();
}
@Override
public Collection<List<String>> values() {
return this.headers.names().stream()
.map(this.headers::getAll).collect(Collectors.toList());
}
@Override
public Set<Entry<String, List<String>>> entrySet() {
return new AbstractSet<>() {
@Override
public Iterator<Entry<String, List<String>>> iterator() {
return new EntryIterator();
}
@Override
public int size() {
return headers.size();
}
};
}
@Override
public String toString() {
return org.springframework.http.HttpHeaders.formatHeaders(this);
}
private class EntryIterator implements Iterator<Entry<String, List<String>>> {
private final Iterator<String> names = headers.names().iterator();
@Override
public boolean hasNext() {
return this.names.hasNext();
}
@Override
public Entry<String, List<String>> next() {
return new HeaderEntry(this.names.next());
}
}
private class HeaderEntry implements Entry<String, List<String>> {
private final String key;
HeaderEntry(String key) {
this.key = key;
}
@Override
public String getKey() {
return this.key;
}
@Override
public List<String> getValue() {
return headers.getAll(this.key);
}
@Override
public List<String> setValue(List<String> value) {
List<String> previousValues = headers.getAll(this.key);
headers.set(this.key, value);
return previousValues;
}
}
private class HeaderNames extends AbstractSet<String> {
@Override
public Iterator<String> iterator() {
return new HeaderNamesIterator(headers.names().iterator());
}
@Override
public int size() {
return headers.names().size();
}
}
private final class HeaderNamesIterator implements Iterator<String> {
private final Iterator<String> iterator;
@Nullable
private String currentName;
private HeaderNamesIterator(Iterator<String> iterator) {
this.iterator = iterator;
}
@Override
public boolean hasNext() {
return this.iterator.hasNext();
}
@Override
public String next() {
this.currentName = this.iterator.next();
return this.currentName;
}
@Override
public void remove() {
if (this.currentName == null) {
throw new IllegalStateException("No current Header in iterator");
}
if (!headers.contains(this.currentName)) {
throw new IllegalStateException("Header not present: " + this.currentName);
}
headers.remove(this.currentName);
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -38,7 +38,7 @@ import org.springframework.util.MultiValueMap;
* @author Brian Clozel
* @since 5.1.1
*/
class NettyHeadersAdapter implements MultiValueMap<String, String> {
final class NettyHeadersAdapter implements MultiValueMap<String, String> {
private final HttpHeaders headers;

View File

@ -0,0 +1,79 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.net.URISyntaxException;
import java.util.function.BiFunction;
import io.netty5.handler.codec.http.HttpResponseStatus;
import org.apache.commons.logging.Log;
import reactor.core.publisher.Mono;
import reactor.netty5.http.server.HttpServerRequest;
import reactor.netty5.http.server.HttpServerResponse;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.http.HttpLogging;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
* Adapt {@link HttpHandler} to the Reactor Netty 5 channel handling function.
*
* <p>This class is based on {@link ReactorHttpHandlerAdapter}.
*
* @author Violeta Georgieva
* @since 6.0
*/
public class ReactorNetty2HttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
private static final Log logger = HttpLogging.forLogName(ReactorNetty2HttpHandlerAdapter.class);
private final HttpHandler httpHandler;
public ReactorNetty2HttpHandlerAdapter(HttpHandler httpHandler) {
Assert.notNull(httpHandler, "HttpHandler must not be null");
this.httpHandler = httpHandler;
}
@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
Netty5DataBufferFactory bufferFactory = new Netty5DataBufferFactory(reactorResponse.alloc());
try {
ReactorNetty2ServerHttpRequest request = new ReactorNetty2ServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorNetty2ServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}

View File

@ -0,0 +1,243 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLSession;
import io.netty5.channel.Channel;
import io.netty5.handler.codec.http.HttpHeaderNames;
import io.netty5.handler.codec.http.cookie.Cookie;
import io.netty5.handler.ssl.SslHandler;
import org.apache.commons.logging.Log;
import reactor.core.publisher.Flux;
import reactor.netty5.Connection;
import reactor.netty5.http.server.HttpServerRequest;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpLogging;
import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* Adapt {@link ServerHttpRequest} to the Reactor {@link HttpServerRequest}.
*
* <p>This class is based on {@link ReactorServerHttpRequest}.
*
* @author Violeta Georgieva
* @since 6.0
*/
class ReactorNetty2ServerHttpRequest extends AbstractServerHttpRequest {
/** Reactor Netty 1.0.5+. */
static final boolean reactorNettyRequestChannelOperationsIdPresent = ClassUtils.isPresent(
"reactor.netty.ChannelOperationsId", ReactorNetty2ServerHttpRequest.class.getClassLoader());
private static final Log logger = HttpLogging.forLogName(ReactorNetty2ServerHttpRequest.class);
private static final AtomicLong logPrefixIndex = new AtomicLong();
private final HttpServerRequest request;
private final Netty5DataBufferFactory bufferFactory;
public ReactorNetty2ServerHttpRequest(HttpServerRequest request, Netty5DataBufferFactory bufferFactory)
throws URISyntaxException {
super(initUri(request), "", new Netty5HeadersAdapter(request.requestHeaders()));
Assert.notNull(bufferFactory, "DataBufferFactory must not be null");
this.request = request;
this.bufferFactory = bufferFactory;
}
private static URI initUri(HttpServerRequest request) throws URISyntaxException {
Assert.notNull(request, "HttpServerRequest must not be null");
return new URI(resolveBaseUrl(request) + resolveRequestUri(request));
}
private static URI resolveBaseUrl(HttpServerRequest request) throws URISyntaxException {
String scheme = getScheme(request);
String header = request.requestHeaders().get(HttpHeaderNames.HOST);
if (header != null) {
final int portIndex;
if (header.startsWith("[")) {
portIndex = header.indexOf(':', header.indexOf(']'));
}
else {
portIndex = header.indexOf(':');
}
if (portIndex != -1) {
try {
return new URI(scheme, null, header.substring(0, portIndex),
Integer.parseInt(header, portIndex + 1, header.length(), 10), null, null, null);
}
catch (NumberFormatException ex) {
throw new URISyntaxException(header, "Unable to parse port", portIndex);
}
}
else {
return new URI(scheme, header, null, null);
}
}
else {
InetSocketAddress localAddress = request.hostAddress();
Assert.state(localAddress != null, "No host address available");
return new URI(scheme, null, localAddress.getHostString(),
localAddress.getPort(), null, null, null);
}
}
private static String getScheme(HttpServerRequest request) {
return request.scheme();
}
private static String resolveRequestUri(HttpServerRequest request) {
String uri = request.uri();
for (int i = 0; i < uri.length(); i++) {
char c = uri.charAt(i);
if (c == '/' || c == '?' || c == '#') {
break;
}
if (c == ':' && (i + 2 < uri.length())) {
if (uri.charAt(i + 1) == '/' && uri.charAt(i + 2) == '/') {
for (int j = i + 3; j < uri.length(); j++) {
c = uri.charAt(j);
if (c == '/' || c == '?' || c == '#') {
return uri.substring(j);
}
}
return "";
}
}
}
return uri;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.request.method().name());
}
@Override
@Deprecated
public String getMethodValue() {
return this.request.method().name();
}
@Override
protected MultiValueMap<String, HttpCookie> initCookies() {
MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
for (CharSequence name : this.request.cookies().keySet()) {
for (Cookie cookie : this.request.cookies().get(name)) {
HttpCookie httpCookie = new HttpCookie(name.toString(), cookie.value());
cookies.add(name.toString(), httpCookie);
}
}
return cookies;
}
@Override
@Nullable
public InetSocketAddress getLocalAddress() {
return this.request.hostAddress();
}
@Override
@Nullable
public InetSocketAddress getRemoteAddress() {
return this.request.remoteAddress();
}
@Override
@Nullable
protected SslInfo initSslInfo() {
Channel channel = ((Connection) this.request).channel();
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
if (sslHandler == null && channel.parent() != null) { // HTTP/2
sslHandler = channel.parent().pipeline().get(SslHandler.class);
}
if (sslHandler != null) {
SSLSession session = sslHandler.engine().getSession();
return new DefaultSslInfo(session);
}
return null;
}
@Override
public Flux<DataBuffer> getBody() {
return this.request.receive().transferOwnership().map(this.bufferFactory::wrap);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getNativeRequest() {
return (T) this.request;
}
@Override
@Nullable
protected String initId() {
if (this.request instanceof Connection) {
return ((Connection) this.request).channel().id().asShortText() +
"-" + logPrefixIndex.incrementAndGet();
}
return null;
}
@Override
protected String initLogPrefix() {
if (reactorNettyRequestChannelOperationsIdPresent) {
String id = (ChannelOperationsIdHelper.getId(this.request));
if (id != null) {
return id;
}
}
if (this.request instanceof Connection) {
return ((Connection) this.request).channel().id().asShortText() +
"-" + logPrefixIndex.incrementAndGet();
}
return getId();
}
private static class ChannelOperationsIdHelper {
@Nullable
public static String getId(HttpServerRequest request) {
if (request instanceof reactor.netty.ChannelOperationsId) {
return (logger.isDebugEnabled() ?
((reactor.netty.ChannelOperationsId) request).asLongText() :
((reactor.netty.ChannelOperationsId) request).asShortText());
}
return null;
}
}
}

View File

@ -0,0 +1,157 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.nio.file.Path;
import java.util.List;
import io.netty5.buffer.api.Buffer;
import io.netty5.channel.ChannelId;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty5.ChannelOperationsId;
import reactor.netty5.http.server.HttpServerResponse;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.ResponseCookie;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.util.Assert;
/**
* Adapt {@link ServerHttpResponse} to the {@link HttpServerResponse}.
*
* <p>This class is based on {@link ReactorServerHttpResponse}.
*
* @author Violeta Georgieva
* @since 6.0
*/
class ReactorNetty2ServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage {
private static final Log logger = LogFactory.getLog(ReactorNetty2ServerHttpResponse.class);
private final HttpServerResponse response;
public ReactorNetty2ServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) {
super(bufferFactory, new HttpHeaders(new Netty5HeadersAdapter(response.responseHeaders())));
Assert.notNull(response, "HttpServerResponse must not be null");
this.response = response;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getNativeResponse() {
return (T) this.response;
}
@Override
public HttpStatusCode getStatusCode() {
HttpStatusCode status = super.getStatusCode();
return (status != null ? status : HttpStatusCode.valueOf(this.response.status().code()));
}
@Override
@Deprecated
public Integer getRawStatusCode() {
Integer status = super.getRawStatusCode();
return (status != null ? status : this.response.status().code());
}
@Override
protected void applyStatusCode() {
HttpStatusCode status = super.getStatusCode();
if (status != null) {
this.response.status(status.value());
}
}
@Override
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> publisher) {
return this.response.send(toByteBufs(publisher)).then();
}
@Override
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> publisher) {
return this.response.sendGroups(Flux.from(publisher).map(this::toByteBufs)).then();
}
@Override
protected void applyHeaders() {
}
@Override
protected void applyCookies() {
// Netty Cookie doesn't support sameSite. When this is resolved, we can adapt to it again:
// https://github.com/netty/netty/issues/8161
for (List<ResponseCookie> cookies : getCookies().values()) {
for (ResponseCookie cookie : cookies) {
this.response.addHeader(HttpHeaders.SET_COOKIE, cookie.toString());
}
}
}
@Override
public Mono<Void> writeWith(Path file, long position, long count) {
return doCommit(() -> this.response.sendFile(file, position, count).then());
}
private Publisher<Buffer> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
return dataBuffers instanceof Mono ?
Mono.from(dataBuffers).map(Netty5DataBufferFactory::toBuffer) :
Flux.from(dataBuffers).map(Netty5DataBufferFactory::toBuffer);
}
@Override
protected void touchDataBuffer(DataBuffer buffer) {
if (logger.isDebugEnabled()) {
if (ReactorNetty2ServerHttpRequest.reactorNettyRequestChannelOperationsIdPresent) {
if (ChannelOperationsIdHelper.touch(buffer, this.response)) {
return;
}
}
this.response.withConnection(connection -> {
ChannelId id = connection.channel().id();
DataBufferUtils.touch(buffer, "Channel id: " + id.asShortText());
});
}
}
private static class ChannelOperationsIdHelper {
public static boolean touch(DataBuffer dataBuffer, HttpServerResponse response) {
if (response instanceof ChannelOperationsId) {
String id = ((ChannelOperationsId) response).asLongText();
DataBufferUtils.touch(dataBuffer, "Channel id: " + id);
return true;
}
return false;
}
}
}

View File

@ -133,11 +133,12 @@ class HeadersAdaptersTests {
static Stream<Arguments> headers() {
return Stream.of(
arguments(named("Map", CollectionUtils.toMultiValueMap(new LinkedCaseInsensitiveMap<>(8, Locale.ENGLISH)))),
arguments(named("Netty", new NettyHeadersAdapter(new DefaultHttpHeaders()))),
arguments(named("Tomcat", new TomcatHeadersAdapter(new MimeHeaders()))),
arguments(named("Undertow", new UndertowHeadersAdapter(new HeaderMap()))),
arguments(named("Jetty", new JettyHeadersAdapter(HttpFields.build())))
arguments(named("Map", CollectionUtils.toMultiValueMap(new LinkedCaseInsensitiveMap<>(8, Locale.ENGLISH)))),
arguments(named("Netty", new NettyHeadersAdapter(new DefaultHttpHeaders()))),
arguments(named("Netty", new Netty5HeadersAdapter(new io.netty5.handler.codec.http.DefaultHttpHeaders()))),
arguments(named("Tomcat", new TomcatHeadersAdapter(new MimeHeaders()))),
arguments(named("Undertow", new UndertowHeadersAdapter(new HeaderMap()))),
arguments(named("Jetty", new JettyHeadersAdapter(HttpFields.build())))
);
}

View File

@ -127,6 +127,7 @@ public abstract class AbstractHttpHandlerIntegrationTests {
return Stream.of(
named("Jetty", new JettyHttpServer()),
named("Reactor Netty", new ReactorHttpServer()),
named("Reactor Netty 2", new ReactorHttpServer()),
named("Tomcat", new TomcatHttpServer()),
named("Undertow", new UndertowHttpServer())
);

View File

@ -0,0 +1,71 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.testfixture.http.server.reactive.bootstrap;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;
import reactor.netty5.DisposableServer;
import org.springframework.http.server.reactive.ReactorNetty2HttpHandlerAdapter;
/**
* This class is copied from {@link ReactorHttpServer}.
*
* @author Violeta Georgieva
* @since 6.0
*/
public class ReactorNetty2HttpServer extends AbstractHttpServer {
private ReactorNetty2HttpHandlerAdapter reactorHandler;
private reactor.netty5.http.server.HttpServer reactorServer;
private AtomicReference<DisposableServer> serverRef = new AtomicReference<>();
@Override
protected void initServer() {
this.reactorHandler = createHttpHandlerAdapter();
this.reactorServer = reactor.netty5.http.server.HttpServer.create().wiretap(true)
.host(getHost()).port(getPort());
}
private ReactorNetty2HttpHandlerAdapter createHttpHandlerAdapter() {
return new ReactorNetty2HttpHandlerAdapter(resolveHttpHandler());
}
@Override
protected void startInternal() {
DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block();
setPort(((InetSocketAddress) server.address()).getPort());
this.serverRef.set(server);
}
@Override
protected void stopInternal() {
this.serverRef.get().dispose();
}
@Override
protected void resetInternal() {
this.reactorServer = null;
this.reactorHandler = null;
this.serverRef.set(null);
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.testfixture.http.server.reactive.bootstrap;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import reactor.netty5.DisposableServer;
import reactor.netty5.http.Http11SslContextSpec;
import org.springframework.http.server.reactive.ReactorNetty2HttpHandlerAdapter;
/**
* This class is copied from {@link ReactorHttpsServer}.
*
* @author Violeta Georgieva
* @since 6.0
*/
public class ReactorNetty2HttpsServer extends AbstractHttpServer {
private ReactorNetty2HttpHandlerAdapter reactorHandler;
private reactor.netty5.http.server.HttpServer reactorServer;
private AtomicReference<DisposableServer> serverRef = new AtomicReference<>();
@Override
protected void initServer() throws Exception {
SelfSignedCertificate cert = new SelfSignedCertificate();
Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forServer(cert.certificate(), cert.privateKey());
this.reactorHandler = createHttpHandlerAdapter();
this.reactorServer = reactor.netty5.http.server.HttpServer.create()
.host(getHost())
.port(getPort())
.secure(sslContextSpec -> sslContextSpec.sslContext(http11SslContextSpec));
}
private ReactorNetty2HttpHandlerAdapter createHttpHandlerAdapter() {
return new ReactorNetty2HttpHandlerAdapter(resolveHttpHandler());
}
@Override
protected void startInternal() {
DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block();
setPort(((InetSocketAddress) server.address()).getPort());
this.serverRef.set(server);
}
@Override
protected void stopInternal() {
this.serverRef.get().dispose();
}
@Override
protected void resetInternal() {
this.reactorServer = null;
this.reactorHandler = null;
this.serverRef.set(null);
}
}

View File

@ -16,6 +16,7 @@ dependencies {
optional("com.fasterxml.jackson.core:jackson-databind")
optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile")
optional("io.projectreactor.netty:reactor-netty-http")
optional("io.projectreactor.netty:reactor-netty5-http:2.0.0-M1")
optional("org.apache.tomcat:tomcat-websocket") {
exclude group: "org.apache.tomcat", module: "tomcat-servlet-api"
exclude group: "org.apache.tomcat", module: "tomcat-websocket-api"

View File

@ -0,0 +1,105 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.util.HashMap;
import java.util.Map;
import io.netty5.buffer.api.Buffer;
import io.netty5.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty5.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty5.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty5.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Base class for Netty-based {@link WebSocketSession} adapters that provides
* convenience methods to convert Netty {@link WebSocketFrame WebSocketFrames} to and from
* {@link WebSocketMessage WebSocketMessages}.
*
* <p>This class is based on {@link NettyWebSocketSessionSupport}.
*
* @author Violeta Georgieva
* @since 6.0
* @param <T> the native delegate type
*/
public abstract class Netty5WebSocketSessionSupport<T> extends AbstractWebSocketSession<T> {
/**
* The default max size for inbound WebSocket frames.
*/
public static final int DEFAULT_FRAME_MAX_SIZE = 64 * 1024;
private static final Map<Class<?>, WebSocketMessage.Type> messageTypes;
static {
messageTypes = new HashMap<>(8);
messageTypes.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT);
messageTypes.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY);
messageTypes.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING);
messageTypes.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG);
}
protected Netty5WebSocketSessionSupport(T delegate, HandshakeInfo info, Netty5DataBufferFactory factory) {
super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory);
}
@Override
public Netty5DataBufferFactory bufferFactory() {
return (Netty5DataBufferFactory) super.bufferFactory();
}
protected WebSocketMessage toMessage(WebSocketFrame frame) {
WebSocketFrame newFrame = frame.send().receive();
DataBuffer payload = bufferFactory().wrap(newFrame.binaryData());
return new WebSocketMessage(messageTypes.get(newFrame.getClass()), payload, newFrame);
}
protected WebSocketFrame toFrame(WebSocketMessage message) {
if (message.getNativeMessage() != null) {
return message.getNativeMessage();
}
Buffer buffer = Netty5DataBufferFactory.toBuffer(message.getPayload());
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
return new TextWebSocketFrame(buffer);
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
return new BinaryWebSocketFrame(buffer);
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
return new PingWebSocketFrame(buffer);
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
return new PongWebSocketFrame(buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
}

View File

@ -0,0 +1,173 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.util.function.Consumer;
import io.netty5.channel.ChannelId;
import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty5.Connection;
import reactor.netty5.NettyInbound;
import reactor.netty5.NettyOutbound;
import reactor.netty5.channel.ChannelOperations;
import reactor.netty5.http.websocket.WebsocketInbound;
import reactor.netty5.http.websocket.WebsocketOutbound;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* {@link WebSocketSession} implementation for use with the Reactor Netty's (Netty 5)
* {@link NettyInbound} and {@link NettyOutbound}.
* This class is based on {@link ReactorNettyWebSocketSession}.
*
* @author Violeta Georgieva
* @since 6.0
*/
public class ReactorNetty2WebSocketSession
extends Netty5WebSocketSessionSupport<ReactorNetty2WebSocketSession.WebSocketConnection> {
private final int maxFramePayloadLength;
private final ChannelId channelId;
/**
* Constructor for the session, using the {@link #DEFAULT_FRAME_MAX_SIZE} value.
*/
public ReactorNetty2WebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, Netty5DataBufferFactory bufferFactory) {
this(inbound, outbound, info, bufferFactory, DEFAULT_FRAME_MAX_SIZE);
}
/**
* Constructor with an additional maxFramePayloadLength argument.
* @since 5.1
*/
@SuppressWarnings("rawtypes")
public ReactorNetty2WebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, Netty5DataBufferFactory bufferFactory,
int maxFramePayloadLength) {
super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
this.maxFramePayloadLength = maxFramePayloadLength;
this.channelId = ((ChannelOperations) inbound).channel().id();
}
/**
* Return the id of the underlying Netty channel.
* @since 5.3.4
*/
public ChannelId getChannelId() {
return this.channelId;
}
@Override
public Flux<WebSocketMessage> receive() {
return getDelegate().getInbound()
.aggregateFrames(this.maxFramePayloadLength)
.receiveFrames()
.map(super::toMessage)
.doOnNext(message -> {
if (logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "Received " + message);
}
});
}
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
Flux<WebSocketFrame> frames = Flux.from(messages)
.doOnNext(message -> {
if (logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "Sending " + message);
}
})
.map(this::toFrame);
return getDelegate().getOutbound()
.sendObject(frames)
.then();
}
@Override
public boolean isOpen() {
DisposedCallback callback = new DisposedCallback();
getDelegate().getInbound().withConnection(callback);
return !callback.isDisposed();
}
@Override
public Mono<Void> close(CloseStatus status) {
// this will notify WebSocketInbound.receiveCloseStatus()
return getDelegate().getOutbound().sendClose(status.getCode(), status.getReason());
}
@Override
public Mono<CloseStatus> closeStatus() {
return getDelegate().getInbound().receiveCloseStatus()
.map(status -> CloseStatus.create(status.code(), status.reasonText()));
}
/**
* Simple container for {@link NettyInbound} and {@link NettyOutbound}.
*/
public static class WebSocketConnection {
private final WebsocketInbound inbound;
private final WebsocketOutbound outbound;
public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) {
this.inbound = inbound;
this.outbound = outbound;
}
public WebsocketInbound getInbound() {
return this.inbound;
}
public WebsocketOutbound getOutbound() {
return this.outbound;
}
}
private static class DisposedCallback implements Consumer<Connection> {
private boolean disposed;
public boolean isDisposed() {
return this.disposed;
}
@Override
public void accept(Connection connection) {
this.disposed = connection.isDisposed();
}
}
}

View File

@ -0,0 +1,230 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.client;
import java.net.URI;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import reactor.netty5.http.client.HttpClient;
import reactor.netty5.http.client.WebsocketClientSpec;
import reactor.netty5.http.websocket.WebsocketInbound;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.adapter.ReactorNetty2WebSocketSession;
/**
* {@link WebSocketClient} implementation for use with Reactor Netty for Netty 5.
*
* <p>This class is based on {@link ReactorNettyWebSocketClient}.
*
* @author Violeta Georgieva
* @since 6.0
*/
public class ReactorNetty2WebSocketClient implements WebSocketClient {
private static final Log logger = LogFactory.getLog(ReactorNetty2WebSocketClient.class);
private final HttpClient httpClient;
private final Supplier<WebsocketClientSpec.Builder> specBuilderSupplier;
@Nullable
private Integer maxFramePayloadLength;
@Nullable
private Boolean handlePing;
/**
* Default constructor.
*/
public ReactorNetty2WebSocketClient() {
this(HttpClient.create());
}
/**
* Constructor that accepts an existing {@link HttpClient} builder
* with a default {@link WebsocketClientSpec.Builder}.
* @since 5.1
*/
public ReactorNetty2WebSocketClient(HttpClient httpClient) {
this(httpClient, WebsocketClientSpec.builder());
}
/**
* Constructor that accepts an existing {@link HttpClient} builder
* and a pre-configured {@link WebsocketClientSpec.Builder}.
* @since 5.3
*/
public ReactorNetty2WebSocketClient(
HttpClient httpClient, Supplier<WebsocketClientSpec.Builder> builderSupplier) {
Assert.notNull(httpClient, "HttpClient is required");
Assert.notNull(builderSupplier, "WebsocketClientSpec.Builder is required");
this.httpClient = httpClient;
this.specBuilderSupplier = builderSupplier;
}
/**
* Return the configured {@link HttpClient}.
*/
public HttpClient getHttpClient() {
return this.httpClient;
}
/**
* Build an instance of {@code WebsocketClientSpec} that reflects the current
* configuration. This can be used to check the configured parameters except
* for sub-protocols which depend on the {@link WebSocketHandler} that is used
* for a given upgrade.
* @since 5.3
*/
public WebsocketClientSpec getWebsocketClientSpec() {
return buildSpec(null);
}
private WebsocketClientSpec buildSpec(@Nullable String protocols) {
WebsocketClientSpec.Builder builder = this.specBuilderSupplier.get();
if (StringUtils.hasText(protocols)) {
builder.protocols(protocols);
}
if (this.maxFramePayloadLength != null) {
builder.maxFramePayloadLength(this.maxFramePayloadLength);
}
if (this.handlePing != null) {
builder.handlePing(this.handlePing);
}
return builder.build();
}
/**
* Configure the maximum allowable frame payload length. Setting this value
* to your application's requirement may reduce denial of service attacks
* using long data frames.
* <p>Corresponds to the argument with the same name in the constructor of
* {@link io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
* WebSocketServerHandshakerFactory} in Netty.
* <p>By default set to 65536 (64K).
* @param maxFramePayloadLength the max length for frames.
* @since 5.2
* @deprecated as of 5.3 in favor of providing a supplier of
* {@link WebsocketClientSpec.Builder} with a
* constructor argument
*/
@Deprecated
public void setMaxFramePayloadLength(int maxFramePayloadLength) {
this.maxFramePayloadLength = maxFramePayloadLength;
}
/**
* Return the configured {@link #setMaxFramePayloadLength(int) maxFramePayloadLength}.
* @since 5.2
* @deprecated as of 5.3 in favor of {@link #getWebsocketClientSpec()}
*/
@Deprecated
public int getMaxFramePayloadLength() {
return getWebsocketClientSpec().maxFramePayloadLength();
}
/**
* Configure whether to let ping frames through to be handled by the
* {@link WebSocketHandler} given to the execute method. By default, Reactor
* Netty automatically replies with pong frames in response to pings. This is
* useful in a proxy for allowing ping and pong frames through.
* <p>By default this is set to {@code false} in which case ping frames are
* handled automatically by Reactor Netty. If set to {@code true}, ping
* frames will be passed through to the {@link WebSocketHandler}.
* @param handlePing whether to let Ping frames through for handling
* @since 5.2.4
* @deprecated as of 5.3 in favor of providing a supplier of
* {@link WebsocketClientSpec.Builder} with a
* constructor argument
*/
@Deprecated
public void setHandlePing(boolean handlePing) {
this.handlePing = handlePing;
}
/**
* Return the configured {@link #setHandlePing(boolean)}.
* @since 5.2.4
* @deprecated as of 5.3 in favor of {@link #getWebsocketClientSpec()}
*/
@Deprecated
public boolean getHandlePing() {
return getWebsocketClientSpec().handlePing();
}
@Override
public Mono<Void> execute(URI url, WebSocketHandler handler) {
return execute(url, new HttpHeaders(), handler);
}
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
String protocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols());
return getHttpClient()
.headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
.websocket(buildSpec(protocols))
.uri(url.toString())
.handle((inbound, outbound) -> {
HttpHeaders responseHeaders = toHttpHeaders(inbound);
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
Netty5DataBufferFactory factory = new Netty5DataBufferFactory(outbound.alloc());
WebSocketSession session = new ReactorNetty2WebSocketSession(
inbound, outbound, info, factory, getMaxFramePayloadLength());
if (logger.isDebugEnabled()) {
logger.debug("Started session '" + session.getId() + "' for " + url);
}
return handler.handle(session).checkpoint(url + " [ReactorNetty2WebSocketClient]");
})
.doOnRequest(n -> {
if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
})
.next();
}
private void setNettyHeaders(HttpHeaders httpHeaders, io.netty5.handler.codec.http.HttpHeaders nettyHeaders) {
httpHeaders.forEach(nettyHeaders::set);
}
private HttpHeaders toHttpHeaders(WebsocketInbound inbound) {
HttpHeaders headers = new HttpHeaders();
io.netty5.handler.codec.http.HttpHeaders nettyHeaders = inbound.headers();
nettyHeaders.forEach(entry -> {
String name = entry.getKey();
headers.put(name, nettyHeaders.getAll(name));
});
return headers;
}
}

View File

@ -0,0 +1,183 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.net.URI;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.netty5.http.server.HttpServerResponse;
import reactor.netty5.http.server.WebsocketServerSpec;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.ReactorNetty2WebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
/**
* A {@link RequestUpgradeStrategy} for use with Reactor Netty for Netty 5.
*
* <p>This class is based on {@link ReactorNettyRequestUpgradeStrategy}.
*\
* @author Violeta Georgieva
* @since 6.0
*/
public class ReactorNetty2RequestUpgradeStrategy implements RequestUpgradeStrategy {
private final Supplier<WebsocketServerSpec.Builder> specBuilderSupplier;
@Nullable
private Integer maxFramePayloadLength;
@Nullable
private Boolean handlePing;
/**
* Create an instances with a default {@link WebsocketServerSpec.Builder}.
* @since 5.2.6
*/
public ReactorNetty2RequestUpgradeStrategy() {
this(WebsocketServerSpec::builder);
}
/**
* Create an instance with a pre-configured {@link WebsocketServerSpec.Builder}
* to use for WebSocket upgrades.
* @since 5.2.6
*/
public ReactorNetty2RequestUpgradeStrategy(Supplier<WebsocketServerSpec.Builder> builderSupplier) {
Assert.notNull(builderSupplier, "WebsocketServerSpec.Builder is required");
this.specBuilderSupplier = builderSupplier;
}
/**
* Build an instance of {@code WebsocketServerSpec} that reflects the current
* configuration. This can be used to check the configured parameters except
* for sub-protocols which depend on the {@link WebSocketHandler} that is used
* for a given upgrade.
* @since 5.2.6
*/
public WebsocketServerSpec getWebsocketServerSpec() {
return buildSpec(null);
}
WebsocketServerSpec buildSpec(@Nullable String subProtocol) {
WebsocketServerSpec.Builder builder = this.specBuilderSupplier.get();
if (subProtocol != null) {
builder.protocols(subProtocol);
}
if (this.maxFramePayloadLength != null) {
builder.maxFramePayloadLength(this.maxFramePayloadLength);
}
if (this.handlePing != null) {
builder.handlePing(this.handlePing);
}
return builder.build();
}
/**
* Configure the maximum allowable frame payload length. Setting this value
* to your application's requirement may reduce denial of service attacks
* using long data frames.
* <p>Corresponds to the argument with the same name in the constructor of
* {@link io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
* WebSocketServerHandshakerFactory} in Netty.
* <p>By default set to 65536 (64K).
* @param maxFramePayloadLength the max length for frames.
* @since 5.1
* @deprecated as of 5.2.6 in favor of providing a supplier of
* {@link WebsocketServerSpec.Builder} with a
* constructor argument
*/
@Deprecated
public void setMaxFramePayloadLength(Integer maxFramePayloadLength) {
this.maxFramePayloadLength = maxFramePayloadLength;
}
/**
* Return the configured max length for frames.
* @since 5.1
* @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()}
*/
@Deprecated
public int getMaxFramePayloadLength() {
return getWebsocketServerSpec().maxFramePayloadLength();
}
/**
* Configure whether to let ping frames through to be handled by the
* {@link WebSocketHandler} given to the upgrade method. By default, Reactor
* Netty automatically replies with pong frames in response to pings. This is
* useful in a proxy for allowing ping and pong frames through.
* <p>By default this is set to {@code false} in which case ping frames are
* handled automatically by Reactor Netty. If set to {@code true}, ping
* frames will be passed through to the {@link WebSocketHandler}.
* @param handlePing whether to let Ping frames through for handling
* @since 5.2.4
* @deprecated as of 5.2.6 in favor of providing a supplier of
* {@link WebsocketServerSpec.Builder} with a
* constructor argument
*/
@Deprecated
public void setHandlePing(boolean handlePing) {
this.handlePing = handlePing;
}
/**
* Return the configured {@link #setHandlePing(boolean)}.
* @since 5.2.4
* @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()}
*/
@Deprecated
public boolean getHandlePing() {
return getWebsocketServerSpec().handlePing();
}
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
ServerHttpResponse response = exchange.getResponse();
HttpServerResponse reactorResponse = ServerHttpResponseDecorator.getNativeResponse(response);
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
Netty5DataBufferFactory bufferFactory = (Netty5DataBufferFactory) response.bufferFactory();
URI uri = exchange.getRequest().getURI();
// Trigger WebFlux preCommit actions and upgrade
return response.setComplete()
.then(Mono.defer(() -> {
WebsocketServerSpec spec = buildSpec(subProtocol);
return reactorResponse.sendWebsocket((in, out) -> {
ReactorNetty2WebSocketSession session =
new ReactorNetty2WebSocketSession(
in, out, handshakeInfo, bufferFactory, spec.maxFramePayloadLength());
return handler.handle(session).checkpoint(uri + " [ReactorNetty2RequestUpgradeStrategy]");
}, spec);
}));
}
}

View File

@ -73,6 +73,7 @@ import org.springframework.http.client.reactive.HttpComponentsClientHttpConnecto
import org.springframework.http.client.reactive.JdkClientHttpConnector;
import org.springframework.http.client.reactive.JettyClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorNetty2ClientHttpConnector;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import org.springframework.web.testfixture.xml.Pojo;
@ -102,6 +103,7 @@ class WebClientIntegrationTests {
static Stream<Named<ClientHttpConnector>> arguments() {
return Stream.of(
named("Reactor Netty", new ReactorClientHttpConnector()),
named("Reactor Netty 2", new ReactorNetty2ClientHttpConnector()),
named("JDK", new JdkClientHttpConnector()),
named("Jetty", new JettyClientHttpConnector()),
named("HttpComponents", new HttpComponentsClientHttpConnector())
@ -860,6 +862,12 @@ class WebClientIntegrationTests {
@ParameterizedWebClientTest
void statusHandlerSuppressedErrorSignalWithFlux(ClientHttpConnector connector) {
// Temporarily disabled, leads to io.netty5.buffer.api.BufferClosedException
if (connector instanceof ReactorNetty2ClientHttpConnector) {
return;
}
startServer(connector);
prepareResponse(response -> response.setResponseCode(500)

View File

@ -46,6 +46,7 @@ import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.web.filter.reactive.ServerWebExchangeContextFilter;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.socket.client.JettyWebSocketClient;
import org.springframework.web.reactive.socket.client.ReactorNetty2WebSocketClient;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.TomcatWebSocketClient;
import org.springframework.web.reactive.socket.client.UndertowWebSocketClient;
@ -55,6 +56,7 @@ import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.TomcatRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpgradeStrategy;
@ -63,6 +65,7 @@ import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorNetty2HttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
@ -92,6 +95,7 @@ abstract class AbstractWebSocketIntegrationTests {
new TomcatWebSocketClient(),
new JettyWebSocketClient(),
new ReactorNettyWebSocketClient(),
new ReactorNetty2WebSocketClient(),
new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY))
};
@ -99,6 +103,7 @@ abstract class AbstractWebSocketIntegrationTests {
servers.put(new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class), TomcatConfig.class);
servers.put(new JettyHttpServer(), JettyConfig.class);
servers.put(new ReactorHttpServer(), ReactorNettyConfig.class);
servers.put(new ReactorNetty2HttpServer(), ReactorNetty2Config.class);
servers.put(new UndertowHttpServer(), UndertowConfig.class);
// Try each client once against each server..
@ -204,6 +209,14 @@ abstract class AbstractWebSocketIntegrationTests {
}
}
@Configuration
static class ReactorNetty2Config extends AbstractHandlerAdapterConfig {
@Override
protected RequestUpgradeStrategy getUpgradeStrategy() {
return new ReactorNetty2RequestUpgradeStrategy();
}
}
@Configuration
static class TomcatConfig extends AbstractHandlerAdapterConfig {