ResponseBodyEmitter detects timeout/completion

ResponseBodyEmitter now registers by default to receive callbacks
on timeout/completion and sets its internal "complete" flag to true
in order to prevent proactively further use of the emitter.

Issue: SPR-13498
This commit is contained in:
Rossen Stoyanchev 2015-09-25 14:06:33 -04:00
parent a6a6aed17f
commit fdbe8dc4c1
2 changed files with 69 additions and 22 deletions

View File

@ -69,9 +69,9 @@ public class ResponseBodyEmitter {
private Throwable failure; private Throwable failure;
private Runnable timeoutCallback; private final DefaultCallback timeoutCallback = new DefaultCallback();
private Runnable completionCallback; private final DefaultCallback completionCallback = new DefaultCallback();
/** /**
@ -126,11 +126,8 @@ public class ResponseBodyEmitter {
this.handler.complete(); this.handler.complete();
} }
} }
else {
if (this.timeoutCallback != null) {
this.handler.onTimeout(this.timeoutCallback); this.handler.onTimeout(this.timeoutCallback);
}
if (this.completionCallback != null) {
this.handler.onCompletion(this.completionCallback); this.handler.onCompletion(this.completionCallback);
} }
} }
@ -168,11 +165,11 @@ public class ResponseBodyEmitter {
this.handler.send(object, mediaType); this.handler.send(object, mediaType);
} }
catch (IOException ex) { catch (IOException ex) {
this.handler.completeWithError(ex); completeWithError(ex);
throw ex; throw ex;
} }
catch (Throwable ex) { catch (Throwable ex) {
this.handler.completeWithError(ex); completeWithError(ex);
throw new IllegalStateException("Failed to send " + object, ex); throw new IllegalStateException("Failed to send " + object, ex);
} }
} }
@ -212,10 +209,7 @@ public class ResponseBodyEmitter {
* called from a container thread when an async request times out. * called from a container thread when an async request times out.
*/ */
public synchronized void onTimeout(Runnable callback) { public synchronized void onTimeout(Runnable callback) {
this.timeoutCallback = callback; this.timeoutCallback.setDelegate(callback);
if (this.handler != null) {
this.handler.onTimeout(callback);
}
} }
/** /**
@ -225,10 +219,7 @@ public class ResponseBodyEmitter {
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable. * detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
*/ */
public synchronized void onCompletion(Runnable callback) { public synchronized void onCompletion(Runnable callback) {
this.completionCallback = callback; this.completionCallback.setDelegate(callback);
if (this.handler != null) {
this.handler.onCompletion(callback);
}
} }
@ -272,4 +263,22 @@ public class ResponseBodyEmitter {
} }
} }
private class DefaultCallback implements Runnable {
private Runnable delegate;
public void setDelegate(Runnable delegate) {
this.delegate = delegate;
}
@Override
public void run() {
ResponseBodyEmitter.this.complete = true;
if (this.delegate != null) {
this.delegate.run();
}
}
}
} }

View File

@ -20,12 +20,14 @@ import java.io.IOException;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.util.Assert;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -101,6 +103,8 @@ public class ResponseBodyEmitterTests {
@Test @Test
public void sendAfterHandlerInitialized() throws Exception { public void sendAfterHandlerInitialized() throws Exception {
this.emitter.initialize(this.handler); this.emitter.initialize(this.handler);
verify(this.handler).onTimeout(any());
verify(this.handler).onCompletion(any());
verifyNoMoreInteractions(this.handler); verifyNoMoreInteractions(this.handler);
this.emitter.send("foo", MediaType.TEXT_PLAIN); this.emitter.send("foo", MediaType.TEXT_PLAIN);
@ -116,6 +120,8 @@ public class ResponseBodyEmitterTests {
@Test @Test
public void sendAfterHandlerInitializedWithError() throws Exception { public void sendAfterHandlerInitializedWithError() throws Exception {
this.emitter.initialize(this.handler); this.emitter.initialize(this.handler);
verify(this.handler).onTimeout(any());
verify(this.handler).onCompletion(any());
verifyNoMoreInteractions(this.handler); verifyNoMoreInteractions(this.handler);
IllegalStateException ex = new IllegalStateException(); IllegalStateException ex = new IllegalStateException();
@ -132,6 +138,8 @@ public class ResponseBodyEmitterTests {
@Test @Test
public void sendWithError() throws Exception { public void sendWithError() throws Exception {
this.emitter.initialize(this.handler); this.emitter.initialize(this.handler);
verify(this.handler).onTimeout(any());
verify(this.handler).onCompletion(any());
verifyNoMoreInteractions(this.handler); verifyNoMoreInteractions(this.handler);
IOException failure = new IOException(); IOException failure = new IOException();
@ -153,15 +161,30 @@ public class ResponseBodyEmitterTests {
Runnable runnable = mock(Runnable.class); Runnable runnable = mock(Runnable.class);
this.emitter.onTimeout(runnable); this.emitter.onTimeout(runnable);
this.emitter.initialize(this.handler); this.emitter.initialize(this.handler);
verify(this.handler).onTimeout(runnable);
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(this.handler).onTimeout(captor.capture());
verify(this.handler).onCompletion(any());
Assert.notNull(captor.getValue());
captor.getValue().run();
verify(runnable).run();
} }
@Test @Test
public void onTimeoutAfterHandlerInitialized() throws Exception { public void onTimeoutAfterHandlerInitialized() throws Exception {
Runnable runnable = mock(Runnable.class);
this.emitter.initialize(this.handler); this.emitter.initialize(this.handler);
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(this.handler).onTimeout(captor.capture());
verify(this.handler).onCompletion(any());
Runnable runnable = mock(Runnable.class);
this.emitter.onTimeout(runnable); this.emitter.onTimeout(runnable);
verify(this.handler).onTimeout(runnable);
Assert.notNull(captor.getValue());
captor.getValue().run();
verify(runnable).run();
} }
@Test @Test
@ -169,15 +192,30 @@ public class ResponseBodyEmitterTests {
Runnable runnable = mock(Runnable.class); Runnable runnable = mock(Runnable.class);
this.emitter.onCompletion(runnable); this.emitter.onCompletion(runnable);
this.emitter.initialize(this.handler); this.emitter.initialize(this.handler);
verify(this.handler).onCompletion(runnable);
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(this.handler).onTimeout(any());
verify(this.handler).onCompletion(captor.capture());
Assert.notNull(captor.getValue());
captor.getValue().run();
verify(runnable).run();
} }
@Test @Test
public void onCompletionAfterHandlerInitialized() throws Exception { public void onCompletionAfterHandlerInitialized() throws Exception {
Runnable runnable = mock(Runnable.class);
this.emitter.initialize(this.handler); this.emitter.initialize(this.handler);
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(this.handler).onTimeout(any());
verify(this.handler).onCompletion(captor.capture());
Runnable runnable = mock(Runnable.class);
this.emitter.onCompletion(runnable); this.emitter.onCompletion(runnable);
verify(this.handler).onCompletion(runnable);
Assert.notNull(captor.getValue());
captor.getValue().run();
verify(runnable).run();
} }
} }