diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index d557f286c8c..2d09bd86164 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -27,9 +27,9 @@ import io.undertow.server.handlers.Cookie; import io.undertow.server.handlers.CookieImpl; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; -import org.xnio.channels.Channels; import org.xnio.channels.StreamSinkChannel; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -124,19 +124,25 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl @Override public Mono writeWith(Path file, long position, long count) { return doCommit(() -> - Mono.defer(() -> { - try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) { + Mono.create(sink -> { + try { + FileChannel source = FileChannel.open(file, StandardOpenOption.READ); + + TransferBodyListener listener = new TransferBodyListener(source, position, + count, sink); + sink.onDispose(listener::closeSource); + StreamSinkChannel destination = this.exchange.getResponseChannel(); - Channels.transferBlocking(destination, source, position, count); - return Mono.empty(); + destination.getWriteSetter().set(listener::transfer); + + listener.transfer(destination); } catch (IOException ex) { - return Mono.error(ex); + sink.error(ex); } })); } - @Override protected Processor, Void> createBodyFlushProcessor() { return new ResponseBodyFlushProcessor(); @@ -296,4 +302,55 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl } } + + private static class TransferBodyListener { + + private final FileChannel source; + + private final MonoSink sink; + + private long position; + + private long count; + + + public TransferBodyListener(FileChannel source, long position, long count, MonoSink sink) { + this.source = source; + this.sink = sink; + this.position = position; + this.count = count; + } + + public void transfer(StreamSinkChannel destination) { + try { + while (this.count > 0) { + long len = destination.transferFrom(this.source, this.position, this.count); + if (len != 0) { + this.position += len; + this.count -= len; + } + else { + destination.resumeWrites(); + return; + } + } + this.sink.success(); + } + catch (IOException ex) { + this.sink.error(ex); + } + + } + + public void closeSource() { + try { + this.source.close(); + } + catch (IOException ignore) { + } + } + + + } + }