Avoid inifinite recursion in UndertowServerHttpResponse

Undertow does not provide a way to check if we can write so with the
current implementation of isWritePossible, deep recursion can occur
when writing slows down. We now use a flag to keep track of write
ChannelListener callbacks.

This commit also addresses a related issue in
AbstractListenerWriteProcessor that went undected since #3c2d186
where after a large (single) buffer that is not written fully, the
completion signal is processed before the all data is written.

Issue: SPR-16702
This commit is contained in:
Rossen Stoyanchev 2018-04-11 14:10:38 -04:00
parent e88ca0d633
commit 3549745e37
3 changed files with 41 additions and 25 deletions

View File

@ -132,11 +132,15 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
/**
* Flush the output if ready, or otherwise {@link #isFlushPending()} should
* return true after.
* <p>This is primarily for the Servlet non-blocking I/O API where flush
* cannot be called without a readyToWrite check.
*/
protected abstract void flush() throws IOException;
/**
* Whether flushing is pending.
* <p>This is primarily for the Servlet non-blocking I/O API where flush
* cannot be called without a readyToWrite check.
*/
protected abstract boolean isFlushPending();

View File

@ -193,6 +193,12 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
return result;
}
private void changeStateToReceived(State oldState) {
if (changeState(oldState, State.RECEIVED)) {
writeIfPossible();
}
}
private void changeStateToComplete(State oldState) {
if (changeState(oldState, State.COMPLETED)) {
writingComplete();
@ -255,9 +261,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
else {
processor.dataReceived(data);
if (processor.changeState(this, RECEIVED)) {
processor.writeIfPossible();
}
processor.changeStateToReceived(this);
}
}
@Override
@ -286,13 +290,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
}
}
else if (processor.changeState(WRITING, RECEIVED)) {
if (processor.subscriberCompleted) {
processor.changeStateToComplete(RECEIVED);
}
else {
processor.writeIfPossible();
}
else {
processor.changeStateToReceived(WRITING);
}
}
catch (IOException ex) {

View File

@ -133,16 +133,6 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
return new ResponseBodyProcessor(this.responseChannel);
}
private boolean isWritePossible() {
if (this.responseChannel == null) {
this.responseChannel = this.exchange.getResponseChannel();
}
if (!this.responseChannel.isWriteResumed()) {
this.responseChannel.resumeWrites();
}
return this.responseChannel.isWriteResumed();
}
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
@ -151,16 +141,24 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
@Nullable
private volatile ByteBuffer byteBuffer;
/** Keep track of write listener calls, for {@link #writePossible}. */
private volatile boolean writePossible;
public ResponseBodyProcessor(StreamSinkChannel channel) {
Assert.notNull(channel, "StreamSinkChannel must not be null");
this.channel = channel;
this.channel.getWriteSetter().set(c -> onWritePossible());
this.channel.getWriteSetter().set(c -> {
this.writePossible = true;
onWritePossible();
});
this.channel.suspendWrites();
}
@Override
protected boolean isWritePossible() {
return UndertowServerHttpResponse.this.isWritePossible();
this.channel.resumeWrites();
return this.writePossible;
}
@Override
@ -172,6 +170,10 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
if (logger.isTraceEnabled()) {
logger.trace("write: " + dataBuffer);
}
// Track write listener calls from here on..
this.writePossible = false;
int total = buffer.remaining();
int written = writeByteBuffer(buffer);
@ -181,6 +183,10 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
if (written != total) {
return false;
}
// We wrote all, so can still write more..
this.writePossible = true;
if (logger.isTraceEnabled()) {
logger.trace("releaseData: " + dataBuffer);
}
@ -239,11 +245,12 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
@Override
protected void flush() throws IOException {
if (UndertowServerHttpResponse.this.responseChannel != null) {
StreamSinkChannel channel = UndertowServerHttpResponse.this.responseChannel;
if (channel != null) {
if (logger.isTraceEnabled()) {
logger.trace("flush");
}
UndertowServerHttpResponse.this.responseChannel.flush();
channel.flush();
}
}
@ -255,7 +262,13 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
@Override
protected boolean isWritePossible() {
return UndertowServerHttpResponse.this.isWritePossible();
StreamSinkChannel channel = UndertowServerHttpResponse.this.responseChannel;
if (channel != null) {
// We can always call flush, just ensure writes are on..
channel.resumeWrites();
return true;
}
return false;
}
@Override