Use ChannelListener for Undertow zero-copy file transfers

This commit changes the UndertowServerHttpResponse to use a
listener-based approach instead of a blocking approach.

Closes gh-22413
This commit is contained in:
Arjen Poutsma 2019-03-19 11:16:42 +01:00
parent 0552468998
commit 4651039b1f
1 changed files with 64 additions and 7 deletions

View File

@ -27,9 +27,9 @@ import io.undertow.server.handlers.Cookie;
import io.undertow.server.handlers.CookieImpl;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.xnio.channels.Channels;
import org.xnio.channels.StreamSinkChannel;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -124,19 +124,25 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
@Override
public Mono<Void> writeWith(Path file, long position, long count) {
return doCommit(() ->
Mono.defer(() -> {
try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
Mono.create(sink -> {
try {
FileChannel source = FileChannel.open(file, StandardOpenOption.READ);
TransferBodyListener listener = new TransferBodyListener(source, position,
count, sink);
sink.onDispose(listener::closeSource);
StreamSinkChannel destination = this.exchange.getResponseChannel();
Channels.transferBlocking(destination, source, position, count);
return Mono.empty();
destination.getWriteSetter().set(listener::transfer);
listener.transfer(destination);
}
catch (IOException ex) {
return Mono.error(ex);
sink.error(ex);
}
}));
}
@Override
protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlushProcessor() {
return new ResponseBodyFlushProcessor();
@ -296,4 +302,55 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
}
}
private static class TransferBodyListener {
private final FileChannel source;
private final MonoSink<Void> sink;
private long position;
private long count;
public TransferBodyListener(FileChannel source, long position, long count, MonoSink<Void> sink) {
this.source = source;
this.sink = sink;
this.position = position;
this.count = count;
}
public void transfer(StreamSinkChannel destination) {
try {
while (this.count > 0) {
long len = destination.transferFrom(this.source, this.position, this.count);
if (len != 0) {
this.position += len;
this.count -= len;
}
else {
destination.resumeWrites();
return;
}
}
this.sink.success();
}
catch (IOException ex) {
this.sink.error(ex);
}
}
public void closeSource() {
try {
this.source.close();
}
catch (IOException ignore) {
}
}
}
}