From 0af847c01c5e2e181fd1215e2e2450b189286ba6 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 23 Mar 2018 18:57:50 -0400 Subject: [PATCH] ServerSentEventHttpMessageReader internal refactoring Eliminate use of .block() which Reactor now flags as illegal on schedulers where that's not expected. --- .../ServerSentEventHttpMessageReader.java | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java index 6567a252c78..b41910f103f 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.IntPredicate; +import java.util.stream.Collectors; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,8 +41,6 @@ import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.lang.Nullable; -import static java.util.stream.Collectors.joining; - /** * Reader that supports a stream of {@link ServerSentEvent}s and also plain * {@link Object}s which is the same as an {@link ServerSentEvent} with data only. @@ -120,9 +119,10 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader line.equals("\n")) .concatMap(rawLines -> { - String[] lines = rawLines.stream().collect(joining()).split("\\r?\\n"); - ServerSentEvent event = buildEvent(lines, valueType, hints); - return (shouldWrap ? Mono.just(event) : Mono.justOrEmpty(event.data())); + String[] lines = rawLines.stream().collect(Collectors.joining()).split("\\r?\\n"); + return buildEvent(lines, valueType, hints) + .filter(event -> shouldWrap || event.data() != null) + .map(event -> shouldWrap ? event : event.data()); }) .cast(Object.class); } @@ -144,12 +144,12 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader buildEvent(String[] lines, ResolvableType valueType, + private Mono> buildEvent(String[] lines, ResolvableType valueType, Map hints) { ServerSentEvent.Builder sseBuilder = ServerSentEvent.builder(); - StringBuilder mutableData = new StringBuilder(); - StringBuilder mutableComment = new StringBuilder(); + StringBuilder data = null; + StringBuilder comment = null; for (String line : lines) { if (line.startsWith("id:")) { @@ -159,42 +159,43 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader 0) { - String data = mutableData.toString(); - sseBuilder.data(decodeData(data, valueType, hints)); + if (comment != null) { + sseBuilder.comment(comment.toString().substring(0, comment.length() - 1)); } - if (mutableComment.length() > 0) { - String comment = mutableComment.toString(); - sseBuilder.comment(comment.substring(0, comment.length() - 1)); + if (data != null) { + return decodeData(data.toString(), valueType, hints).map(decodedData -> { + sseBuilder.data(decodedData); + return sseBuilder.build(); + }); + } + else { + return Mono.just(sseBuilder.build()); } - return sseBuilder.build(); } - @Nullable - private Object decodeData(String data, ResolvableType dataType, Map hints) { + private Mono decodeData(String data, ResolvableType dataType, Map hints) { if (String.class == dataType.resolve()) { - return data.substring(0, data.length() - 1); + return Mono.just(data.substring(0, data.length() - 1)); } if (this.decoder == null) { - return Flux.error(new CodecException("No SSE decoder configured and the data is not String.")); + return Mono.error(new CodecException("No SSE decoder configured and the data is not String.")); } byte[] bytes = data.getBytes(StandardCharsets.UTF_8); Mono input = Mono.just(bufferFactory.wrap(bytes)); - - return this.decoder - .decodeToMono(input, dataType, MediaType.TEXT_EVENT_STREAM, hints) - .block(Duration.ZERO); + return this.decoder.decodeToMono(input, dataType, MediaType.TEXT_EVENT_STREAM, hints); } @Override