From bdb63483df2cd81bdbcc963db8789f37ea2a6727 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Tue, 14 Jul 2015 00:08:40 +0200 Subject: [PATCH] Common DataWithMediaType class and common synchronization for ResponseBodyEmitter/SseEmitter Issue: SPR-13223 Issue: SPR-13224 --- .../annotation/ResponseBodyEmitter.java | 157 +++++++++--------- .../mvc/method/annotation/SseEmitter.java | 61 +++---- .../annotation/ResponseBodyEmitterTests.java | 15 ++ 3 files changed, 125 insertions(+), 108 deletions(-) diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java index 500eb716b6..9b12fb0955 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java @@ -17,8 +17,8 @@ package org.springframework.web.servlet.mvc.method.annotation; import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.LinkedHashSet; +import java.util.Set; import org.springframework.http.MediaType; import org.springframework.http.server.ServerHttpResponse; @@ -54,18 +54,18 @@ import org.springframework.util.Assert; * * * @author Rossen Stoyanchev + * @author Juergen Hoeller * @since 4.2 */ public class ResponseBodyEmitter { private final Long timeout; - private volatile Handler handler; + private final Set earlySendAttempts = new LinkedHashSet(8); - /* Cache for objects sent before handler is set. */ - private final Map initHandlerCache = new LinkedHashMap(10); + private Handler handler; - private volatile boolean complete; + private boolean complete; private Throwable failure; @@ -110,32 +110,29 @@ public class ResponseBodyEmitter { protected void extendResponse(ServerHttpResponse outputMessage) { } - void initialize(Handler handler) throws IOException { - synchronized (this) { - this.handler = handler; - for (Map.Entry entry : this.initHandlerCache.entrySet()) { - try { - sendInternal(entry.getKey(), entry.getValue()); - } - catch (Throwable ex) { - return; - } + synchronized void initialize(Handler handler) throws IOException { + this.handler = handler; + + for (DataWithMediaType sendAttempt : this.earlySendAttempts) { + sendInternal(sendAttempt.getData(), sendAttempt.getMediaType()); + } + this.earlySendAttempts.clear(); + + if (this.complete) { + if (this.failure != null) { + this.handler.completeWithError(this.failure); } - if (this.complete) { - if (this.failure != null) { - this.handler.completeWithError(this.failure); - } - else { - this.handler.complete(); - } - } - if (this.timeoutCallback != null) { - this.handler.onTimeout(this.timeoutCallback); - } - if (this.completionCallback != null) { - this.handler.onCompletion(this.completionCallback); + else { + this.handler.complete(); } } + + if (this.timeoutCallback != null) { + this.handler.onTimeout(this.timeoutCallback); + } + if (this.completionCallback != null) { + this.handler.onCompletion(this.completionCallback); + } } /** @@ -159,33 +156,29 @@ public class ResponseBodyEmitter { * @throws IOException raised when an I/O error occurs * @throws java.lang.IllegalStateException wraps any other errors */ - public void send(Object object, MediaType mediaType) throws IOException { + public synchronized void send(Object object, MediaType mediaType) throws IOException { Assert.state(!this.complete, "ResponseBodyEmitter is already set complete"); sendInternal(object, mediaType); } private void sendInternal(Object object, MediaType mediaType) throws IOException { - if (object == null) { - return; - } - if (this.handler == null) { - synchronized (this) { - if (this.handler == null) { - this.initHandlerCache.put(object, mediaType); - return; + if (object != null) { + if (this.handler != null) { + try { + this.handler.send(object, mediaType); + } + catch (IOException ex) { + this.handler.completeWithError(ex); + throw ex; + } + catch (Throwable ex) { + this.handler.completeWithError(ex); + throw new IllegalStateException("Failed to send " + object, ex); } } - } - try { - this.handler.send(object, mediaType); - } - catch (IOException ex){ - this.handler.completeWithError(ex); - throw ex; - } - catch (Throwable ex){ - this.handler.completeWithError(ex); - throw new IllegalStateException("Failed to send " + object, ex); + else { + this.earlySendAttempts.add(new DataWithMediaType(object, mediaType)); + } } } @@ -194,12 +187,10 @@ public class ResponseBodyEmitter { *

A dispatch is made into the app server where Spring MVC completes * asynchronous request processing. */ - public void complete() { - synchronized (this) { - this.complete = true; - if (this.handler != null) { - this.handler.complete(); - } + public synchronized void complete() { + this.complete = true; + if (this.handler != null) { + this.handler.complete(); } } @@ -208,13 +199,11 @@ public class ResponseBodyEmitter { *

A dispatch is made into the app server where Spring MVC will pass the * exception through its exception handling mechanism. */ - public void completeWithError(Throwable ex) { - synchronized (this) { - this.complete = true; - this.failure = ex; - if (this.handler != null) { - this.handler.completeWithError(ex); - } + public synchronized void completeWithError(Throwable ex) { + this.complete = true; + this.failure = ex; + if (this.handler != null) { + this.handler.completeWithError(ex); } } @@ -222,12 +211,10 @@ public class ResponseBodyEmitter { * Register code to invoke when the async request times out. This method is * called from a container thread when an async request times out. */ - public void onTimeout(Runnable callback) { - synchronized (this) { - this.timeoutCallback = callback; - if (this.handler != null) { - this.handler.onTimeout(callback); - } + public synchronized void onTimeout(Runnable callback) { + this.timeoutCallback = callback; + if (this.handler != null) { + this.handler.onTimeout(callback); } } @@ -237,12 +224,10 @@ public class ResponseBodyEmitter { * reason including timeout and network error. This method is useful for * detecting that a {@code ResponseBodyEmitter} instance is no longer usable. */ - public void onCompletion(Runnable callback) { - synchronized (this) { - this.completionCallback = callback; - if (this.handler != null) { - this.handler.onCompletion(callback); - } + public synchronized void onCompletion(Runnable callback) { + this.completionCallback = callback; + if (this.handler != null) { + this.handler.onCompletion(callback); } } @@ -263,4 +248,28 @@ public class ResponseBodyEmitter { void onCompletion(Runnable callback); } + + /** + * Simple struct for a data entry. + */ + static class DataWithMediaType { + + private final Object data; + + private final MediaType mediaType; + + public DataWithMediaType(Object data, MediaType mediaType) { + this.data = data; + this.mediaType = mediaType; + } + + public Object getData() { + return this.data; + } + + public MediaType getMediaType() { + return this.mediaType; + } + } + } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java index 42bb931ecb..521c8aa91c 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java @@ -19,8 +19,8 @@ package org.springframework.web.servlet.mvc.method.annotation; import java.io.IOException; import java.nio.charset.Charset; import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.LinkedHashSet; +import java.util.Set; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; @@ -31,6 +31,7 @@ import org.springframework.http.server.ServerHttpResponse; * Server-Sent Events. * * @author Rossen Stoyanchev + * @author Juergen Hoeller * @since 4.2 */ public class SseEmitter extends ResponseBodyEmitter { @@ -41,6 +42,7 @@ public class SseEmitter extends ResponseBodyEmitter { @Override protected void extendResponse(ServerHttpResponse outputMessage) { super.extendResponse(outputMessage); + HttpHeaders headers = outputMessage.getHeaders(); if (headers.getContentType() == null) { headers.setContentType(new MediaType("text", "event-stream")); @@ -75,14 +77,12 @@ public class SseEmitter extends ResponseBodyEmitter { * @param object the object to write * @param mediaType a MediaType hint for selecting an HttpMessageConverter * @throws IOException raised when an I/O error occurs - * @throws java.lang.IllegalStateException wraps any other errors */ @Override public void send(Object object, MediaType mediaType) throws IOException { - if (object == null) { - return; + if (object != null) { + send(event().data(object, mediaType)); } - send(event().data(object, mediaType)); } /** @@ -95,18 +95,19 @@ public class SseEmitter extends ResponseBodyEmitter { * * @param builder a builder for an SSE formatted event. * @throws IOException raised when an I/O error occurs - * @throws java.lang.IllegalStateException wraps any other errors */ public void send(SseEventBuilder builder) throws IOException { - Map map = builder.build(); - for (Map.Entry entry : map.entrySet()) { - super.send(entry.getKey(), entry.getValue()); + Set dataToSend = ((SseEventBuilderImpl) builder).build(); + synchronized (this) { + for (DataWithMediaType entry : dataToSend) { + super.send(entry.getData(), entry.getMediaType()); + } } } public static SseEventBuilder event() { - return new DefaultSseEventBuilder(); + return new SseEventBuilderImpl(); } @@ -144,22 +145,15 @@ public class SseEmitter extends ResponseBodyEmitter { * Add an SSE "data" line. */ SseEventBuilder data(Object object, MediaType mediaType); - - /** - * Return a map with objects that represent the data to be written to - * the response as well as the required SSE text formatting that - * surrounds it. - */ - Map build(); } /** * Default implementation of SseEventBuilder. */ - private static class DefaultSseEventBuilder implements SseEventBuilder { + private static class SseEventBuilderImpl implements SseEventBuilder { - private final Map map = new LinkedHashMap(4); + private final Set dataToSend = new LinkedHashSet(4); private StringBuilder sb; @@ -196,12 +190,12 @@ public class SseEmitter extends ResponseBodyEmitter { public SseEventBuilder data(Object object, MediaType mediaType) { append("data:"); saveAppendedText(); - this.map.put(object, mediaType); + this.dataToSend.add(new DataWithMediaType(object, mediaType)); append("\n"); return this; } - DefaultSseEventBuilder append(String text) { + SseEventBuilderImpl append(String text) { if (this.sb == null) { this.sb = new StringBuilder(); } @@ -209,21 +203,20 @@ public class SseEmitter extends ResponseBodyEmitter { return this; } - private void saveAppendedText() { - if (this.sb != null) { - this.map.put(this.sb.toString(), TEXT_PLAIN); - this.sb = null; - } - } - - @Override - public Map build() { - if (this.sb == null || this.sb.length() == 0 && this.map.isEmpty()) { - return Collections.emptyMap(); + Set build() { + if ((this.sb == null || this.sb.length() == 0) && this.dataToSend.isEmpty()) { + return Collections.emptySet(); } append("\n"); saveAppendedText(); - return this.map; + return this.dataToSend; + } + + private void saveAppendedText() { + if (this.sb != null) { + this.dataToSend.add(new DataWithMediaType(this.sb.toString(), TEXT_PLAIN)); + this.sb = null; + } } } diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java index 18654bb29a..f7112d3af0 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java @@ -31,7 +31,9 @@ import static org.junit.Assert.fail; /** * Unit tests for {@link ResponseBodyEmitter}. + * * @author Rossen Stoyanchev + * @author Tomasz Nurkiewicz */ public class ResponseBodyEmitterTests { @@ -62,6 +64,19 @@ public class ResponseBodyEmitterTests { verifyNoMoreInteractions(this.handler); } + @Test + public void sendDuplicateBeforeHandlerInitialized() throws Exception { + this.emitter.send("foo", MediaType.TEXT_PLAIN); + this.emitter.send("foo", MediaType.TEXT_PLAIN); + this.emitter.complete(); + verifyNoMoreInteractions(this.handler); + + this.emitter.initialize(this.handler); + verify(this.handler, times(2)).send("foo", MediaType.TEXT_PLAIN); + verify(this.handler).complete(); + verifyNoMoreInteractions(this.handler); + } + @Test public void sendBeforeHandlerInitializedWithError() throws Exception { IllegalStateException ex = new IllegalStateException();