Support Undertow 1.3.0 in UndertowXhrTransport
As of Undertow 1.3.0, several APIs have been changed: replacing Xnio's Pool/Pooled references to Undertow's new ByteBufferPool abstraction. This move has been made, as part of https://issues.jboss.org/browse/UNDERTOW-522, to prepare deprecations in the Xnio API. This commit adds a new strategy to deal with both 1.0-1.2 and 1.3 Undertow generations. Issue: SPR-13366
This commit is contained in:
parent
ebe128e940
commit
0510329b54
|
@ -72,7 +72,7 @@ configure(allprojects) { project ->
|
||||||
ext.tiles3Version = "3.0.5"
|
ext.tiles3Version = "3.0.5"
|
||||||
ext.tomcatVersion = "8.0.26"
|
ext.tomcatVersion = "8.0.26"
|
||||||
ext.tyrusVersion = "1.3.5" // constrained by WebLogic 12.1.3 support
|
ext.tyrusVersion = "1.3.5" // constrained by WebLogic 12.1.3 support
|
||||||
ext.undertowVersion = "1.2.12.Final"
|
ext.undertowVersion = "1.3.0.CR2"
|
||||||
ext.woodstoxVersion = "5.0.1"
|
ext.woodstoxVersion = "5.0.1"
|
||||||
ext.xmlunitVersion = "1.6"
|
ext.xmlunitVersion = "1.6"
|
||||||
ext.xstreamVersion = "1.4.8"
|
ext.xstreamVersion = "1.4.8"
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.springframework.web.socket.sockjs.client;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -30,6 +31,9 @@ import io.undertow.client.ClientExchange;
|
||||||
import io.undertow.client.ClientRequest;
|
import io.undertow.client.ClientRequest;
|
||||||
import io.undertow.client.ClientResponse;
|
import io.undertow.client.ClientResponse;
|
||||||
import io.undertow.client.UndertowClient;
|
import io.undertow.client.UndertowClient;
|
||||||
|
import io.undertow.connector.ByteBufferPool;
|
||||||
|
import io.undertow.connector.PooledByteBuffer;
|
||||||
|
import io.undertow.server.DefaultByteBufferPool;
|
||||||
import io.undertow.util.AttachmentKey;
|
import io.undertow.util.AttachmentKey;
|
||||||
import io.undertow.util.HeaderMap;
|
import io.undertow.util.HeaderMap;
|
||||||
import io.undertow.util.HttpString;
|
import io.undertow.util.HttpString;
|
||||||
|
@ -37,9 +41,11 @@ import io.undertow.util.Methods;
|
||||||
import io.undertow.util.StringReadChannelListener;
|
import io.undertow.util.StringReadChannelListener;
|
||||||
import org.xnio.ChannelListener;
|
import org.xnio.ChannelListener;
|
||||||
import org.xnio.ChannelListeners;
|
import org.xnio.ChannelListeners;
|
||||||
|
import org.xnio.IoFuture;
|
||||||
import org.xnio.IoUtils;
|
import org.xnio.IoUtils;
|
||||||
import org.xnio.OptionMap;
|
import org.xnio.OptionMap;
|
||||||
import org.xnio.Options;
|
import org.xnio.Options;
|
||||||
|
import org.xnio.Pool;
|
||||||
import org.xnio.Xnio;
|
import org.xnio.Xnio;
|
||||||
import org.xnio.XnioWorker;
|
import org.xnio.XnioWorker;
|
||||||
import org.xnio.channels.StreamSinkChannel;
|
import org.xnio.channels.StreamSinkChannel;
|
||||||
|
@ -49,6 +55,8 @@ import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
import org.springframework.util.ClassUtils;
|
||||||
|
import org.springframework.util.ReflectionUtils;
|
||||||
import org.springframework.util.concurrent.SettableListenableFuture;
|
import org.springframework.util.concurrent.SettableListenableFuture;
|
||||||
import org.springframework.web.client.HttpServerErrorException;
|
import org.springframework.web.client.HttpServerErrorException;
|
||||||
import org.springframework.web.socket.CloseStatus;
|
import org.springframework.web.socket.CloseStatus;
|
||||||
|
@ -61,7 +69,7 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrame;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An XHR transport based on Undertow's {@link io.undertow.client.UndertowClient}.
|
* An XHR transport based on Undertow's {@link io.undertow.client.UndertowClient}.
|
||||||
* Compatible with Undertow 1.0, 1.1, 1.2.
|
* Compatible with Undertow from version 1.0 to 1.3.
|
||||||
*
|
*
|
||||||
* <p>When used for testing purposes (e.g. load testing) or for specific use cases
|
* <p>When used for testing purposes (e.g. load testing) or for specific use cases
|
||||||
* (like HTTPS configuration), a custom OptionMap should be provided:
|
* (like HTTPS configuration), a custom OptionMap should be provided:
|
||||||
|
@ -84,6 +92,9 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrame;
|
||||||
*/
|
*/
|
||||||
public class UndertowXhrTransport extends AbstractXhrTransport {
|
public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
|
|
||||||
|
private static final boolean undertow13Present = ClassUtils.isPresent("io.undertow.connector.ByteBufferPool",
|
||||||
|
UndertowXhrTransport.class.getClassLoader());
|
||||||
|
|
||||||
private static final AttachmentKey<String> RESPONSE_BODY = AttachmentKey.create(String.class);
|
private static final AttachmentKey<String> RESPONSE_BODY = AttachmentKey.create(String.class);
|
||||||
|
|
||||||
|
|
||||||
|
@ -93,21 +104,24 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
|
|
||||||
private final XnioWorker worker;
|
private final XnioWorker worker;
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
private final UndertowBufferSupport undertowBufferSupport;
|
||||||
private final org.xnio.Pool<ByteBuffer> bufferPool;
|
|
||||||
|
|
||||||
|
|
||||||
public UndertowXhrTransport() throws IOException {
|
public UndertowXhrTransport() throws IOException {
|
||||||
this(OptionMap.builder().parse(Options.WORKER_NAME, "SockJSClient").getMap());
|
this(OptionMap.builder().parse(Options.WORKER_NAME, "SockJSClient").getMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public UndertowXhrTransport(OptionMap optionMap) throws IOException {
|
public UndertowXhrTransport(OptionMap optionMap) throws IOException {
|
||||||
Assert.notNull(optionMap, "OptionMap is required");
|
Assert.notNull(optionMap, "OptionMap is required");
|
||||||
this.optionMap = optionMap;
|
this.optionMap = optionMap;
|
||||||
this.httpClient = UndertowClient.getInstance();
|
this.httpClient = UndertowClient.getInstance();
|
||||||
this.worker = Xnio.getInstance().createWorker(optionMap);
|
this.worker = Xnio.getInstance().createWorker(optionMap);
|
||||||
this.bufferPool = new org.xnio.ByteBufferSlicePool(1048, 1048);
|
if (undertow13Present) {
|
||||||
|
this.undertowBufferSupport = new Undertow13BufferSupport();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.undertowBufferSupport = new Undertow10BufferSupport();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -144,8 +158,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
logger.trace("Starting XHR receive request for " + url);
|
logger.trace("Starting XHR receive request for " + url);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.httpClient.connect(
|
ClientCallback<ClientConnection> clientCallback = new ClientCallback<ClientConnection>() {
|
||||||
new ClientCallback<ClientConnection>() {
|
|
||||||
@Override
|
@Override
|
||||||
public void completed(ClientConnection connection) {
|
public void completed(ClientConnection connection) {
|
||||||
ClientRequest request = new ClientRequest().setMethod(Methods.POST).setPath(url.getPath());
|
ClientRequest request = new ClientRequest().setMethod(Methods.POST).setPath(url.getPath());
|
||||||
|
@ -156,12 +169,14 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
connection.sendRequest(request, createReceiveCallback(transportRequest,
|
connection.sendRequest(request, createReceiveCallback(transportRequest,
|
||||||
url, httpHeaders, session, connectFuture));
|
url, httpHeaders, session, connectFuture));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(IOException ex) {
|
public void failed(IOException ex) {
|
||||||
throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
|
throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
|
||||||
}
|
}
|
||||||
},
|
};
|
||||||
url, this.worker, this.bufferPool, this.optionMap);
|
|
||||||
|
this.undertowBufferSupport.httpClientConnect(this.httpClient, clientCallback, url, worker, this.optionMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void addHttpHeaders(ClientRequest request, HttpHeaders headers) {
|
private static void addHttpHeaders(ClientRequest request, HttpHeaders headers) {
|
||||||
|
@ -211,6 +226,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
onFailure(exc);
|
onFailure(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(IOException exc) {
|
public void failed(IOException exc) {
|
||||||
IoUtils.safeClose(exchange.getConnection());
|
IoUtils.safeClose(exchange.getConnection());
|
||||||
|
@ -264,8 +280,8 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
List<ClientResponse> responses = new CopyOnWriteArrayList<ClientResponse>();
|
List<ClientResponse> responses = new CopyOnWriteArrayList<ClientResponse>();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ClientConnection connection = this.httpClient.connect(url, this.worker,
|
ClientConnection connection = this.undertowBufferSupport
|
||||||
this.bufferPool, this.optionMap).get();
|
.httpClientConnect(this.httpClient, url, this.worker, this.optionMap).get();
|
||||||
try {
|
try {
|
||||||
ClientRequest request = new ClientRequest().setMethod(method).setPath(url.getPath());
|
ClientRequest request = new ClientRequest().setMethod(method).setPath(url.getPath());
|
||||||
request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost());
|
request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost());
|
||||||
|
@ -314,12 +330,14 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
result.getResponse().putAttachment(RESPONSE_BODY, string);
|
result.getResponse().putAttachment(RESPONSE_BODY, string);
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void error(IOException ex) {
|
protected void error(IOException ex) {
|
||||||
onFailure(latch, ex);
|
onFailure(latch, ex);
|
||||||
}
|
}
|
||||||
}.setup(result.getResponseChannel());
|
}.setup(result.getResponseChannel());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(IOException ex) {
|
public void failed(IOException ex) {
|
||||||
onFailure(latch, ex);
|
onFailure(latch, ex);
|
||||||
|
@ -389,7 +407,6 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void handleEvent(StreamSourceChannel channel) {
|
public void handleEvent(StreamSourceChannel channel) {
|
||||||
if (this.session.isDisconnected()) {
|
if (this.session.isDisconnected()) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
|
@ -399,11 +416,11 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
throw new SockJsException("Session closed.", this.session.getId(), null);
|
throw new SockJsException("Session closed.", this.session.getId(), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
org.xnio.Pooled<ByteBuffer> pooled = this.connection.getBufferPool().allocate();
|
Object pooled = undertowBufferSupport.allocatePooledResource();
|
||||||
try {
|
try {
|
||||||
int r;
|
int r;
|
||||||
do {
|
do {
|
||||||
ByteBuffer buffer = pooled.getResource();
|
ByteBuffer buffer = undertowBufferSupport.getByteBuffer(pooled);
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
r = channel.read(buffer);
|
r = channel.read(buffer);
|
||||||
buffer.flip();
|
buffer.flip();
|
||||||
|
@ -431,7 +448,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
onFailure(exc);
|
onFailure(exc);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
pooled.free();
|
undertowBufferSupport.closePooledResource(pooled);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -473,4 +490,116 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private interface UndertowBufferSupport {
|
||||||
|
|
||||||
|
Object allocatePooledResource();
|
||||||
|
|
||||||
|
ByteBuffer getByteBuffer(Object pooled);
|
||||||
|
|
||||||
|
void closePooledResource(Object pooled);
|
||||||
|
|
||||||
|
void httpClientConnect(UndertowClient httpClient, final ClientCallback<ClientConnection> listener,
|
||||||
|
final URI uri, final XnioWorker worker, OptionMap options);
|
||||||
|
|
||||||
|
IoFuture<ClientConnection> httpClientConnect(UndertowClient httpClient, final URI uri,
|
||||||
|
final XnioWorker worker, OptionMap options);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Undertow10BufferSupport implements UndertowBufferSupport {
|
||||||
|
|
||||||
|
private final org.xnio.Pool<ByteBuffer> xnioBufferPool;
|
||||||
|
|
||||||
|
private final Method httpClientConnectCallbackMethod;
|
||||||
|
|
||||||
|
private final Method httpClientConnectMethod;
|
||||||
|
|
||||||
|
public Undertow10BufferSupport() {
|
||||||
|
this.xnioBufferPool = new org.xnio.ByteBufferSlicePool(1048, 1048);
|
||||||
|
this.httpClientConnectCallbackMethod = ReflectionUtils.findMethod(UndertowClient.class, "connect",
|
||||||
|
ClientCallback.class, URI.class, XnioWorker.class, Pool.class, OptionMap.class);
|
||||||
|
this.httpClientConnectMethod = ReflectionUtils.findMethod(UndertowClient.class, "connect",
|
||||||
|
URI.class, XnioWorker.class, Pool.class, OptionMap.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object allocatePooledResource() {
|
||||||
|
return this.xnioBufferPool.allocate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public ByteBuffer getByteBuffer(Object pooled) {
|
||||||
|
return ((org.xnio.Pooled<ByteBuffer>) pooled).getResource();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void closePooledResource(Object pooled) {
|
||||||
|
((org.xnio.Pooled<ByteBuffer>) pooled).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void httpClientConnect(UndertowClient httpClient, ClientCallback<ClientConnection> listener, URI uri,
|
||||||
|
XnioWorker worker, OptionMap options) {
|
||||||
|
ReflectionUtils.invokeMethod(httpClientConnectCallbackMethod, httpClient, listener, uri, worker,
|
||||||
|
this.xnioBufferPool, options);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public IoFuture<ClientConnection> httpClientConnect(UndertowClient httpClient, URI uri,
|
||||||
|
XnioWorker worker, OptionMap options) {
|
||||||
|
return (IoFuture<ClientConnection>) ReflectionUtils.invokeMethod(httpClientConnectMethod, httpClient, uri,
|
||||||
|
worker, this.xnioBufferPool, options);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Undertow13BufferSupport implements UndertowBufferSupport {
|
||||||
|
|
||||||
|
private final ByteBufferPool undertowBufferPool;
|
||||||
|
|
||||||
|
private final Method httpClientConnectCallbackMethod;
|
||||||
|
|
||||||
|
private final Method httpClientConnectMethod;
|
||||||
|
|
||||||
|
public Undertow13BufferSupport() {
|
||||||
|
this.undertowBufferPool = new DefaultByteBufferPool(false, 1024, -1, 2);
|
||||||
|
this.httpClientConnectCallbackMethod = ReflectionUtils.findMethod(UndertowClient.class, "connect",
|
||||||
|
ClientCallback.class, URI.class, XnioWorker.class, ByteBufferPool.class, OptionMap.class);
|
||||||
|
this.httpClientConnectMethod = ReflectionUtils.findMethod(UndertowClient.class, "connect",
|
||||||
|
URI.class, XnioWorker.class, ByteBufferPool.class, OptionMap.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object allocatePooledResource() {
|
||||||
|
return this.undertowBufferPool.allocate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer getByteBuffer(Object pooled) {
|
||||||
|
return ((PooledByteBuffer) pooled).getBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void closePooledResource(Object pooled) {
|
||||||
|
((PooledByteBuffer) pooled).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void httpClientConnect(UndertowClient httpClient, ClientCallback<ClientConnection> listener, URI uri,
|
||||||
|
XnioWorker worker, OptionMap options) {
|
||||||
|
ReflectionUtils.invokeMethod(httpClientConnectCallbackMethod, httpClient, listener, uri,
|
||||||
|
this.undertowBufferPool, worker, options);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public IoFuture<ClientConnection> httpClientConnect(UndertowClient httpClient, URI uri,
|
||||||
|
XnioWorker worker, OptionMap options) {
|
||||||
|
return (IoFuture<ClientConnection>) ReflectionUtils.invokeMethod(httpClientConnectMethod, httpClient, uri,
|
||||||
|
worker, this.undertowBufferPool, options);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue