Remove Undertow-specific support and testing
Undertow does not support Servlet 6.1, we need to remove compatibility tests as well as Undertow-specific classes for WebSocket and reactive support. Closes gh-35354
This commit is contained in:
parent
887ef75700
commit
fce7b3d420
|
@ -3,8 +3,8 @@
|
|||
|
||||
Java NIO provides `ByteBuffer` but many libraries build their own byte buffer API on top,
|
||||
especially for network operations where reusing buffers and/or using direct buffers is
|
||||
beneficial for performance. For example Netty has the `ByteBuf` hierarchy, Undertow uses
|
||||
XNIO, Jetty uses pooled byte buffers with a callback to be released, and so on.
|
||||
beneficial for performance. For example Netty has the `ByteBuf` hierarchy,
|
||||
Jetty uses pooled byte buffers with a callback to be released, and so on.
|
||||
The `spring-core` module provides a set of abstractions to work with various byte buffer
|
||||
APIs as follows:
|
||||
|
||||
|
|
|
@ -73,7 +73,7 @@ As of Spring Framework 6.0, Spring has been upgraded to the Jakarta EE 9 level
|
|||
traditional `javax` packages. With EE 9 as the minimum and EE 10 supported already,
|
||||
Spring is prepared to provide out-of-the-box support for the further evolution of
|
||||
the Jakarta EE APIs. Spring Framework 6.0 is fully compatible with Tomcat 10.1,
|
||||
Jetty 11 and Undertow 2.3 as web servers, and also with Hibernate ORM 6.1.
|
||||
Jetty 11 as web servers, and also with Hibernate ORM 6.1.
|
||||
|
||||
Over time, the role of Java/Jakarta EE in application development has evolved. In the
|
||||
early days of J2EE and Spring, applications were created to be deployed to an application
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
This part of the documentation covers support for reactive-stack web applications built
|
||||
on a {reactive-streams-site}/[Reactive Streams] API to run on non-blocking servers,
|
||||
such as Netty, Undertow, and Servlet containers. Individual chapters cover
|
||||
such as Netty and Servlet containers. Individual chapters cover
|
||||
the xref:web/webflux.adoc#webflux[Spring WebFlux] framework,
|
||||
the reactive xref:web/webflux-webclient.adoc[`WebClient`],
|
||||
support for xref:web/webflux-test.adoc[testing],
|
||||
|
|
|
@ -367,7 +367,7 @@ subsequently use `DataBufferUtils.release(dataBuffer)` when the buffers are cons
|
|||
`WebSocketHandlerAdapter` delegates to a `WebSocketService`. By default, that is an instance
|
||||
of `HandshakeWebSocketService`, which performs basic checks on the WebSocket request and
|
||||
then uses `RequestUpgradeStrategy` for the server in use. Currently, there is built-in
|
||||
support for Reactor Netty, Tomcat, Jetty, and Undertow.
|
||||
support for Reactor Netty, Tomcat, and Jetty.
|
||||
|
||||
`HandshakeWebSocketService` exposes a `sessionAttributePredicate` property that allows
|
||||
setting a `Predicate<String>` to extract attributes from the `WebSession` and insert them
|
||||
|
@ -446,7 +446,7 @@ specify CORS settings by URL pattern. If both are specified, they are combined b
|
|||
=== Client
|
||||
|
||||
Spring WebFlux provides a `WebSocketClient` abstraction with implementations for
|
||||
Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).
|
||||
Reactor Netty, Tomcat, Jetty, and standard Java (that is, JSR-356).
|
||||
|
||||
NOTE: The Tomcat client is effectively an extension of the standard Java one with some extra
|
||||
functionality in the `WebSocketSession` handling to take advantage of the Tomcat-specific
|
||||
|
|
|
@ -8,7 +8,7 @@ The original web framework included in the Spring Framework, Spring Web MVC, was
|
|||
purpose-built for the Servlet API and Servlet containers. The reactive-stack web framework,
|
||||
Spring WebFlux, was added later in version 5.0. It is fully non-blocking, supports
|
||||
{reactive-streams-site}/[Reactive Streams] back pressure, and runs on such servers as
|
||||
Netty, Undertow, and Servlet containers.
|
||||
Netty, and Servlet containers.
|
||||
|
||||
Both web frameworks mirror the names of their source modules
|
||||
({spring-framework-code}/spring-webmvc[spring-webmvc] and
|
||||
|
|
|
@ -4,6 +4,6 @@
|
|||
|
||||
[.small]#xref:web/webmvc/mvc-http2.adoc[See equivalent in the Servlet stack]#
|
||||
|
||||
HTTP/2 is supported with Reactor Netty, Tomcat, Jetty, and Undertow. However, there are
|
||||
HTTP/2 is supported with Reactor Netty, Tomcat, and Jetty. However, there are
|
||||
considerations related to server configuration. For more details, see the
|
||||
{spring-framework-wiki}/HTTP-2-support[HTTP/2 wiki page].
|
||||
|
|
|
@ -127,7 +127,7 @@ You have maximum choice of libraries, since, historically, most are blocking.
|
|||
|
||||
* If you are already shopping for a non-blocking web stack, Spring WebFlux offers the same
|
||||
execution model benefits as others in this space and also provides a choice of servers
|
||||
(Netty, Tomcat, Jetty, Undertow, and Servlet containers), a choice of programming models
|
||||
(Netty, Tomcat, Jetty, and Servlet containers), a choice of programming models
|
||||
(annotated controllers and functional web endpoints), and a choice of reactive libraries
|
||||
(Reactor, RxJava, or other).
|
||||
|
||||
|
@ -165,7 +165,7 @@ unsure what benefits to look for, start by learning about how non-blocking I/O w
|
|||
== Servers
|
||||
|
||||
Spring WebFlux is supported on Tomcat, Jetty, Servlet containers, as well as on
|
||||
non-Servlet runtimes such as Netty and Undertow. All servers are adapted to a low-level,
|
||||
non-Servlet runtimes such as Netty. All servers are adapted to a low-level,
|
||||
xref:web/webflux/reactive-spring.adoc#webflux-httphandler[common API] so that higher-level
|
||||
xref:web/webflux/new-framework.adoc#webflux-programming-models[programming models] can be supported across servers.
|
||||
|
||||
|
@ -175,7 +175,7 @@ xref:web/webflux/dispatcher-handler.adoc#webflux-framework-config[WebFlux infras
|
|||
lines of code.
|
||||
|
||||
Spring Boot has a WebFlux starter that automates these steps. By default, the starter uses
|
||||
Netty, but it is easy to switch to Tomcat, Jetty, or Undertow by changing your
|
||||
Netty, but it is easy to switch to Tomcat, or Jetty by changing your
|
||||
Maven or Gradle dependencies. Spring Boot defaults to Netty, because it is more widely
|
||||
used in the asynchronous, non-blocking space and lets a client and a server share resources.
|
||||
|
||||
|
@ -188,8 +188,6 @@ adapter. It is not exposed for direct use.
|
|||
NOTE: It is strongly advised not to map Servlet filters or directly manipulate the Servlet API in the context of a WebFlux application.
|
||||
For the reasons listed above, mixing blocking I/O and non-blocking I/O in the same context will cause runtime issues.
|
||||
|
||||
For Undertow, Spring WebFlux uses Undertow APIs directly without the Servlet API.
|
||||
|
||||
|
||||
[[webflux-performance]]
|
||||
== Performance
|
||||
|
|
|
@ -7,7 +7,7 @@ applications:
|
|||
* For server request processing there are two levels of support.
|
||||
** xref:web/webflux/reactive-spring.adoc#webflux-httphandler[HttpHandler]: Basic contract for HTTP request handling with
|
||||
non-blocking I/O and Reactive Streams back pressure, along with adapters for Reactor Netty,
|
||||
Undertow, Tomcat, Jetty, and any Servlet container.
|
||||
Tomcat, Jetty, and any Servlet container.
|
||||
** xref:web/webflux/reactive-spring.adoc#webflux-web-handler-api[`WebHandler` API]: Slightly higher level, general-purpose web API for
|
||||
request handling, on top of which concrete programming models such as annotated
|
||||
controllers and functional endpoints are built.
|
||||
|
@ -40,10 +40,6 @@ The following table describes the supported server APIs:
|
|||
| Netty API
|
||||
| {reactor-github-org}/reactor-netty[Reactor Netty]
|
||||
|
||||
| Undertow
|
||||
| Undertow API
|
||||
| spring-web: Undertow to Reactive Streams bridge
|
||||
|
||||
| Tomcat
|
||||
| Servlet non-blocking I/O; Tomcat API to read and write ByteBuffers vs byte[]
|
||||
| spring-web: Servlet non-blocking I/O to Reactive Streams bridge
|
||||
|
@ -67,10 +63,6 @@ The following table describes server dependencies (also see
|
|||
|io.projectreactor.netty
|
||||
|reactor-netty
|
||||
|
||||
|Undertow
|
||||
|io.undertow
|
||||
|undertow-core
|
||||
|
||||
|Tomcat
|
||||
|org.apache.tomcat.embed
|
||||
|tomcat-embed-core
|
||||
|
@ -104,30 +96,6 @@ Kotlin::
|
|||
----
|
||||
======
|
||||
|
||||
*Undertow*
|
||||
[tabs]
|
||||
======
|
||||
Java::
|
||||
+
|
||||
[source,java,indent=0,subs="verbatim,quotes"]
|
||||
----
|
||||
HttpHandler handler = ...
|
||||
UndertowHttpHandlerAdapter adapter = new UndertowHttpHandlerAdapter(handler);
|
||||
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build();
|
||||
server.start();
|
||||
----
|
||||
|
||||
Kotlin::
|
||||
+
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes"]
|
||||
----
|
||||
val handler: HttpHandler = ...
|
||||
val adapter = UndertowHttpHandlerAdapter(handler)
|
||||
val server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build()
|
||||
server.start()
|
||||
----
|
||||
======
|
||||
|
||||
*Tomcat*
|
||||
[tabs]
|
||||
======
|
||||
|
|
|
@ -54,9 +54,6 @@ dependencies {
|
|||
api("io.r2dbc:r2dbc-spi:1.0.0.RELEASE")
|
||||
api("io.reactivex.rxjava3:rxjava:3.1.10")
|
||||
api("io.smallrye.reactive:mutiny:1.10.0")
|
||||
api("io.undertow:undertow-core:2.3.18.Final")
|
||||
api("io.undertow:undertow-servlet:2.3.18.Final")
|
||||
api("io.undertow:undertow-websockets-jsr:2.3.18.Final")
|
||||
api("io.vavr:vavr:0.10.4")
|
||||
api("jakarta.activation:jakarta.activation-api:2.1.3")
|
||||
api("jakarta.annotation:jakarta.annotation-api:3.0.0")
|
||||
|
|
|
@ -28,7 +28,6 @@ dependencies {
|
|||
optional("io.netty:netty-transport")
|
||||
optional("io.projectreactor.netty:reactor-netty-http")
|
||||
optional("io.reactivex.rxjava3:rxjava")
|
||||
optional("io.undertow:undertow-core")
|
||||
optional("jakarta.el:jakarta.el-api")
|
||||
optional("jakarta.faces:jakarta.faces-api")
|
||||
optional("jakarta.json.bind:jakarta.json.bind-api")
|
||||
|
|
|
@ -84,9 +84,8 @@ public class HeadersAdapterBenchmark {
|
|||
case "Netty" -> new Netty4HeadersAdapter(new DefaultHttpHeaders());
|
||||
case "HttpComponents" -> new HttpComponentsHeadersAdapter(new HttpGet("https://example.com"));
|
||||
case "Jetty" -> new JettyHeadersAdapter(HttpFields.build());
|
||||
// FIXME tomcat/undertow implementations (in another package)
|
||||
// FIXME tomcat implementations (in another package)
|
||||
// case "Tomcat" -> new TomcatHeadersAdapter(new MimeHeaders());
|
||||
// case "Undertow" -> new UndertowHeadersAdapter(new HeaderMap());
|
||||
default -> throw new IllegalArgumentException("Unsupported implementation: " + this.implementation);
|
||||
};
|
||||
initHeaders();
|
||||
|
|
|
@ -37,9 +37,8 @@ import org.springframework.util.Assert;
|
|||
* event-listener read APIs and Reactive Streams.
|
||||
*
|
||||
* <p>Specifically a base class for reading from the HTTP request body with
|
||||
* Servlet non-blocking I/O and Undertow XNIO as well as handling incoming
|
||||
* WebSocket messages with standard Jakarta WebSocket (JSR-356), Jetty, and
|
||||
* Undertow.
|
||||
* Servlet non-blocking I/O as well as handling incoming
|
||||
* WebSocket messages with standard Jakarta WebSocket (JSR-356), and Jetty.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Violeta Georgieva
|
||||
|
|
|
@ -34,8 +34,8 @@ import org.springframework.util.StringUtils;
|
|||
* event-listener write APIs and Reactive Streams.
|
||||
*
|
||||
* <p>Specifically a base class for writing to the HTTP response body with
|
||||
* Servlet non-blocking I/O and Undertow XNIO as well for writing WebSocket
|
||||
* messages through the Jakarta WebSocket API (JSR-356), Jetty, and Undertow.
|
||||
* Servlet non-blocking I/O as well for writing WebSocket
|
||||
* messages through the Jakarta WebSocket API (JSR-356), and Jetty.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Violeta Georgieva
|
||||
|
|
|
@ -1,272 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.server.reactive;
|
||||
|
||||
import java.util.AbstractSet;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import io.undertow.util.HeaderMap;
|
||||
import io.undertow.util.HeaderValues;
|
||||
import io.undertow.util.HttpString;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
|
||||
/**
|
||||
* {@code MultiValueMap} implementation for wrapping Undertow HTTP headers.
|
||||
*
|
||||
* @author Brian Clozel
|
||||
* @author Sam Brannen
|
||||
* @since 5.1.1
|
||||
*/
|
||||
class UndertowHeadersAdapter implements MultiValueMap<String, String> {
|
||||
|
||||
private final HeaderMap headers;
|
||||
|
||||
|
||||
UndertowHeadersAdapter(HeaderMap headers) {
|
||||
this.headers = headers;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getFirst(String key) {
|
||||
return this.headers.getFirst(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(String key, @Nullable String value) {
|
||||
this.headers.add(HttpString.tryFromString(key), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void addAll(String key, List<? extends String> values) {
|
||||
this.headers.addAll(HttpString.tryFromString(key), (List<String>) values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAll(MultiValueMap<String, String> values) {
|
||||
values.forEach((key, list) -> this.headers.addAll(HttpString.tryFromString(key), list));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(String key, @Nullable String value) {
|
||||
this.headers.put(HttpString.tryFromString(key), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAll(Map<String, String> values) {
|
||||
values.forEach((key, list) -> this.headers.put(HttpString.tryFromString(key), list));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> toSingleValueMap() {
|
||||
Map<String, String> singleValueMap = CollectionUtils.newLinkedHashMap(this.headers.size());
|
||||
this.headers.forEach(values ->
|
||||
singleValueMap.put(values.getHeaderName().toString(), values.getFirst()));
|
||||
return singleValueMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return this.headers.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return (this.headers.size() == 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsKey(Object key) {
|
||||
return (key instanceof String headerName && this.headers.contains(headerName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsValue(Object value) {
|
||||
return (value instanceof String &&
|
||||
this.headers.getHeaderNames().stream()
|
||||
.map(this.headers::get)
|
||||
.anyMatch(values -> values.contains(value)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable List<String> get(Object key) {
|
||||
return (key instanceof String headerName ? this.headers.get(headerName) : null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable List<String> put(String key, List<String> value) {
|
||||
HeaderValues previousValues = this.headers.get(key);
|
||||
this.headers.putAll(HttpString.tryFromString(key), value);
|
||||
return previousValues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable List<String> remove(Object key) {
|
||||
if (key instanceof String headerName) {
|
||||
Collection<String> removed = this.headers.remove(headerName);
|
||||
if (removed != null) {
|
||||
return new ArrayList<>(removed);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putAll(Map<? extends String, ? extends List<String>> map) {
|
||||
map.forEach((key, values) ->
|
||||
this.headers.putAll(HttpString.tryFromString(key), values));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
this.headers.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> keySet() {
|
||||
return new HeaderNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<List<String>> values() {
|
||||
return this.headers.getHeaderNames().stream()
|
||||
.map(this.headers::get)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Entry<String, List<String>>> entrySet() {
|
||||
return new AbstractSet<>() {
|
||||
@Override
|
||||
public Iterator<Entry<String, List<String>>> iterator() {
|
||||
return new EntryIterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return headers.size();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return org.springframework.http.HttpHeaders.formatHeaders(this);
|
||||
}
|
||||
|
||||
|
||||
private class EntryIterator implements Iterator<Entry<String, List<String>>> {
|
||||
|
||||
private final Iterator<HttpString> names = headers.getHeaderNames().iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return this.names.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Entry<String, List<String>> next() {
|
||||
return new HeaderEntry(this.names.next());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class HeaderEntry implements Entry<String, List<String>> {
|
||||
|
||||
private final HttpString key;
|
||||
|
||||
HeaderEntry(HttpString key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey() {
|
||||
return this.key.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getValue() {
|
||||
return headers.get(this.key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> setValue(List<String> value) {
|
||||
List<String> previousValues = headers.get(this.key);
|
||||
headers.putAll(this.key, value);
|
||||
return previousValues;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class HeaderNames extends AbstractSet<String> {
|
||||
|
||||
@Override
|
||||
public Iterator<String> iterator() {
|
||||
return new HeaderNamesIterator(headers.getHeaderNames().iterator());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return headers.getHeaderNames().size();
|
||||
}
|
||||
}
|
||||
|
||||
private final class HeaderNamesIterator implements Iterator<String> {
|
||||
|
||||
private final Iterator<HttpString> iterator;
|
||||
|
||||
private @Nullable String currentName;
|
||||
|
||||
private HeaderNamesIterator(Iterator<HttpString> iterator) {
|
||||
this.iterator = iterator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return this.iterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() {
|
||||
this.currentName = this.iterator.next().toString();
|
||||
return this.currentName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
if (this.currentName == null) {
|
||||
throw new IllegalStateException("No current Header in iterator");
|
||||
}
|
||||
if (!headers.contains(this.currentName)) {
|
||||
throw new IllegalStateException("Header not present: " + this.currentName);
|
||||
}
|
||||
headers.remove(this.currentName);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,141 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.server.reactive;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import io.undertow.server.HttpServerExchange;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.reactivestreams.Subscriber;
|
||||
import org.reactivestreams.Subscription;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.http.HttpLogging;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Adapt {@link HttpHandler} to the Undertow {@link io.undertow.server.HttpHandler}.
|
||||
*
|
||||
* @author Marek Hawrylczak
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Arjen Poutsma
|
||||
* @since 5.0
|
||||
*/
|
||||
public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandler {
|
||||
|
||||
private static final Log logger = HttpLogging.forLogName(UndertowHttpHandlerAdapter.class);
|
||||
|
||||
|
||||
private final HttpHandler httpHandler;
|
||||
|
||||
private DataBufferFactory bufferFactory = DefaultDataBufferFactory.sharedInstance;
|
||||
|
||||
|
||||
public UndertowHttpHandlerAdapter(HttpHandler httpHandler) {
|
||||
Assert.notNull(httpHandler, "HttpHandler must not be null");
|
||||
this.httpHandler = httpHandler;
|
||||
}
|
||||
|
||||
|
||||
public void setDataBufferFactory(DataBufferFactory bufferFactory) {
|
||||
Assert.notNull(bufferFactory, "DataBufferFactory must not be null");
|
||||
this.bufferFactory = bufferFactory;
|
||||
}
|
||||
|
||||
public DataBufferFactory getDataBufferFactory() {
|
||||
return this.bufferFactory;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void handleRequest(HttpServerExchange exchange) {
|
||||
exchange.dispatch(() -> {
|
||||
UndertowServerHttpRequest request = null;
|
||||
try {
|
||||
request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
|
||||
}
|
||||
catch (URISyntaxException ex) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.debug("Failed to get request URI: " + ex.getMessage());
|
||||
}
|
||||
exchange.setStatusCode(400);
|
||||
return;
|
||||
}
|
||||
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);
|
||||
|
||||
if (request.getMethod() == HttpMethod.HEAD) {
|
||||
response = new HttpHeadResponseDecorator(response);
|
||||
}
|
||||
|
||||
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
|
||||
this.httpHandler.handle(request, response).subscribe(resultSubscriber);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private static class HandlerResultSubscriber implements Subscriber<Void> {
|
||||
|
||||
private final HttpServerExchange exchange;
|
||||
|
||||
private final String logPrefix;
|
||||
|
||||
|
||||
public HandlerResultSubscriber(HttpServerExchange exchange, UndertowServerHttpRequest request) {
|
||||
this.exchange = exchange;
|
||||
this.logPrefix = request.getLogPrefix();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Subscription subscription) {
|
||||
subscription.request(Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(Void aVoid) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable ex) {
|
||||
logger.trace(this.logPrefix + "Failed to complete: " + ex.getMessage());
|
||||
if (this.exchange.isResponseStarted()) {
|
||||
try {
|
||||
logger.debug(this.logPrefix + "Closing connection");
|
||||
this.exchange.getConnection().close();
|
||||
}
|
||||
catch (IOException ex2) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
else {
|
||||
logger.debug(this.logPrefix + "Setting HttpServerExchange status to 500 Server Error");
|
||||
this.exchange.setStatusCode(500);
|
||||
this.exchange.endExchange();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
logger.trace(this.logPrefix + "Handling completed");
|
||||
this.exchange.endExchange();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,197 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.server.reactive;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.net.ssl.SSLSession;
|
||||
|
||||
import io.undertow.connector.ByteBufferPool;
|
||||
import io.undertow.connector.PooledByteBuffer;
|
||||
import io.undertow.server.HttpServerExchange;
|
||||
import io.undertow.server.handlers.Cookie;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import org.xnio.channels.StreamSourceChannel;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.http.HttpCookie;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Adapt {@link ServerHttpRequest} to the Undertow {@link HttpServerExchange}.
|
||||
*
|
||||
* @author Marek Hawrylczak
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.0
|
||||
*/
|
||||
class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||
|
||||
private static final AtomicLong logPrefixIndex = new AtomicLong();
|
||||
|
||||
|
||||
private final HttpServerExchange exchange;
|
||||
|
||||
private final RequestBodyPublisher body;
|
||||
|
||||
|
||||
public UndertowServerHttpRequest(HttpServerExchange exchange, DataBufferFactory bufferFactory)
|
||||
throws URISyntaxException {
|
||||
|
||||
super(HttpMethod.valueOf(exchange.getRequestMethod().toString()), initUri(exchange), "",
|
||||
new HttpHeaders(new UndertowHeadersAdapter(exchange.getRequestHeaders())));
|
||||
this.exchange = exchange;
|
||||
this.body = new RequestBodyPublisher(exchange, bufferFactory);
|
||||
this.body.registerListeners(exchange);
|
||||
}
|
||||
|
||||
private static URI initUri(HttpServerExchange exchange) throws URISyntaxException {
|
||||
Assert.notNull(exchange, "HttpServerExchange is required");
|
||||
String requestURL = exchange.getRequestURL();
|
||||
String query = exchange.getQueryString();
|
||||
String requestUriAndQuery = (StringUtils.hasLength(query) ? requestURL + "?" + query : requestURL);
|
||||
return new URI(requestUriAndQuery);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MultiValueMap<String, HttpCookie> initCookies() {
|
||||
MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
|
||||
for (Cookie cookie : this.exchange.requestCookies()) {
|
||||
HttpCookie httpCookie = new HttpCookie(cookie.getName(), cookie.getValue());
|
||||
cookies.add(cookie.getName(), httpCookie);
|
||||
}
|
||||
return cookies;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable InetSocketAddress getLocalAddress() {
|
||||
return this.exchange.getDestinationAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable InetSocketAddress getRemoteAddress() {
|
||||
return this.exchange.getSourceAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable SslInfo initSslInfo() {
|
||||
SSLSession session = this.exchange.getConnection().getSslSession();
|
||||
if (session != null) {
|
||||
return new DefaultSslInfo(session);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<DataBuffer> getBody() {
|
||||
return Flux.from(this.body);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> T getNativeRequest() {
|
||||
return (T) this.exchange;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String initId() {
|
||||
return ObjectUtils.getIdentityHexString(this.exchange.getConnection()) +
|
||||
"-" + logPrefixIndex.incrementAndGet();
|
||||
}
|
||||
|
||||
|
||||
private class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
|
||||
|
||||
private final StreamSourceChannel channel;
|
||||
|
||||
private final DataBufferFactory bufferFactory;
|
||||
|
||||
private final ByteBufferPool byteBufferPool;
|
||||
|
||||
public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory bufferFactory) {
|
||||
super(UndertowServerHttpRequest.this.getLogPrefix());
|
||||
this.channel = exchange.getRequestChannel();
|
||||
this.bufferFactory = bufferFactory;
|
||||
this.byteBufferPool = exchange.getConnection().getByteBufferPool();
|
||||
}
|
||||
|
||||
private void registerListeners(HttpServerExchange exchange) {
|
||||
exchange.addExchangeCompleteListener((ex, next) -> {
|
||||
onAllDataRead();
|
||||
next.proceed();
|
||||
});
|
||||
this.channel.getReadSetter().set(c -> onDataAvailable());
|
||||
this.channel.getCloseSetter().set(c -> onAllDataRead());
|
||||
this.channel.resumeReads();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void checkOnDataAvailable() {
|
||||
this.channel.resumeReads();
|
||||
// We are allowed to try, it will return null if data is not available
|
||||
onDataAvailable();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void readingPaused() {
|
||||
this.channel.suspendReads();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable DataBuffer read() throws IOException {
|
||||
PooledByteBuffer pooledByteBuffer = this.byteBufferPool.allocate();
|
||||
try (pooledByteBuffer) {
|
||||
ByteBuffer byteBuffer = pooledByteBuffer.getBuffer();
|
||||
int read = this.channel.read(byteBuffer);
|
||||
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(getLogPrefix() + "Read " + read + (read != -1 ? " bytes" : ""));
|
||||
}
|
||||
|
||||
if (read > 0) {
|
||||
byteBuffer.flip();
|
||||
DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(read);
|
||||
dataBuffer.write(byteBuffer);
|
||||
return dataBuffer;
|
||||
}
|
||||
else if (read == -1) {
|
||||
onAllDataRead();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void discardData() {
|
||||
// Nothing to discard since we pass data buffers on immediately..
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,344 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.server.reactive;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
|
||||
import io.undertow.server.HttpServerExchange;
|
||||
import io.undertow.server.handlers.Cookie;
|
||||
import io.undertow.server.handlers.CookieImpl;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import org.reactivestreams.Processor;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.xnio.channels.StreamSinkChannel;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.MonoSink;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatusCode;
|
||||
import org.springframework.http.ResponseCookie;
|
||||
import org.springframework.http.ZeroCopyHttpOutputMessage;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Adapt {@link ServerHttpResponse} to the Undertow {@link HttpServerExchange}.
|
||||
*
|
||||
* @author Marek Hawrylczak
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Arjen Poutsma
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.0
|
||||
*/
|
||||
class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse implements ZeroCopyHttpOutputMessage {
|
||||
|
||||
private final HttpServerExchange exchange;
|
||||
|
||||
private final UndertowServerHttpRequest request;
|
||||
|
||||
private @Nullable StreamSinkChannel responseChannel;
|
||||
|
||||
|
||||
UndertowServerHttpResponse(
|
||||
HttpServerExchange exchange, DataBufferFactory bufferFactory, UndertowServerHttpRequest request) {
|
||||
|
||||
super(bufferFactory, createHeaders(exchange));
|
||||
this.exchange = exchange;
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
private static HttpHeaders createHeaders(HttpServerExchange exchange) {
|
||||
Assert.notNull(exchange, "HttpServerExchange must not be null");
|
||||
UndertowHeadersAdapter headersMap = new UndertowHeadersAdapter(exchange.getResponseHeaders());
|
||||
return new HttpHeaders(headersMap);
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> T getNativeResponse() {
|
||||
return (T) this.exchange;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpStatusCode getStatusCode() {
|
||||
HttpStatusCode status = super.getStatusCode();
|
||||
return (status != null ? status : HttpStatusCode.valueOf(this.exchange.getStatusCode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void applyStatusCode() {
|
||||
HttpStatusCode status = super.getStatusCode();
|
||||
if (status != null) {
|
||||
this.exchange.setStatusCode(status.value());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void applyHeaders() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void applyCookies() {
|
||||
for (String name : getCookies().keySet()) {
|
||||
for (ResponseCookie httpCookie : getCookies().get(name)) {
|
||||
Cookie cookie = new CookieImpl(name, httpCookie.getValue());
|
||||
if (!httpCookie.getMaxAge().isNegative()) {
|
||||
cookie.setMaxAge((int) httpCookie.getMaxAge().getSeconds());
|
||||
}
|
||||
if (httpCookie.getDomain() != null) {
|
||||
cookie.setDomain(httpCookie.getDomain());
|
||||
}
|
||||
if (httpCookie.getPath() != null) {
|
||||
cookie.setPath(httpCookie.getPath());
|
||||
}
|
||||
cookie.setSecure(httpCookie.isSecure());
|
||||
cookie.setHttpOnly(httpCookie.isHttpOnly());
|
||||
// TODO: add "Partitioned" attribute when Undertow supports it
|
||||
cookie.setSameSiteMode(httpCookie.getSameSite());
|
||||
this.exchange.setResponseCookie(cookie);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> writeWith(Path file, long position, long count) {
|
||||
return doCommit(() ->
|
||||
Mono.create(sink -> {
|
||||
try {
|
||||
FileChannel source = FileChannel.open(file, StandardOpenOption.READ);
|
||||
TransferBodyListener listener = new TransferBodyListener(source, position, count, sink);
|
||||
sink.onDispose(listener::closeSource);
|
||||
StreamSinkChannel destination = this.exchange.getResponseChannel();
|
||||
destination.getWriteSetter().set(listener::transfer);
|
||||
listener.transfer(destination);
|
||||
}
|
||||
catch (IOException ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlushProcessor() {
|
||||
return new ResponseBodyFlushProcessor();
|
||||
}
|
||||
|
||||
private ResponseBodyProcessor createBodyProcessor() {
|
||||
if (this.responseChannel == null) {
|
||||
this.responseChannel = this.exchange.getResponseChannel();
|
||||
}
|
||||
return new ResponseBodyProcessor(this.responseChannel);
|
||||
}
|
||||
|
||||
|
||||
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
|
||||
|
||||
private final StreamSinkChannel channel;
|
||||
|
||||
private volatile @Nullable ByteBuffer byteBuffer;
|
||||
|
||||
/** Keep track of write listener calls, for {@link #writePossible}. */
|
||||
private volatile boolean writePossible;
|
||||
|
||||
|
||||
public ResponseBodyProcessor(StreamSinkChannel channel) {
|
||||
super(request.getLogPrefix());
|
||||
Assert.notNull(channel, "StreamSinkChannel must not be null");
|
||||
this.channel = channel;
|
||||
this.channel.getWriteSetter().set(c -> {
|
||||
this.writePossible = true;
|
||||
onWritePossible();
|
||||
});
|
||||
this.channel.suspendWrites();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isWritePossible() {
|
||||
this.channel.resumeWrites();
|
||||
return this.writePossible;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean write(DataBuffer dataBuffer) throws IOException {
|
||||
ByteBuffer buffer = this.byteBuffer;
|
||||
if (buffer == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Track write listener calls from here on.
|
||||
this.writePossible = false;
|
||||
|
||||
// In case of IOException, onError handling should call discardData(DataBuffer)..
|
||||
int total = buffer.remaining();
|
||||
int written = writeByteBuffer(buffer);
|
||||
|
||||
if (rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "Wrote " + written + " of " + total + " bytes");
|
||||
}
|
||||
if (written != total) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// We wrote all, so can still write more.
|
||||
this.writePossible = true;
|
||||
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
this.byteBuffer = null;
|
||||
return true;
|
||||
}
|
||||
|
||||
private int writeByteBuffer(ByteBuffer byteBuffer) throws IOException {
|
||||
int written;
|
||||
int totalWritten = 0;
|
||||
do {
|
||||
written = this.channel.write(byteBuffer);
|
||||
totalWritten += written;
|
||||
}
|
||||
while (byteBuffer.hasRemaining() && written > 0);
|
||||
return totalWritten;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void dataReceived(DataBuffer dataBuffer) {
|
||||
super.dataReceived(dataBuffer);
|
||||
ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
|
||||
dataBuffer.toByteBuffer(byteBuffer);
|
||||
this.byteBuffer = byteBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isDataEmpty(DataBuffer dataBuffer) {
|
||||
return (dataBuffer.readableByteCount() == 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writingComplete() {
|
||||
this.channel.getWriteSetter().set(null);
|
||||
this.channel.resumeWrites();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writingFailed(Throwable ex) {
|
||||
cancel();
|
||||
onError(ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void discardData(DataBuffer dataBuffer) {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class ResponseBodyFlushProcessor extends AbstractListenerWriteFlushProcessor<DataBuffer> {
|
||||
|
||||
public ResponseBodyFlushProcessor() {
|
||||
super(request.getLogPrefix());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Processor<? super DataBuffer, Void> createWriteProcessor() {
|
||||
return UndertowServerHttpResponse.this.createBodyProcessor();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void flush() throws IOException {
|
||||
StreamSinkChannel channel = UndertowServerHttpResponse.this.responseChannel;
|
||||
if (channel != null) {
|
||||
if (rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(getLogPrefix() + "flush");
|
||||
}
|
||||
channel.flush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isWritePossible() {
|
||||
StreamSinkChannel channel = UndertowServerHttpResponse.this.responseChannel;
|
||||
if (channel != null) {
|
||||
// We can always call flush, just ensure writes are on.
|
||||
channel.resumeWrites();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isFlushPending() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class TransferBodyListener {
|
||||
|
||||
private final FileChannel source;
|
||||
|
||||
private final MonoSink<Void> sink;
|
||||
|
||||
private long position;
|
||||
|
||||
private long count;
|
||||
|
||||
|
||||
public TransferBodyListener(FileChannel source, long position, long count, MonoSink<Void> sink) {
|
||||
this.source = source;
|
||||
this.sink = sink;
|
||||
this.position = position;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public void transfer(StreamSinkChannel destination) {
|
||||
try {
|
||||
while (this.count > 0) {
|
||||
long len = destination.transferFrom(this.source, this.position, this.count);
|
||||
if (len != 0) {
|
||||
this.position += len;
|
||||
this.count -= len;
|
||||
}
|
||||
else {
|
||||
destination.resumeWrites();
|
||||
return;
|
||||
}
|
||||
}
|
||||
this.sink.success();
|
||||
}
|
||||
catch (IOException ex) {
|
||||
this.sink.error(ex);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void closeSource() {
|
||||
try {
|
||||
this.source.close();
|
||||
}
|
||||
catch (IOException ignore) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -5,7 +5,7 @@
|
|||
* {@link org.springframework.http.server.reactive.HttpHandler} for processing.
|
||||
*
|
||||
* <p>Also provides implementations adapting to different runtimes
|
||||
* including Servlet containers, Netty + Reactor IO, and Undertow.
|
||||
* including Servlet containers and Netty + Reactor IO.
|
||||
*/
|
||||
@NullMarked
|
||||
package org.springframework.http.server.reactive;
|
||||
|
|
|
@ -266,7 +266,7 @@ public class StandardMultipartHttpServletRequest extends AbstractMultipartHttpSe
|
|||
if (dest.isAbsolute() && !dest.exists()) {
|
||||
// Servlet Part.write is not guaranteed to support absolute file paths:
|
||||
// may translate the given path to a relative location within a temp dir
|
||||
// (for example, on Jetty whereas Tomcat and Undertow detect absolute paths).
|
||||
// (for example, on Jetty whereas Tomcat detects absolute paths).
|
||||
// At least we offloaded the file from memory storage; it'll get deleted
|
||||
// from the temp dir eventually in any case. And for our user's purposes,
|
||||
// we can manually copy it to the requested location as a fallback.
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.springframework.web.client.RestTemplate;
|
|||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeFalse;
|
||||
|
@ -80,7 +79,6 @@ class CookieIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
|
||||
@ParameterizedHttpServerTest
|
||||
public void partitionedAttributeTest(HttpServer httpServer) throws Exception {
|
||||
assumeFalse(httpServer instanceof UndertowHttpServer, "Undertow does not support Partitioned cookies");
|
||||
assumeFalse(httpServer instanceof JettyHttpServer, "Jetty does not support Servlet 6.1 yet");
|
||||
startServer(httpServer);
|
||||
|
||||
|
@ -100,8 +98,6 @@ class CookieIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
|
||||
@ParameterizedHttpServerTest
|
||||
public void cookiesWithSameNameTest(HttpServer httpServer) throws Exception {
|
||||
assumeFalse(httpServer instanceof UndertowHttpServer, "Bug in Undertow in Cookies with same name handling");
|
||||
|
||||
startServer(httpServer);
|
||||
|
||||
URI url = new URI("http://localhost:" + port);
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.util.stream.Stream;
|
|||
|
||||
import io.netty.handler.codec.http.DefaultHttpHeaders;
|
||||
import io.netty.handler.codec.http.ReadOnlyHttpHeaders;
|
||||
import io.undertow.util.HeaderMap;
|
||||
import org.apache.tomcat.util.http.MimeHeaders;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
|
@ -102,7 +101,6 @@ class DefaultServerHttpRequestBuilderTests {
|
|||
initHeader("Map", CollectionUtils.toMultiValueMap(new LinkedCaseInsensitiveMap<>(8, Locale.ENGLISH))),
|
||||
initHeader("Netty", new Netty4HeadersAdapter(new DefaultHttpHeaders())),
|
||||
initHeader("Tomcat", new TomcatHeadersAdapter(new MimeHeaders())),
|
||||
initHeader("Undertow", new UndertowHeadersAdapter(new HeaderMap())),
|
||||
initHeader("Jetty", new JettyHeadersAdapter(HttpFields.build())),
|
||||
//immutable versions of some headers
|
||||
argumentSet("Netty immutable", new Netty4HeadersAdapter(new ReadOnlyHttpHeaders(false,
|
||||
|
|
|
@ -29,8 +29,6 @@ import java.util.function.Function;
|
|||
import java.util.stream.Stream;
|
||||
|
||||
import io.netty.handler.codec.http.DefaultHttpHeaders;
|
||||
import io.undertow.util.HeaderMap;
|
||||
import io.undertow.util.HttpString;
|
||||
import org.apache.hc.client5.http.classic.methods.HttpGet;
|
||||
import org.apache.tomcat.util.http.MimeHeaders;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
|
@ -273,7 +271,6 @@ class HeadersAdaptersTests {
|
|||
argumentSet("Map", CollectionUtils.toMultiValueMap(new LinkedCaseInsensitiveMap<>(8, Locale.ENGLISH))),
|
||||
argumentSet("Netty", new Netty4HeadersAdapter(new DefaultHttpHeaders())),
|
||||
argumentSet("Tomcat", new TomcatHeadersAdapter(new MimeHeaders())),
|
||||
argumentSet("Undertow", new UndertowHeadersAdapter(new HeaderMap())),
|
||||
argumentSet("Jetty", new JettyHeadersAdapter(HttpFields.build())),
|
||||
argumentSet("HttpComponents", new HttpComponentsHeadersAdapter(new HttpGet("https://example.com")))
|
||||
);
|
||||
|
@ -291,8 +288,6 @@ class HeadersAdaptersTests {
|
|||
argumentSet("Netty", new Netty4HeadersAdapter(withHeaders(new DefaultHttpHeaders(), h -> h::add))),
|
||||
argumentSet("Tomcat", new TomcatHeadersAdapter(withHeaders(new MimeHeaders(),
|
||||
h -> (k, v) -> h.addValue(k).setString(v)))),
|
||||
argumentSet("Undertow", new UndertowHeadersAdapter(withHeaders(new HeaderMap(),
|
||||
h -> (k, v) -> h.add(HttpString.tryFromString(k), v)))),
|
||||
argumentSet("Jetty", new JettyHeadersAdapter(withHeaders(HttpFields.build(), h -> h::add))),
|
||||
argumentSet("HttpComponents", new HttpComponentsHeadersAdapter(withHeaders(new HttpGet("https://example.com"),
|
||||
h -> h::addHeader)))
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.springframework.web.testfixture.http.server.reactive.bootstrap.Abstra
|
|||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
||||
|
@ -55,7 +54,7 @@ class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
|
||||
@ParameterizedHttpServerTest
|
||||
void zeroCopy(HttpServer httpServer) throws Exception {
|
||||
assumeTrue(httpServer instanceof ReactorHttpServer || httpServer instanceof UndertowHttpServer ||
|
||||
assumeTrue(httpServer instanceof ReactorHttpServer ||
|
||||
httpServer instanceof JettyCoreHttpServer, "Zero-copy does not support Servlet");
|
||||
|
||||
startServer(httpServer);
|
||||
|
|
|
@ -123,15 +123,6 @@ class StandardMultipartHttpServletRequestTests {
|
|||
.isThrownBy(() -> requestWithException(ex)).withCause(ex);
|
||||
}
|
||||
|
||||
@Test // gh-32549
|
||||
void undertowRequestTooBigException() {
|
||||
IOException ex = new IOException("Connection terminated as request was larger than 10000");
|
||||
|
||||
assertThatExceptionOfType(MaxUploadSizeExceededException.class)
|
||||
.isThrownBy(() -> requestWithException(ex)).withCause(ex);
|
||||
}
|
||||
|
||||
|
||||
private static StandardMultipartHttpServletRequest requestWithPart(String name, String disposition, String content) {
|
||||
MockHttpServletRequest request = new MockHttpServletRequest();
|
||||
MockPart part = new MockPart(name, null, content.getBytes(StandardCharsets.UTF_8));
|
||||
|
@ -150,14 +141,4 @@ class StandardMultipartHttpServletRequestTests {
|
|||
return new StandardMultipartHttpServletRequest(request);
|
||||
}
|
||||
|
||||
private static StandardMultipartHttpServletRequest requestWithException(IOException ex) {
|
||||
MockHttpServletRequest request = new MockHttpServletRequest() {
|
||||
@Override
|
||||
public Collection<Part> getParts() throws IOException {
|
||||
throw ex;
|
||||
}
|
||||
};
|
||||
return new StandardMultipartHttpServletRequest(request);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -128,8 +128,7 @@ public abstract class AbstractHttpHandlerIntegrationTests {
|
|||
argumentSet("Jetty", new JettyHttpServer()),
|
||||
argumentSet("Jetty Core", new JettyCoreHttpServer()),
|
||||
argumentSet("Reactor Netty", new ReactorHttpServer()),
|
||||
argumentSet("Tomcat", new TomcatHttpServer()),
|
||||
argumentSet("Undertow", new UndertowHttpServer())
|
||||
argumentSet("Tomcat", new TomcatHttpServer())
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,61 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.testfixture.http.server.reactive.bootstrap;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import io.undertow.Undertow;
|
||||
|
||||
import org.springframework.http.server.reactive.UndertowHttpHandlerAdapter;
|
||||
|
||||
/**
|
||||
* @author Marek Hawrylczak
|
||||
*/
|
||||
public class UndertowHttpServer extends AbstractHttpServer {
|
||||
|
||||
private Undertow server;
|
||||
|
||||
|
||||
@Override
|
||||
protected void initServer() throws Exception {
|
||||
this.server = Undertow.builder().addHttpListener(getPort(), getHost())
|
||||
.setHandler(initHttpHandlerAdapter())
|
||||
.build();
|
||||
}
|
||||
|
||||
private UndertowHttpHandlerAdapter initHttpHandlerAdapter() {
|
||||
return new UndertowHttpHandlerAdapter(resolveHttpHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void startInternal() {
|
||||
this.server.start();
|
||||
Undertow.ListenerInfo info = this.server.getListenerInfo().get(0);
|
||||
setPort(((InetSocketAddress) info.getAddress()).getPort());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopInternal() {
|
||||
this.server.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void resetInternal() {
|
||||
this.server = null;
|
||||
}
|
||||
|
||||
}
|
|
@ -15,7 +15,6 @@ dependencies {
|
|||
optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile")
|
||||
optional("com.google.protobuf:protobuf-java-util")
|
||||
optional("io.projectreactor.netty:reactor-netty-http")
|
||||
optional("io.undertow:undertow-websockets-jsr")
|
||||
optional("jakarta.servlet:jakarta.servlet-api")
|
||||
optional("jakarta.validation:jakarta.validation-api")
|
||||
optional("jakarta.websocket:jakarta.websocket-api")
|
||||
|
@ -45,7 +44,6 @@ dependencies {
|
|||
testImplementation("io.micrometer:micrometer-observation-test")
|
||||
testImplementation("io.projectreactor:reactor-test")
|
||||
testImplementation("io.reactivex.rxjava3:rxjava")
|
||||
testImplementation("io.undertow:undertow-core")
|
||||
testImplementation("jakarta.xml.bind:jakarta.xml.bind-api")
|
||||
testImplementation("jakarta.validation:jakarta.validation-api")
|
||||
testImplementation("org.apache.httpcomponents.client5:httpclient5")
|
||||
|
|
|
@ -56,7 +56,7 @@ import org.springframework.web.util.pattern.PathPatternParser;
|
|||
*
|
||||
* <p>Additionally, this class can {@linkplain #toHttpHandler(RouterFunction) transform}
|
||||
* a {@code RouterFunction} into an {@code HttpHandler}, which can be run in Servlet
|
||||
* environments, Reactor, or Undertow.
|
||||
* environments, or Reactor.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Sebastien Deleuze
|
||||
|
@ -272,8 +272,6 @@ public abstract class RouterFunctions {
|
|||
* {@link org.springframework.http.server.reactive.ServletHttpHandlerAdapter}</li>
|
||||
* <li>Reactor using the
|
||||
* {@link org.springframework.http.server.reactive.ReactorHttpHandlerAdapter}
|
||||
* <li>Undertow using the
|
||||
* {@link org.springframework.http.server.reactive.UndertowHttpHandlerAdapter}</li>
|
||||
* </ul>
|
||||
* <p>Note that {@code HttpWebHandlerAdapter} also implements {@link WebHandler},
|
||||
* allowing for additional filter and exception handler registration through
|
||||
|
@ -294,8 +292,6 @@ public abstract class RouterFunctions {
|
|||
* {@link org.springframework.http.server.reactive.ServletHttpHandlerAdapter}</li>
|
||||
* <li>Reactor using the
|
||||
* {@link org.springframework.http.server.reactive.ReactorHttpHandlerAdapter}</li>
|
||||
* <li>Undertow using the
|
||||
* {@link org.springframework.http.server.reactive.UndertowHttpHandlerAdapter}</li>
|
||||
* </ul>
|
||||
* @param routerFunction the router function to convert
|
||||
* @param strategies the strategies to use
|
||||
|
|
|
@ -42,8 +42,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
|
|||
|
||||
/**
|
||||
* Base class for {@link WebSocketSession} implementations that bridge between
|
||||
* event-listener WebSocket APIs (for example, Jakarta WebSocket API (JSR-356), Jetty,
|
||||
* Undertow) and Reactive Streams.
|
||||
* event-listener WebSocket APIs (for example, Jakarta WebSocket API (JSR-356), Jetty)
|
||||
* and Reactive Streams.
|
||||
*
|
||||
* <p>Also implements {@code Subscriber<Void>} so it can be used to subscribe to
|
||||
* the completion of {@link WebSocketHandler#handle(WebSocketSession)}.
|
||||
|
|
|
@ -1,108 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.reactive.socket.adapter;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import io.undertow.websockets.WebSocketConnectionCallback;
|
||||
import io.undertow.websockets.core.AbstractReceiveListener;
|
||||
import io.undertow.websockets.core.BufferedBinaryMessage;
|
||||
import io.undertow.websockets.core.BufferedTextMessage;
|
||||
import io.undertow.websockets.core.CloseMessage;
|
||||
import io.undertow.websockets.core.WebSocketChannel;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.reactive.socket.CloseStatus;
|
||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.WebSocketMessage;
|
||||
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
|
||||
|
||||
/**
|
||||
* Undertow {@link WebSocketConnectionCallback} implementation that adapts and
|
||||
* delegates to a Spring {@link WebSocketHandler}.
|
||||
*
|
||||
* @author Violeta Georgieva
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
*/
|
||||
public class UndertowWebSocketHandlerAdapter extends AbstractReceiveListener {
|
||||
|
||||
private final UndertowWebSocketSession session;
|
||||
|
||||
|
||||
public UndertowWebSocketHandlerAdapter(UndertowWebSocketSession session) {
|
||||
Assert.notNull(session, "UndertowWebSocketSession is required");
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
|
||||
this.session.handleMessage(Type.TEXT, toMessage(Type.TEXT, message.getData()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
|
||||
this.session.handleMessage(Type.BINARY, toMessage(Type.BINARY, message.getData().getResource()));
|
||||
message.getData().free();
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
protected void onFullPongMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
|
||||
this.session.handleMessage(Type.PONG, toMessage(Type.PONG, message.getData().getResource()));
|
||||
message.getData().free();
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
|
||||
CloseMessage closeMessage = new CloseMessage(message.getData().getResource());
|
||||
this.session.handleClose(CloseStatus.create(closeMessage.getCode(), closeMessage.getReason()));
|
||||
message.getData().free();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onError(WebSocketChannel channel, Throwable error) {
|
||||
this.session.handleError(error);
|
||||
}
|
||||
|
||||
private <T> WebSocketMessage toMessage(Type type, T message) {
|
||||
if (Type.TEXT.equals(type)) {
|
||||
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
|
||||
return new WebSocketMessage(Type.TEXT, this.session.bufferFactory().wrap(bytes));
|
||||
}
|
||||
else if (Type.BINARY.equals(type) || Type.PONG.equals(type)) {
|
||||
ByteBuffer[] byteBuffers = (ByteBuffer[]) message;
|
||||
List<DataBuffer> dataBuffers = new ArrayList<>(byteBuffers.length);
|
||||
for (ByteBuffer byteBuffer : byteBuffers) {
|
||||
dataBuffers.add(this.session.bufferFactory().wrap(byteBuffer));
|
||||
}
|
||||
DataBuffer joined = this.session.bufferFactory().join(dataBuffers);
|
||||
return new WebSocketMessage(type, joined);
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException("Unexpected message type: " + message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,141 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.reactive.socket.adapter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import io.undertow.websockets.core.CloseMessage;
|
||||
import io.undertow.websockets.core.WebSocketCallback;
|
||||
import io.undertow.websockets.core.WebSocketChannel;
|
||||
import io.undertow.websockets.core.WebSockets;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Sinks;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.web.reactive.socket.CloseStatus;
|
||||
import org.springframework.web.reactive.socket.HandshakeInfo;
|
||||
import org.springframework.web.reactive.socket.WebSocketMessage;
|
||||
import org.springframework.web.reactive.socket.WebSocketSession;
|
||||
|
||||
/**
|
||||
* Spring {@link WebSocketSession} implementation that adapts to an Undertow
|
||||
* {@link io.undertow.websockets.core.WebSocketChannel}.
|
||||
*
|
||||
* @author Violeta Georgieva
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
*/
|
||||
public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> {
|
||||
|
||||
public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, DataBufferFactory factory) {
|
||||
this(channel, info, factory, null);
|
||||
}
|
||||
|
||||
public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info,
|
||||
DataBufferFactory factory, Sinks.@Nullable Empty<Void> completionSink) {
|
||||
|
||||
super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionSink);
|
||||
suspendReceiving();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected boolean canSuspendReceiving() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void suspendReceiving() {
|
||||
getDelegate().suspendReceives();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void resumeReceiving() {
|
||||
getDelegate().resumeReceives();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean sendMessage(WebSocketMessage message) throws IOException {
|
||||
DataBuffer dataBuffer = message.getPayload();
|
||||
WebSocketChannel channel = getDelegate();
|
||||
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
|
||||
getSendProcessor().setReadyToSend(false);
|
||||
String text = dataBuffer.toString(StandardCharsets.UTF_8);
|
||||
WebSockets.sendText(text, channel, new SendProcessorCallback(message.getPayload()));
|
||||
}
|
||||
else {
|
||||
getSendProcessor().setReadyToSend(false);
|
||||
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
|
||||
while (iterator.hasNext()) {
|
||||
ByteBuffer byteBuffer = iterator.next();
|
||||
switch (message.getType()) {
|
||||
case BINARY -> WebSockets.sendBinary(byteBuffer, channel, new SendProcessorCallback(dataBuffer));
|
||||
case PING -> WebSockets.sendPing(byteBuffer, channel, new SendProcessorCallback(dataBuffer));
|
||||
case PONG -> WebSockets.sendPong(byteBuffer, channel, new SendProcessorCallback(dataBuffer));
|
||||
default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return getDelegate().isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> close(CloseStatus status) {
|
||||
CloseMessage cm = new CloseMessage(status.getCode(), status.getReason());
|
||||
if (!getDelegate().isCloseFrameSent()) {
|
||||
WebSockets.sendClose(cm, getDelegate(), null);
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
||||
private final class SendProcessorCallback implements WebSocketCallback<Void> {
|
||||
|
||||
private final DataBuffer payload;
|
||||
|
||||
SendProcessorCallback(DataBuffer payload) {
|
||||
this.payload = payload;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete(WebSocketChannel channel, Void context) {
|
||||
DataBufferUtils.release(this.payload);
|
||||
getSendProcessor().setReadyToSend(true);
|
||||
getSendProcessor().onWritePossible();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(WebSocketChannel channel, Void context, Throwable throwable) {
|
||||
DataBufferUtils.release(this.payload);
|
||||
getSendProcessor().cancel();
|
||||
getSendProcessor().onError(throwable);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,260 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.reactive.socket.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import io.undertow.connector.ByteBufferPool;
|
||||
import io.undertow.server.DefaultByteBufferPool;
|
||||
import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder;
|
||||
import io.undertow.websockets.client.WebSocketClientNegotiation;
|
||||
import io.undertow.websockets.core.WebSocketChannel;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import org.xnio.IoFuture;
|
||||
import org.xnio.XnioWorker;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Sinks;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.reactive.socket.HandshakeInfo;
|
||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter;
|
||||
import org.springframework.web.reactive.socket.adapter.UndertowWebSocketSession;
|
||||
|
||||
/**
|
||||
* Undertow based implementation of {@link WebSocketClient}.
|
||||
*
|
||||
* @author Violeta Georgieva
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
*/
|
||||
public class UndertowWebSocketClient implements WebSocketClient {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(UndertowWebSocketClient.class);
|
||||
|
||||
private static final int DEFAULT_POOL_BUFFER_SIZE = 8192;
|
||||
|
||||
|
||||
private final XnioWorker worker;
|
||||
|
||||
private ByteBufferPool byteBufferPool;
|
||||
|
||||
private final Consumer<ConnectionBuilder> builderConsumer;
|
||||
|
||||
|
||||
/**
|
||||
* Constructor with the {@link XnioWorker} to pass to
|
||||
* {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder}.
|
||||
* @param worker the Xnio worker
|
||||
*/
|
||||
public UndertowWebSocketClient(XnioWorker worker) {
|
||||
this(worker, builder -> {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Alternate constructor providing additional control over the
|
||||
* {@link ConnectionBuilder} for each WebSocket connection.
|
||||
* @param worker the Xnio worker to use to create {@code ConnectionBuilder}'s
|
||||
* @param builderConsumer a consumer to configure {@code ConnectionBuilder}'s
|
||||
*/
|
||||
public UndertowWebSocketClient(XnioWorker worker, Consumer<ConnectionBuilder> builderConsumer) {
|
||||
this(worker, new DefaultByteBufferPool(false, DEFAULT_POOL_BUFFER_SIZE), builderConsumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Alternate constructor providing additional control over the
|
||||
* {@link ConnectionBuilder} for each WebSocket connection.
|
||||
* @param worker the Xnio worker to use to create {@code ConnectionBuilder}'s
|
||||
* @param byteBufferPool the ByteBufferPool to use to create {@code ConnectionBuilder}'s
|
||||
* @param builderConsumer a consumer to configure {@code ConnectionBuilder}'s
|
||||
* @since 5.0.8
|
||||
*/
|
||||
public UndertowWebSocketClient(XnioWorker worker, ByteBufferPool byteBufferPool,
|
||||
Consumer<ConnectionBuilder> builderConsumer) {
|
||||
|
||||
Assert.notNull(worker, "XnioWorker must not be null");
|
||||
Assert.notNull(byteBufferPool, "ByteBufferPool must not be null");
|
||||
this.worker = worker;
|
||||
this.byteBufferPool = byteBufferPool;
|
||||
this.builderConsumer = builderConsumer;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return the configured {@link XnioWorker}.
|
||||
*/
|
||||
public XnioWorker getXnioWorker() {
|
||||
return this.worker;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link io.undertow.connector.ByteBufferPool ByteBufferPool} to pass to
|
||||
* {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder}.
|
||||
* <p>By default an indirect {@link io.undertow.server.DefaultByteBufferPool}
|
||||
* with a buffer size of 8192 is used.
|
||||
* @since 5.0.8
|
||||
* @see #DEFAULT_POOL_BUFFER_SIZE
|
||||
*/
|
||||
public void setByteBufferPool(ByteBufferPool byteBufferPool) {
|
||||
Assert.notNull(byteBufferPool, "ByteBufferPool must not be null");
|
||||
this.byteBufferPool = byteBufferPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the {@link io.undertow.connector.ByteBufferPool} currently used
|
||||
* for newly created WebSocket sessions by this client.
|
||||
* @return the byte buffer pool
|
||||
* @since 5.0.8
|
||||
*/
|
||||
public ByteBufferPool getByteBufferPool() {
|
||||
return this.byteBufferPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configured <code>Consumer<ConnectionBuilder></code>.
|
||||
*/
|
||||
public Consumer<ConnectionBuilder> getConnectionBuilderConsumer() {
|
||||
return this.builderConsumer;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<Void> execute(URI url, WebSocketHandler handler) {
|
||||
return execute(url, new HttpHeaders(), handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
|
||||
return executeInternal(url, headers, handler);
|
||||
}
|
||||
|
||||
private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
|
||||
Sinks.Empty<Void> completion = Sinks.empty();
|
||||
return Mono.deferContextual(
|
||||
contextView -> {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Connecting to " + url);
|
||||
}
|
||||
List<String> protocols = handler.getSubProtocols();
|
||||
ConnectionBuilder builder = createConnectionBuilder(url);
|
||||
DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers, builder);
|
||||
builder.setClientNegotiation(negotiation);
|
||||
builder.connect().addNotifier(
|
||||
new IoFuture.HandlingNotifier<>() {
|
||||
@Override
|
||||
public void handleDone(WebSocketChannel channel, Object attachment) {
|
||||
handleChannel(url, ContextWebSocketHandler.decorate(handler, contextView),
|
||||
completion, negotiation, channel);
|
||||
}
|
||||
@Override
|
||||
public void handleFailed(IOException ex, Object attachment) {
|
||||
// Ignore result: can't overflow, ok if not first or no one listens
|
||||
completion.tryEmitError(
|
||||
new IllegalStateException("Failed to connect to " + url, ex));
|
||||
}
|
||||
}, null);
|
||||
return completion.asMono();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link ConnectionBuilder} for the given URI.
|
||||
* <p>The default implementation creates a builder with the configured
|
||||
* {@link #getXnioWorker() XnioWorker} and {@link #getByteBufferPool() ByteBufferPool} and
|
||||
* then passes it to the {@link #getConnectionBuilderConsumer() consumer}
|
||||
* provided at construction time.
|
||||
*/
|
||||
protected ConnectionBuilder createConnectionBuilder(URI url) {
|
||||
ConnectionBuilder builder = io.undertow.websockets.client.WebSocketClient
|
||||
.connectionBuilder(getXnioWorker(), getByteBufferPool(), url);
|
||||
this.builderConsumer.accept(builder);
|
||||
return builder;
|
||||
}
|
||||
|
||||
private void handleChannel(URI url, WebSocketHandler handler, Sinks.Empty<Void> completionSink,
|
||||
DefaultNegotiation negotiation, WebSocketChannel channel) {
|
||||
|
||||
HandshakeInfo info = createHandshakeInfo(url, negotiation);
|
||||
DataBufferFactory bufferFactory = DefaultDataBufferFactory.sharedInstance;
|
||||
UndertowWebSocketSession session = new UndertowWebSocketSession(channel, info, bufferFactory, completionSink);
|
||||
UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session);
|
||||
|
||||
channel.getReceiveSetter().set(adapter);
|
||||
channel.resumeReceives();
|
||||
|
||||
handler.handle(session)
|
||||
.checkpoint(url + " [UndertowWebSocketClient]")
|
||||
.subscribe(session);
|
||||
}
|
||||
|
||||
private HandshakeInfo createHandshakeInfo(URI url, DefaultNegotiation negotiation) {
|
||||
HttpHeaders responseHeaders = negotiation.getResponseHeaders();
|
||||
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
|
||||
return new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
|
||||
}
|
||||
|
||||
|
||||
private static final class DefaultNegotiation extends WebSocketClientNegotiation {
|
||||
|
||||
private final HttpHeaders requestHeaders;
|
||||
|
||||
private final HttpHeaders responseHeaders = new HttpHeaders();
|
||||
|
||||
private final @Nullable WebSocketClientNegotiation delegate;
|
||||
|
||||
public DefaultNegotiation(List<String> protocols, HttpHeaders requestHeaders,
|
||||
ConnectionBuilder connectionBuilder) {
|
||||
|
||||
super(protocols, Collections.emptyList());
|
||||
this.requestHeaders = requestHeaders;
|
||||
this.delegate = connectionBuilder.getClientNegotiation();
|
||||
}
|
||||
|
||||
public HttpHeaders getResponseHeaders() {
|
||||
return this.responseHeaders;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeRequest(Map<String, List<String>> headers) {
|
||||
this.requestHeaders.forEach(headers::put);
|
||||
if (this.delegate != null) {
|
||||
this.delegate.beforeRequest(headers);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterRequest(Map<String, List<String>> headers) {
|
||||
this.responseHeaders.putAll(headers);
|
||||
if (this.delegate != null) {
|
||||
this.delegate.afterRequest(headers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -48,7 +48,6 @@ import org.springframework.web.reactive.socket.server.upgrade.JettyCoreRequestUp
|
|||
import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy;
|
||||
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
|
||||
import org.springframework.web.reactive.socket.server.upgrade.StandardWebSocketUpgradeStrategy;
|
||||
import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpgradeStrategy;
|
||||
import org.springframework.web.server.MethodNotAllowedException;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import org.springframework.web.server.ServerWebInputException;
|
||||
|
@ -79,8 +78,6 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
|
|||
|
||||
private static final boolean jettyCoreWsPresent;
|
||||
|
||||
private static final boolean undertowWsPresent;
|
||||
|
||||
private static final boolean reactorNettyPresent;
|
||||
|
||||
static {
|
||||
|
@ -89,8 +86,6 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
|
|||
"org.eclipse.jetty.ee11.websocket.server.JettyWebSocketServerContainer", classLoader);
|
||||
jettyCoreWsPresent = ClassUtils.isPresent(
|
||||
"org.eclipse.jetty.websocket.server.ServerWebSocketContainer", classLoader);
|
||||
undertowWsPresent = ClassUtils.isPresent(
|
||||
"io.undertow.websockets.WebSocketProtocolHandshakeHandler", classLoader);
|
||||
reactorNettyPresent = ClassUtils.isPresent(
|
||||
"reactor.netty.http.server.HttpServerResponse", classLoader);
|
||||
}
|
||||
|
@ -276,9 +271,6 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
|
|||
else if (jettyCoreWsPresent) {
|
||||
return new JettyCoreRequestUpgradeStrategy();
|
||||
}
|
||||
else if (undertowWsPresent) {
|
||||
return new UndertowRequestUpgradeStrategy();
|
||||
}
|
||||
else if (reactorNettyPresent) {
|
||||
return new ReactorNettyRequestUpgradeStrategy();
|
||||
}
|
||||
|
|
|
@ -1,117 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.reactive.socket.server.upgrade;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.undertow.server.HttpServerExchange;
|
||||
import io.undertow.websockets.WebSocketConnectionCallback;
|
||||
import io.undertow.websockets.WebSocketProtocolHandshakeHandler;
|
||||
import io.undertow.websockets.core.WebSocketChannel;
|
||||
import io.undertow.websockets.core.protocol.Handshake;
|
||||
import io.undertow.websockets.core.protocol.version13.Hybi13Handshake;
|
||||
import io.undertow.websockets.spi.WebSocketHttpExchange;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
|
||||
import org.springframework.web.reactive.socket.HandshakeInfo;
|
||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter;
|
||||
import org.springframework.web.reactive.socket.adapter.UndertowWebSocketSession;
|
||||
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
/**
|
||||
* A WebSocket {@code RequestUpgradeStrategy} for Undertow.
|
||||
*
|
||||
* @author Violeta Georgieva
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Brian Clozel
|
||||
* @since 5.0
|
||||
*/
|
||||
public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy {
|
||||
|
||||
@Override
|
||||
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
|
||||
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
|
||||
|
||||
HttpServerExchange httpExchange = ServerHttpRequestDecorator.getNativeRequest(exchange.getRequest());
|
||||
|
||||
Set<String> protocols = (subProtocol != null ? Collections.singleton(subProtocol) : Collections.emptySet());
|
||||
Hybi13Handshake handshake = new Hybi13Handshake(protocols, false);
|
||||
List<Handshake> handshakes = Collections.singletonList(handshake);
|
||||
|
||||
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
|
||||
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
|
||||
|
||||
// Trigger WebFlux preCommit actions and upgrade
|
||||
return exchange.getResponse().setComplete()
|
||||
.then(Mono.deferContextual(contextView -> {
|
||||
DefaultCallback callback = new DefaultCallback(
|
||||
handshakeInfo,
|
||||
ContextWebSocketHandler.decorate(handler, contextView),
|
||||
bufferFactory);
|
||||
try {
|
||||
new WebSocketProtocolHandshakeHandler(handshakes, callback).handleRequest(httpExchange);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
return Mono.empty();
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
private static class DefaultCallback implements WebSocketConnectionCallback {
|
||||
|
||||
private final HandshakeInfo handshakeInfo;
|
||||
|
||||
private final WebSocketHandler handler;
|
||||
|
||||
private final DataBufferFactory bufferFactory;
|
||||
|
||||
public DefaultCallback(HandshakeInfo handshakeInfo, WebSocketHandler handler, DataBufferFactory bufferFactory) {
|
||||
this.handshakeInfo = handshakeInfo;
|
||||
this.handler = handler;
|
||||
this.bufferFactory = bufferFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) {
|
||||
UndertowWebSocketSession session = createSession(channel);
|
||||
UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session);
|
||||
|
||||
channel.getReceiveSetter().set(adapter);
|
||||
channel.resumeReceives();
|
||||
|
||||
this.handler.handle(session)
|
||||
.checkpoint(exchange.getRequestURI() + " [UndertowRequestUpgradeStrategy]")
|
||||
.subscribe(session);
|
||||
}
|
||||
|
||||
private UndertowWebSocketSession createSession(WebSocketChannel channel) {
|
||||
return new UndertowWebSocketSession(channel, this.handshakeInfo, this.bufferFactory);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -25,8 +25,6 @@ import java.time.Duration;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
@ -49,11 +47,9 @@ import org.springframework.web.reactive.function.server.RouterFunction;
|
|||
import org.springframework.web.reactive.function.server.ServerRequest;
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeFalse;
|
||||
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
|
||||
|
||||
/**
|
||||
|
@ -104,18 +100,9 @@ class MultipartRouterFunctionIntegrationTests extends AbstractRouterFunctionInte
|
|||
|
||||
@ParameterizedHttpServerTest
|
||||
void transferTo(HttpServer httpServer) throws Exception {
|
||||
// TODO Determine why Undertow fails: https://github.com/spring-projects/spring-framework/issues/25310
|
||||
assumeFalse(httpServer instanceof UndertowHttpServer, "Undertow currently fails with transferTo");
|
||||
verifyTransferTo(httpServer);
|
||||
}
|
||||
|
||||
@Disabled("Unstable on Undertow: https://github.com/spring-projects/spring-framework/issues/25310")
|
||||
// Using @RepeatedTest(100), this test fails approximately 10% - 20% of the time.
|
||||
@Test
|
||||
void transferToWithUndertow() throws Exception {
|
||||
verifyTransferTo(new UndertowHttpServer());
|
||||
}
|
||||
|
||||
private void verifyTransferTo(HttpServer httpServer) throws Exception {
|
||||
startServer(httpServer);
|
||||
|
||||
|
@ -162,7 +149,6 @@ class MultipartRouterFunctionIntegrationTests extends AbstractRouterFunctionInte
|
|||
|
||||
@ParameterizedHttpServerTest
|
||||
void proxy(HttpServer httpServer) throws Exception {
|
||||
assumeFalse(httpServer instanceof UndertowHttpServer, "Undertow currently fails proxying requests");
|
||||
startServer(httpServer);
|
||||
|
||||
Mono<ResponseEntity<Void>> result = webClient
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyC
|
|||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.params.provider.Arguments.argumentSet;
|
||||
|
@ -55,8 +54,7 @@ class ContextPathIntegrationTests {
|
|||
argumentSet("Jetty", new JettyHttpServer()),
|
||||
argumentSet("Jetty Core", new JettyCoreHttpServer()),
|
||||
argumentSet("Reactor Netty", new ReactorHttpServer()),
|
||||
argumentSet("Tomcat", new TomcatHttpServer()),
|
||||
argumentSet("Undertow", new UndertowHttpServer())
|
||||
argumentSet("Tomcat", new TomcatHttpServer())
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -60,10 +60,8 @@ import org.springframework.web.reactive.function.client.WebClient;
|
|||
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeFalse;
|
||||
|
||||
class MultipartWebClientIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
||||
|
||||
|
@ -168,8 +166,6 @@ class MultipartWebClientIntegrationTests extends AbstractHttpHandlerIntegrationT
|
|||
|
||||
@ParameterizedHttpServerTest
|
||||
void transferTo(HttpServer httpServer) throws Exception {
|
||||
// TODO Determine why Undertow fails: https://github.com/spring-projects/spring-framework/issues/25310
|
||||
assumeFalse(httpServer instanceof UndertowHttpServer, "Undertow currently fails with transferTo");
|
||||
startServer(httpServer);
|
||||
|
||||
Flux<String> result = webClient
|
||||
|
|
|
@ -57,7 +57,6 @@ import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyC
|
|||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
||||
|
@ -314,10 +313,7 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
args(new ReactorHttpServer(), new HttpComponentsClientHttpConnector()),
|
||||
args(new TomcatHttpServer(), new ReactorClientHttpConnector()),
|
||||
args(new TomcatHttpServer(), new JettyClientHttpConnector()),
|
||||
args(new TomcatHttpServer(), new HttpComponentsClientHttpConnector()),
|
||||
args(new UndertowHttpServer(), new ReactorClientHttpConnector()),
|
||||
args(new UndertowHttpServer(), new JettyClientHttpConnector()),
|
||||
args(new UndertowHttpServer(), new HttpComponentsClientHttpConnector())
|
||||
args(new TomcatHttpServer(), new HttpComponentsClientHttpConnector())
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -33,8 +33,6 @@ import org.junit.jupiter.api.AfterEach;
|
|||
import org.junit.jupiter.api.Named;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.xnio.OptionMap;
|
||||
import org.xnio.Xnio;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple3;
|
||||
|
@ -50,7 +48,6 @@ import org.springframework.web.reactive.DispatcherHandler;
|
|||
import org.springframework.web.reactive.socket.client.JettyWebSocketClient;
|
||||
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
|
||||
import org.springframework.web.reactive.socket.client.TomcatWebSocketClient;
|
||||
import org.springframework.web.reactive.socket.client.UndertowWebSocketClient;
|
||||
import org.springframework.web.reactive.socket.client.WebSocketClient;
|
||||
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
|
||||
import org.springframework.web.reactive.socket.server.WebSocketService;
|
||||
|
@ -60,7 +57,6 @@ import org.springframework.web.reactive.socket.server.upgrade.JettyCoreRequestUp
|
|||
import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy;
|
||||
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
|
||||
import org.springframework.web.reactive.socket.server.upgrade.StandardWebSocketUpgradeStrategy;
|
||||
import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpgradeStrategy;
|
||||
import org.springframework.web.server.WebFilter;
|
||||
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
|
||||
|
@ -68,7 +64,6 @@ import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyC
|
|||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer;
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
|
||||
|
||||
import static org.junit.jupiter.api.Named.named;
|
||||
|
||||
|
@ -97,8 +92,7 @@ abstract class AbstractReactiveWebSocketIntegrationTests {
|
|||
List<Named<WebSocketClient>> clients = List.of(
|
||||
named(TomcatWebSocketClient.class.getSimpleName(), new TomcatWebSocketClient()),
|
||||
named(JettyWebSocketClient.class.getSimpleName(), new JettyWebSocketClient()),
|
||||
named(ReactorNettyWebSocketClient.class.getSimpleName(), new ReactorNettyWebSocketClient()),
|
||||
named(UndertowWebSocketClient.class.getSimpleName(), new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY)))
|
||||
named(ReactorNettyWebSocketClient.class.getSimpleName(), new ReactorNettyWebSocketClient())
|
||||
);
|
||||
|
||||
Map<Named<HttpServer>, Class<?>> servers = new LinkedHashMap<>();
|
||||
|
@ -107,7 +101,6 @@ abstract class AbstractReactiveWebSocketIntegrationTests {
|
|||
servers.put(named(JettyHttpServer.class.getSimpleName(), new JettyHttpServer()), JettyConfig.class);
|
||||
servers.put(named(JettyCoreHttpServer.class.getSimpleName(), new JettyCoreHttpServer()), JettyCoreConfig.class);
|
||||
servers.put(named(ReactorHttpServer.class.getSimpleName(), new ReactorHttpServer()), ReactorNettyConfig.class);
|
||||
servers.put(named(UndertowHttpServer.class.getSimpleName(), new UndertowHttpServer()), UndertowConfig.class);
|
||||
|
||||
// Try each client once against each server
|
||||
|
||||
|
@ -242,14 +235,4 @@ abstract class AbstractReactiveWebSocketIntegrationTests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Configuration
|
||||
static class UndertowConfig extends AbstractHandlerAdapterConfig {
|
||||
|
||||
@Override
|
||||
protected RequestUpgradeStrategy getUpgradeStrategy() {
|
||||
return new UndertowRequestUpgradeStrategy();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.springframework.web.bind.annotation.RestController
|
|||
import org.springframework.web.client.HttpServerErrorException
|
||||
import org.springframework.web.reactive.config.EnableWebFlux
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer
|
||||
import reactor.core.publisher.Flux
|
||||
|
||||
class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
|
||||
|
@ -116,8 +115,6 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
|
|||
|
||||
@ParameterizedHttpServerTest
|
||||
fun `Suspending handler method returning ResponseEntity of Flux `(httpServer: HttpServer) {
|
||||
assumeFalse(httpServer is UndertowHttpServer, "Undertow currently fails")
|
||||
|
||||
startServer(httpServer)
|
||||
|
||||
val entity = performGet("/entity-flux", HttpHeaders.EMPTY, String::class.java)
|
||||
|
|
|
@ -7,8 +7,6 @@ dependencies {
|
|||
optional(project(":spring-messaging"))
|
||||
optional(project(":spring-webmvc"))
|
||||
optional("com.fasterxml.jackson.core:jackson-databind")
|
||||
optional("io.undertow:undertow-servlet")
|
||||
optional("io.undertow:undertow-websockets-jsr")
|
||||
optional("jakarta.servlet:jakarta.servlet-api")
|
||||
optional("jakarta.websocket:jakarta.websocket-api")
|
||||
optional("jakarta.websocket:jakarta.websocket-client-api")
|
||||
|
|
|
@ -1,479 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.socket.sockjs.client;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import io.undertow.client.ClientCallback;
|
||||
import io.undertow.client.ClientConnection;
|
||||
import io.undertow.client.ClientExchange;
|
||||
import io.undertow.client.ClientRequest;
|
||||
import io.undertow.client.ClientResponse;
|
||||
import io.undertow.client.UndertowClient;
|
||||
import io.undertow.connector.ByteBufferPool;
|
||||
import io.undertow.connector.PooledByteBuffer;
|
||||
import io.undertow.server.DefaultByteBufferPool;
|
||||
import io.undertow.util.AttachmentKey;
|
||||
import io.undertow.util.HeaderMap;
|
||||
import io.undertow.util.HttpString;
|
||||
import io.undertow.util.Methods;
|
||||
import io.undertow.util.StringReadChannelListener;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import org.xnio.ChannelListener;
|
||||
import org.xnio.ChannelListeners;
|
||||
import org.xnio.IoUtils;
|
||||
import org.xnio.OptionMap;
|
||||
import org.xnio.Options;
|
||||
import org.xnio.Xnio;
|
||||
import org.xnio.XnioWorker;
|
||||
import org.xnio.channels.StreamSinkChannel;
|
||||
import org.xnio.channels.StreamSourceChannel;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatusCode;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StreamUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.client.HttpServerErrorException;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.springframework.web.socket.TextMessage;
|
||||
import org.springframework.web.socket.WebSocketHandler;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.sockjs.SockJsException;
|
||||
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
|
||||
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
|
||||
|
||||
/**
|
||||
* An XHR transport based on Undertow's {@link io.undertow.client.UndertowClient}.
|
||||
*
|
||||
* <p>Requires Undertow 1.3 or 1.4, including XNIO.
|
||||
*
|
||||
* <p>When used for testing purposes (for example, load testing) or for specific use cases
|
||||
* (like HTTPS configuration), a custom {@link OptionMap} should be provided:
|
||||
*
|
||||
* <pre class="code">
|
||||
* OptionMap optionMap = OptionMap.builder()
|
||||
* .set(Options.WORKER_IO_THREADS, 8)
|
||||
* .set(Options.TCP_NODELAY, true)
|
||||
* .set(Options.KEEP_ALIVE, true)
|
||||
* .set(Options.WORKER_NAME, "SockJSClient")
|
||||
* .getMap();
|
||||
*
|
||||
* UndertowXhrTransport transport = new UndertowXhrTransport(optionMap);
|
||||
* </pre>
|
||||
*
|
||||
* @author Brian Clozel
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.1.2
|
||||
* @see org.xnio.Options
|
||||
*/
|
||||
public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||
|
||||
private static final AttachmentKey<String> RESPONSE_BODY = AttachmentKey.create(String.class);
|
||||
|
||||
|
||||
private final OptionMap optionMap;
|
||||
|
||||
private final UndertowClient httpClient;
|
||||
|
||||
private final XnioWorker worker;
|
||||
|
||||
private final ByteBufferPool bufferPool;
|
||||
|
||||
|
||||
public UndertowXhrTransport() throws IOException {
|
||||
this(OptionMap.builder().parse(Options.WORKER_NAME, "SockJSClient").getMap());
|
||||
}
|
||||
|
||||
public UndertowXhrTransport(OptionMap optionMap) throws IOException {
|
||||
Assert.notNull(optionMap, "OptionMap is required");
|
||||
this.optionMap = optionMap;
|
||||
this.httpClient = UndertowClient.getInstance();
|
||||
this.worker = Xnio.getInstance().createWorker(optionMap);
|
||||
this.bufferPool = new DefaultByteBufferPool(false, 1024, -1, 2);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return Undertow's native HTTP client.
|
||||
*/
|
||||
public UndertowClient getHttpClient() {
|
||||
return this.httpClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the {@link org.xnio.XnioWorker} backing the I/O operations
|
||||
* for Undertow's HTTP client.
|
||||
* @see org.xnio.Xnio
|
||||
*/
|
||||
public XnioWorker getWorker() {
|
||||
return this.worker;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl,
|
||||
HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
|
||||
CompletableFuture<WebSocketSession> connectFuture) {
|
||||
|
||||
executeReceiveRequest(request, receiveUrl, handshakeHeaders, session, connectFuture);
|
||||
}
|
||||
|
||||
private void executeReceiveRequest(final TransportRequest transportRequest,
|
||||
final URI url, final HttpHeaders headers, final XhrClientSockJsSession session,
|
||||
final CompletableFuture<WebSocketSession> connectFuture) {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Starting XHR receive request for " + url);
|
||||
}
|
||||
|
||||
ClientCallback<ClientConnection> clientCallback = new ClientCallback<>() {
|
||||
@Override
|
||||
public void completed(ClientConnection connection) {
|
||||
ClientRequest request = new ClientRequest().setMethod(Methods.POST).setPath(url.getPath());
|
||||
HttpString headerName = HttpString.tryFromString(HttpHeaders.HOST);
|
||||
request.getRequestHeaders().add(headerName, url.getHost());
|
||||
addHttpHeaders(request, headers);
|
||||
HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
|
||||
connection.sendRequest(request, createReceiveCallback(transportRequest,
|
||||
url, httpHeaders, session, connectFuture));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(IOException ex) {
|
||||
throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
|
||||
}
|
||||
};
|
||||
|
||||
this.httpClient.connect(clientCallback, url, this.worker, this.bufferPool, this.optionMap);
|
||||
}
|
||||
|
||||
private static void addHttpHeaders(ClientRequest request, HttpHeaders headers) {
|
||||
HeaderMap headerMap = request.getRequestHeaders();
|
||||
headers.forEach((key, values) -> {
|
||||
for (String value : values) {
|
||||
headerMap.add(HttpString.tryFromString(key), value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private ClientCallback<ClientExchange> createReceiveCallback(final TransportRequest transportRequest,
|
||||
final URI url, final HttpHeaders headers, final XhrClientSockJsSession sockJsSession,
|
||||
final CompletableFuture<WebSocketSession> connectFuture) {
|
||||
|
||||
return new ClientCallback<>() {
|
||||
@Override
|
||||
public void completed(final ClientExchange exchange) {
|
||||
exchange.setResponseListener(new ClientCallback<>() {
|
||||
@Override
|
||||
public void completed(ClientExchange result) {
|
||||
ClientResponse response = result.getResponse();
|
||||
if (response.getResponseCode() != 200) {
|
||||
HttpStatusCode status = HttpStatusCode.valueOf(response.getResponseCode());
|
||||
IoUtils.safeClose(result.getConnection());
|
||||
onFailure(new HttpServerErrorException(status, "Unexpected XHR receive status"));
|
||||
}
|
||||
else {
|
||||
SockJsResponseListener listener = new SockJsResponseListener(
|
||||
transportRequest, result.getConnection(), url, headers,
|
||||
sockJsSession, connectFuture);
|
||||
listener.setup(result.getResponseChannel());
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("XHR receive headers: " + toHttpHeaders(response.getResponseHeaders()));
|
||||
}
|
||||
try {
|
||||
StreamSinkChannel channel = result.getRequestChannel();
|
||||
channel.shutdownWrites();
|
||||
if (!channel.flush()) {
|
||||
channel.getWriteSetter().set(ChannelListeners.<StreamSinkChannel>flushingChannelListener(null, null));
|
||||
channel.resumeWrites();
|
||||
}
|
||||
}
|
||||
catch (IOException exc) {
|
||||
IoUtils.safeClose(result.getConnection());
|
||||
onFailure(exc);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(IOException exc) {
|
||||
IoUtils.safeClose(exchange.getConnection());
|
||||
onFailure(exc);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(IOException exc) {
|
||||
onFailure(exc);
|
||||
}
|
||||
|
||||
private void onFailure(Throwable failure) {
|
||||
if (connectFuture.completeExceptionally(failure)) {
|
||||
return;
|
||||
}
|
||||
if (sockJsSession.isDisconnected()) {
|
||||
sockJsSession.afterTransportClosed(null);
|
||||
}
|
||||
else {
|
||||
sockJsSession.handleTransportError(failure);
|
||||
sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static HttpHeaders toHttpHeaders(HeaderMap headerMap) {
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
for (HttpString name : headerMap.getHeaderNames()) {
|
||||
for (String value : headerMap.get(name)) {
|
||||
httpHeaders.add(name.toString(), value);
|
||||
}
|
||||
}
|
||||
return httpHeaders;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl, HttpHeaders headers) {
|
||||
return executeRequest(infoUrl, Methods.GET, headers, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) {
|
||||
return executeRequest(url, Methods.POST, headers, message.getPayload());
|
||||
}
|
||||
|
||||
protected ResponseEntity<String> executeRequest(
|
||||
URI url, HttpString method, HttpHeaders headers, @Nullable String body) {
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
List<ClientResponse> responses = new CopyOnWriteArrayList<>();
|
||||
|
||||
try {
|
||||
ClientConnection connection =
|
||||
this.httpClient.connect(url, this.worker, this.bufferPool, this.optionMap).get();
|
||||
try {
|
||||
ClientRequest request = new ClientRequest().setMethod(method).setPath(url.getPath());
|
||||
request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost());
|
||||
if (StringUtils.hasLength(body)) {
|
||||
HttpString headerName = HttpString.tryFromString(HttpHeaders.CONTENT_LENGTH);
|
||||
request.getRequestHeaders().add(headerName, body.length());
|
||||
}
|
||||
addHttpHeaders(request, headers);
|
||||
connection.sendRequest(request, createRequestCallback(body, responses, latch));
|
||||
|
||||
latch.await();
|
||||
ClientResponse response = responses.iterator().next();
|
||||
HttpStatusCode status = HttpStatusCode.valueOf(response.getResponseCode());
|
||||
HttpHeaders responseHeaders = toHttpHeaders(response.getResponseHeaders());
|
||||
String responseBody = response.getAttachment(RESPONSE_BODY);
|
||||
return (responseBody != null ?
|
||||
new ResponseEntity<>(responseBody, responseHeaders, status) :
|
||||
new ResponseEntity<>(responseHeaders, status));
|
||||
}
|
||||
finally {
|
||||
IoUtils.safeClose(connection);
|
||||
}
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
|
||||
}
|
||||
catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new SockJsTransportFailureException("Interrupted while processing request to " + url, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private ClientCallback<ClientExchange> createRequestCallback(final @Nullable String body,
|
||||
final List<ClientResponse> responses, final CountDownLatch latch) {
|
||||
|
||||
return new ClientCallback<>() {
|
||||
@Override
|
||||
public void completed(ClientExchange result) {
|
||||
result.setResponseListener(new ClientCallback<>() {
|
||||
@Override
|
||||
public void completed(final ClientExchange result) {
|
||||
responses.add(result.getResponse());
|
||||
new StringReadChannelListener(result.getConnection().getBufferPool()) {
|
||||
@Override
|
||||
protected void stringDone(String string) {
|
||||
result.getResponse().putAttachment(RESPONSE_BODY, string);
|
||||
latch.countDown();
|
||||
}
|
||||
@Override
|
||||
protected void error(IOException ex) {
|
||||
onFailure(latch, ex);
|
||||
}
|
||||
}.setup(result.getResponseChannel());
|
||||
}
|
||||
@Override
|
||||
public void failed(IOException ex) {
|
||||
onFailure(latch, ex);
|
||||
}
|
||||
});
|
||||
try {
|
||||
if (body != null) {
|
||||
result.getRequestChannel().write(ByteBuffer.wrap(body.getBytes()));
|
||||
}
|
||||
result.getRequestChannel().shutdownWrites();
|
||||
if (!result.getRequestChannel().flush()) {
|
||||
result.getRequestChannel().getWriteSetter()
|
||||
.set(ChannelListeners.<StreamSinkChannel>flushingChannelListener(null, null));
|
||||
result.getRequestChannel().resumeWrites();
|
||||
}
|
||||
}
|
||||
catch (IOException ex) {
|
||||
onFailure(latch, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(IOException ex) {
|
||||
onFailure(latch, ex);
|
||||
}
|
||||
|
||||
private void onFailure(CountDownLatch latch, IOException ex) {
|
||||
latch.countDown();
|
||||
throw new SockJsTransportFailureException("Failed to execute request", ex);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
private class SockJsResponseListener implements ChannelListener<StreamSourceChannel> {
|
||||
|
||||
private final TransportRequest request;
|
||||
|
||||
private final ClientConnection connection;
|
||||
|
||||
private final URI url;
|
||||
|
||||
private final HttpHeaders headers;
|
||||
|
||||
private final XhrClientSockJsSession session;
|
||||
|
||||
private final CompletableFuture<WebSocketSession> connectFuture;
|
||||
|
||||
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
|
||||
public SockJsResponseListener(TransportRequest request, ClientConnection connection, URI url,
|
||||
HttpHeaders headers, XhrClientSockJsSession sockJsSession,
|
||||
CompletableFuture<WebSocketSession> connectFuture) {
|
||||
|
||||
this.request = request;
|
||||
this.connection = connection;
|
||||
this.url = url;
|
||||
this.headers = headers;
|
||||
this.session = sockJsSession;
|
||||
this.connectFuture = connectFuture;
|
||||
}
|
||||
|
||||
public void setup(StreamSourceChannel channel) {
|
||||
channel.suspendReads();
|
||||
channel.getReadSetter().set(this);
|
||||
channel.resumeReads();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleEvent(StreamSourceChannel channel) {
|
||||
if (this.session.isDisconnected()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("SockJS sockJsSession closed, closing response.");
|
||||
}
|
||||
IoUtils.safeClose(this.connection);
|
||||
throw new SockJsException("Session closed.", this.session.getId(), null);
|
||||
}
|
||||
|
||||
try (PooledByteBuffer pooled = bufferPool.allocate()) {
|
||||
int r;
|
||||
do {
|
||||
ByteBuffer buffer = pooled.getBuffer();
|
||||
buffer.clear();
|
||||
r = channel.read(buffer);
|
||||
buffer.flip();
|
||||
if (r == 0) {
|
||||
return;
|
||||
}
|
||||
else if (r == -1) {
|
||||
onSuccess();
|
||||
}
|
||||
else {
|
||||
while (buffer.hasRemaining()) {
|
||||
int b = buffer.get();
|
||||
if (b == '\n') {
|
||||
handleFrame();
|
||||
}
|
||||
else {
|
||||
this.outputStream.write(b);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
while (r > 0);
|
||||
}
|
||||
catch (IOException exc) {
|
||||
onFailure(exc);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleFrame() {
|
||||
String content = StreamUtils.copyToString(this.outputStream, SockJsFrame.CHARSET);
|
||||
this.outputStream.reset();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("XHR content received: " + content);
|
||||
}
|
||||
if (!PRELUDE.equals(content)) {
|
||||
this.session.handleFrame(content);
|
||||
}
|
||||
}
|
||||
|
||||
public void onSuccess() {
|
||||
if (this.outputStream.size() > 0) {
|
||||
handleFrame();
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("XHR receive request completed.");
|
||||
}
|
||||
IoUtils.safeClose(this.connection);
|
||||
executeReceiveRequest(this.request, this.url, this.headers, this.session, this.connectFuture);
|
||||
}
|
||||
|
||||
public void onFailure(Throwable failure) {
|
||||
IoUtils.safeClose(this.connection);
|
||||
if (this.connectFuture.completeExceptionally(failure)) {
|
||||
return;
|
||||
}
|
||||
if (this.session.isDisconnected()) {
|
||||
this.session.afterTransportClosed(null);
|
||||
}
|
||||
else {
|
||||
this.session.handleTransportError(failure);
|
||||
this.session.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -55,8 +55,7 @@ public abstract class AbstractWebSocketIntegrationTests {
|
|||
static Stream<Arguments> argumentsFactory() {
|
||||
return Stream.of(
|
||||
arguments(named("Jetty", new JettyWebSocketTestServer()), named("Standard", new StandardWebSocketClient())),
|
||||
arguments(named("Tomcat", new TomcatWebSocketTestServer()), named("Standard", new StandardWebSocketClient())),
|
||||
arguments(named("Undertow", new UndertowTestServer()), named("Standard", new StandardWebSocketClient())));
|
||||
arguments(named("Tomcat", new TomcatWebSocketTestServer()), named("Standard", new StandardWebSocketClient())));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,181 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.socket;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import io.undertow.Undertow;
|
||||
import io.undertow.server.HttpHandler;
|
||||
import io.undertow.servlet.api.DeploymentInfo;
|
||||
import io.undertow.servlet.api.DeploymentManager;
|
||||
import io.undertow.servlet.api.FilterInfo;
|
||||
import io.undertow.servlet.api.InstanceFactory;
|
||||
import io.undertow.servlet.api.InstanceHandle;
|
||||
import io.undertow.servlet.api.ServletInfo;
|
||||
import io.undertow.websockets.jsr.WebSocketDeploymentInfo;
|
||||
import jakarta.servlet.DispatcherType;
|
||||
import jakarta.servlet.Filter;
|
||||
import jakarta.servlet.Servlet;
|
||||
import jakarta.servlet.ServletContext;
|
||||
import jakarta.servlet.ServletException;
|
||||
import org.xnio.OptionMap;
|
||||
import org.xnio.Xnio;
|
||||
|
||||
import org.springframework.web.context.WebApplicationContext;
|
||||
import org.springframework.web.servlet.DispatcherServlet;
|
||||
|
||||
import static io.undertow.servlet.Servlets.defaultContainer;
|
||||
import static io.undertow.servlet.Servlets.deployment;
|
||||
import static io.undertow.servlet.Servlets.servlet;
|
||||
|
||||
/**
|
||||
* Undertow-based {@link WebSocketTestServer}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Sam Brannen
|
||||
*/
|
||||
public class UndertowTestServer implements WebSocketTestServer {
|
||||
|
||||
private int port;
|
||||
|
||||
private Undertow server;
|
||||
|
||||
private DeploymentManager manager;
|
||||
|
||||
|
||||
@Override
|
||||
public void setup() {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public void deployConfig(WebApplicationContext wac, Filter... filters) {
|
||||
DispatcherServletInstanceFactory servletFactory = new DispatcherServletInstanceFactory(wac);
|
||||
// manually building WebSocketDeploymentInfo in order to avoid class cast exceptions
|
||||
// with tomcat's implementation when using undertow 1.1.0+
|
||||
WebSocketDeploymentInfo info = new WebSocketDeploymentInfo();
|
||||
try {
|
||||
info.setWorker(Xnio.getInstance().createWorker(OptionMap.EMPTY));
|
||||
info.setBuffers(new org.xnio.ByteBufferSlicePool(1024,1024));
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
|
||||
ServletInfo servletInfo = servlet("DispatcherServlet", DispatcherServlet.class, servletFactory)
|
||||
.addMapping("/").setAsyncSupported(true);
|
||||
DeploymentInfo servletBuilder = deployment()
|
||||
.setClassLoader(UndertowTestServer.class.getClassLoader())
|
||||
.setDeploymentName("undertow-websocket-test")
|
||||
.setContextPath("/")
|
||||
.addServlet(servletInfo)
|
||||
.addServletContextAttribute(WebSocketDeploymentInfo.ATTRIBUTE_NAME, info);
|
||||
for (final Filter filter : filters) {
|
||||
String filterName = filter.getClass().getName();
|
||||
FilterInstanceFactory filterFactory = new FilterInstanceFactory(filter);
|
||||
FilterInfo filterInfo = new FilterInfo(filterName, filter.getClass(), filterFactory);
|
||||
servletBuilder.addFilter(filterInfo.setAsyncSupported(true));
|
||||
for (DispatcherType type : DispatcherType.values()) {
|
||||
servletBuilder.addFilterUrlMapping(filterName, "/*", type);
|
||||
}
|
||||
}
|
||||
try {
|
||||
this.manager = defaultContainer().addDeployment(servletBuilder);
|
||||
this.manager.deploy();
|
||||
HttpHandler httpHandler = this.manager.start();
|
||||
this.server = Undertow.builder().addHttpListener(0, "localhost").setHandler(httpHandler).build();
|
||||
}
|
||||
catch (ServletException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void undeployConfig() {
|
||||
this.manager.undeploy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
this.server.start();
|
||||
Undertow.ListenerInfo info = this.server.getListenerInfo().get(0);
|
||||
this.port = ((InetSocketAddress) info.getAddress()).getPort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
this.server.stop();
|
||||
this.port = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return this.port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServletContext getServletContext() {
|
||||
return this.manager.getDeployment().getServletContext();
|
||||
}
|
||||
|
||||
|
||||
private static class DispatcherServletInstanceFactory implements InstanceFactory<Servlet> {
|
||||
|
||||
private final WebApplicationContext wac;
|
||||
|
||||
public DispatcherServletInstanceFactory(WebApplicationContext wac) {
|
||||
this.wac = wac;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InstanceHandle<Servlet> createInstance() {
|
||||
return new InstanceHandle<>() {
|
||||
@Override
|
||||
public Servlet getInstance() {
|
||||
return new DispatcherServlet(wac);
|
||||
}
|
||||
@Override
|
||||
public void release() {
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class FilterInstanceFactory implements InstanceFactory<Filter> {
|
||||
|
||||
private final Filter filter;
|
||||
|
||||
private FilterInstanceFactory(Filter filter) {
|
||||
this.filter = filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InstanceHandle<Filter> createInstance() {
|
||||
return new InstanceHandle<>() {
|
||||
@Override
|
||||
public Filter getInstance() {
|
||||
return filter;
|
||||
}
|
||||
@Override
|
||||
public void release() {}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,50 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.socket.sockjs.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.springframework.web.socket.UndertowTestServer;
|
||||
import org.springframework.web.socket.WebSocketTestServer;
|
||||
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
|
||||
|
||||
/**
|
||||
* @author Brian Clozel
|
||||
*/
|
||||
class UndertowSockJsIntegrationTests extends AbstractSockJsIntegrationTests {
|
||||
|
||||
@Override
|
||||
protected WebSocketTestServer createWebSocketTestServer() {
|
||||
return new UndertowTestServer();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Transport createWebSocketTransport() {
|
||||
return new WebSocketTransport(new StandardWebSocketClient());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractXhrTransport createXhrTransport() {
|
||||
try {
|
||||
return new UndertowXhrTransport();
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new IllegalStateException("Could not create UndertowXhrTransport");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue