Merge pull request #1268 from violetagg/undertow-byte-buffer-pool
This commit is contained in:
commit
c3621bf153
|
@ -78,7 +78,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||||
* @see ReadListener#onAllDataRead()
|
* @see ReadListener#onAllDataRead()
|
||||||
* @see org.xnio.ChannelListener#handleEvent(Channel)
|
* @see org.xnio.ChannelListener#handleEvent(Channel)
|
||||||
*/
|
*/
|
||||||
public final void onAllDataRead() {
|
public void onAllDataRead() {
|
||||||
if (this.logger.isTraceEnabled()) {
|
if (this.logger.isTraceEnabled()) {
|
||||||
this.logger.trace(this.state + " onAllDataRead");
|
this.logger.trace(this.state + " onAllDataRead");
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport
|
||||||
@Override
|
@Override
|
||||||
public void handleRequest(HttpServerExchange exchange) throws Exception {
|
public void handleRequest(HttpServerExchange exchange) throws Exception {
|
||||||
|
|
||||||
ServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory);
|
UndertowServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory);
|
||||||
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, this.dataBufferFactory);
|
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, this.dataBufferFactory);
|
||||||
|
|
||||||
getHttpHandler().handle(request, response).subscribe(new Subscriber<Void>() {
|
getHttpHandler().handle(request, response).subscribe(new Subscriber<Void>() {
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import io.undertow.connector.ByteBufferPool;
|
||||||
import io.undertow.connector.PooledByteBuffer;
|
import io.undertow.connector.PooledByteBuffer;
|
||||||
import io.undertow.server.HttpServerExchange;
|
import io.undertow.server.HttpServerExchange;
|
||||||
import io.undertow.server.handlers.Cookie;
|
import io.undertow.server.handlers.Cookie;
|
||||||
|
@ -57,7 +58,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||||
super(initUri(exchange), initHeaders(exchange));
|
super(initUri(exchange), initHeaders(exchange));
|
||||||
this.exchange = exchange;
|
this.exchange = exchange;
|
||||||
this.body = new RequestBodyPublisher(exchange, dataBufferFactory);
|
this.body = new RequestBodyPublisher(exchange, dataBufferFactory);
|
||||||
this.body.registerListener();
|
this.body.registerListener(exchange);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static URI initUri(HttpServerExchange exchange) {
|
private static URI initUri(HttpServerExchange exchange) {
|
||||||
|
@ -106,6 +107,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||||
return Flux.from(this.body);
|
return Flux.from(this.body);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
|
private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
|
||||||
|
|
||||||
private final ChannelListener<StreamSourceChannel> readListener =
|
private final ChannelListener<StreamSourceChannel> readListener =
|
||||||
|
@ -118,17 +120,22 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||||
|
|
||||||
private final DataBufferFactory dataBufferFactory;
|
private final DataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
private final PooledByteBuffer pooledByteBuffer;
|
private final ByteBufferPool byteBufferPool;
|
||||||
|
|
||||||
|
private PooledByteBuffer pooledByteBuffer;
|
||||||
|
|
||||||
public RequestBodyPublisher(HttpServerExchange exchange,
|
public RequestBodyPublisher(HttpServerExchange exchange,
|
||||||
DataBufferFactory dataBufferFactory) {
|
DataBufferFactory dataBufferFactory) {
|
||||||
this.requestChannel = exchange.getRequestChannel();
|
this.requestChannel = exchange.getRequestChannel();
|
||||||
this.pooledByteBuffer =
|
this.byteBufferPool = exchange.getConnection().getByteBufferPool();
|
||||||
exchange.getConnection().getByteBufferPool().allocate();
|
|
||||||
this.dataBufferFactory = dataBufferFactory;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerListener() {
|
private void registerListener(HttpServerExchange exchange) {
|
||||||
|
exchange.addExchangeCompleteListener((ex, next) -> {
|
||||||
|
onAllDataRead();
|
||||||
|
next.proceed();
|
||||||
|
});
|
||||||
this.requestChannel.getReadSetter().set(this.readListener);
|
this.requestChannel.getReadSetter().set(this.readListener);
|
||||||
this.requestChannel.getCloseSetter().set(this.closeListener);
|
this.requestChannel.getCloseSetter().set(this.closeListener);
|
||||||
this.requestChannel.resumeReads();
|
this.requestChannel.resumeReads();
|
||||||
|
@ -141,6 +148,9 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected DataBuffer read() throws IOException {
|
protected DataBuffer read() throws IOException {
|
||||||
|
if (this.pooledByteBuffer == null) {
|
||||||
|
this.pooledByteBuffer = this.byteBufferPool.allocate();
|
||||||
|
}
|
||||||
ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer();
|
ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer();
|
||||||
int read = this.requestChannel.read(byteBuffer);
|
int read = this.requestChannel.read(byteBuffer);
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -157,6 +167,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onAllDataRead() {
|
||||||
|
if (this.pooledByteBuffer != null && this.pooledByteBuffer.isOpen()) {
|
||||||
|
this.pooledByteBuffer.close();
|
||||||
|
}
|
||||||
|
super.onAllDataRead();
|
||||||
|
}
|
||||||
|
|
||||||
private class ReadListener implements ChannelListener<StreamSourceChannel> {
|
private class ReadListener implements ChannelListener<StreamSourceChannel> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue