From a999f40daaac6fd77ccf48ed57ffd13eb5eab9ed Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 21 Mar 2017 17:32:14 -0400 Subject: [PATCH] Polish + minor refactoring of SSE reader and writer Instead of accepting List and then look for the first to support JSON, always expect a single JSON [Encoder|Decoder] and use that unconditionally. When writing use the nested ResolvableType instead of the Class of the actual value which should better support generics. Remove the SSE hint and pass "text/event-stream" as the media type instead to serve as a hint. We are expecting a JSON encoder and using it unconditionally in any case so this should be good enough. --- .../ServerSentEventHttpMessageReader.java | 47 +++-- .../ServerSentEventHttpMessageWriter.java | 161 ++++++++---------- .../http/codec/json/Jackson2JsonEncoder.java | 22 +-- ...ServerSentEventHttpMessageReaderTests.java | 5 +- ...ServerSentEventHttpMessageWriterTests.java | 83 +++++---- .../config/WebFluxConfigurationSupport.java | 9 +- .../DefaultExchangeStrategiesBuilder.java | 10 +- .../DefaultHandlerStrategiesBuilder.java | 6 +- .../reactive/function/BodyInsertersTests.java | 10 +- 9 files changed, 155 insertions(+), 198 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java index bf2e182107c..ceae7ced20c 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java @@ -29,7 +29,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; -import org.springframework.core.codec.CodecException; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.StringDecoder; import org.springframework.core.io.buffer.DataBuffer; @@ -39,7 +38,6 @@ import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.util.Assert; -import org.springframework.util.MimeTypeUtils; import static java.util.stream.Collectors.joining; @@ -61,18 +59,22 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader> dataDecoders; + private final Decoder decoder; - public ServerSentEventHttpMessageReader() { - this.dataDecoders = Collections.emptyList(); + /** + * Constructor with JSON {@code Encoder} for encoding objects. + */ + public ServerSentEventHttpMessageReader(Decoder decoder) { + Assert.notNull(decoder, "Decoder must not be null"); + this.decoder = decoder; } - public ServerSentEventHttpMessageReader(List> dataDecoders) { - Assert.notNull(dataDecoders, "'dataDecoders' must not be null"); - this.dataDecoders = new ArrayList<>(dataDecoders); - } + @Override + public List getReadableMediaTypes() { + return Collections.singletonList(MediaType.TEXT_EVENT_STREAM); + } @Override public boolean canRead(ResolvableType elementType, MediaType mediaType) { @@ -80,18 +82,13 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader getReadableMediaTypes() { - return Collections.singletonList(MediaType.TEXT_EVENT_STREAM); - } - @Override public Flux read(ResolvableType elementType, ReactiveHttpInputMessage message, Map hints) { - boolean hasSseWrapper = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass()); - ResolvableType dataType = (hasSseWrapper ? elementType.getGeneric(0) : elementType); + boolean shouldWrap = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass()); + ResolvableType valueType = shouldWrap ? elementType.getGeneric(0) : elementType; return Flux.from(message.getBody()) .concatMap(ServerSentEventHttpMessageReader::splitOnNewline) @@ -103,8 +100,8 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader line.equals("\n")) .concatMap(rawLines -> { String[] lines = rawLines.stream().collect(joining()).split("\\r?\\n"); - ServerSentEvent event = buildEvent(lines, dataType, hints); - return (hasSseWrapper ? Mono.just(event) : Mono.justOrEmpty(event.data())); + ServerSentEvent event = buildEvent(lines, valueType, hints); + return (shouldWrap ? Mono.just(event) : Mono.justOrEmpty(event.data())); }) .cast(Object.class); } @@ -126,7 +123,8 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader buildEvent(String[] lines, ResolvableType dataType, Map hints) { + private ServerSentEvent buildEvent(String[] lines, ResolvableType valueType, + Map hints) { ServerSentEvent.Builder sseBuilder = ServerSentEvent.builder(); StringBuilder mutableData = new StringBuilder(); @@ -151,7 +149,7 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader 0) { String data = mutableData.toString(); - sseBuilder.data(decodeData(data, dataType, hints)); + sseBuilder.data(decodeData(data, valueType, hints)); } if (mutableComment.length() > 0) { String comment = mutableComment.toString(); @@ -169,11 +167,8 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader input = Mono.just(bufferFactory.wrap(bytes)); - return this.dataDecoders.stream() - .filter(e -> e.canDecode(dataType, MimeTypeUtils.APPLICATION_JSON)) - .findFirst() - .orElseThrow(() -> new CodecException("No suitable decoder found!")) - .decodeToMono(input, dataType, MimeTypeUtils.APPLICATION_JSON, hints) + return this.decoder + .decodeToMono(input, dataType, MediaType.TEXT_EVENT_STREAM, hints) .block(Duration.ZERO); } @@ -181,7 +176,7 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader readMono(ResolvableType elementType, ReactiveHttpInputMessage message, Map hints) { - // Let's give StringDecoder a chance since SSE is ordered ahead of it + // For single String give StringDecoder a chance which comes after SSE in the order if (String.class.equals(elementType.getRawClass())) { Flux body = message.getBody(); diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java index b25daae2f17..429e4c264c1 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java @@ -17,19 +17,16 @@ package org.springframework.http.codec; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; -import org.springframework.core.codec.CodecException; import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -38,39 +35,37 @@ import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; -import org.springframework.util.MimeTypeUtils; /** - * Writer that supports a stream of {@link ServerSentEvent}s and also plain - * {@link Object}s which is the same as an {@link ServerSentEvent} with data - * only. + * {@code ServerHttpMessageWriter} for {@code "text/event-stream"} responses. * * @author Sebastien Deleuze * @author Arjen Poutsma + * @author Rossen Stoyanchev * @since 5.0 */ public class ServerSentEventHttpMessageWriter implements ServerHttpMessageWriter { + private static final List WRITABLE_MEDIA_TYPES = + Collections.singletonList(MediaType.TEXT_EVENT_STREAM); + + + private final Encoder encoder; + + /** - * Server-Sent Events hint key expecting a {@link Boolean} value which when set to true - * will adapt the content in order to comply with Server-Sent Events recommendation. - * For example, it will append "data:" after each line break with data encoders - * supporting it. - * @see Server-Sent Events W3C recommendation + * Constructor with JSON {@code Encoder} for encoding objects. Support for + * {@code String} event data is built-in. */ - public static final String SSE_CONTENT_HINT = ServerSentEventHttpMessageWriter.class.getName() + ".sseContent"; - - - private final List> dataEncoders; - - - public ServerSentEventHttpMessageWriter() { - this.dataEncoders = Collections.emptyList(); + public ServerSentEventHttpMessageWriter(Encoder encoder) { + Assert.notNull(encoder, "'encoder' must not be null"); + this.encoder = encoder; } - public ServerSentEventHttpMessageWriter(List> dataEncoders) { - Assert.notNull(dataEncoders, "'dataEncoders' must not be null"); - this.dataEncoders = new ArrayList<>(dataEncoders); + + @Override + public List getWritableMediaTypes() { + return WRITABLE_MEDIA_TYPES; } @@ -81,61 +76,35 @@ public class ServerSentEventHttpMessageWriter implements ServerHttpMessageWriter } @Override - public List getWritableMediaTypes() { - return Collections.singletonList(MediaType.TEXT_EVENT_STREAM); - } - - @Override - public Mono write(Publisher inputStream, ResolvableType elementType, MediaType mediaType, + public Mono write(Publisher input, ResolvableType elementType, MediaType mediaType, ReactiveHttpOutputMessage message, Map hints) { message.getHeaders().setContentType(MediaType.TEXT_EVENT_STREAM); - - DataBufferFactory bufferFactory = message.bufferFactory(); - Flux> body = encode(inputStream, bufferFactory, elementType, hints); - - return message.writeAndFlushWith(body); + return message.writeAndFlushWith(encode(input, message.bufferFactory(), elementType, hints)); } - private Flux> encode(Publisher inputStream, DataBufferFactory bufferFactory, - ResolvableType type, Map hints) { + private Flux> encode(Publisher input, DataBufferFactory factory, + ResolvableType elementType, Map hints) { - Map hintsWithSse = new HashMap<>(hints); - hintsWithSse.put(SSE_CONTENT_HINT, true); - return Flux.from(inputStream) - .map(o -> toSseEvent(o, type)) - .map(sse -> { - StringBuilder sb = new StringBuilder(); - sse.id().ifPresent(id -> writeField("id", id, sb)); - sse.event().ifPresent(event -> writeField("event", event, sb)); - sse.retry().ifPresent(retry -> writeField("retry", retry.toMillis(), sb)); - sse.comment().ifPresent(comment -> { - comment = comment.replaceAll("\\n", "\n:"); - sb.append(':').append(comment).append("\n"); - }); - Flux dataBuffer = sse.data() - .>map(data -> { - sb.append("data:"); - if (data instanceof String) { - String stringData = ((String) data).replaceAll("\\n", "\ndata:"); - sb.append(stringData).append('\n'); - return Flux.empty(); - } - else { - return applyEncoder(data, bufferFactory, hintsWithSse); - } - }).orElse(Flux.empty()); + ResolvableType valueType = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass()) ? + elementType.getGeneric(0) : elementType; - return Flux.concat(encodeString(sb.toString(), bufferFactory), dataBuffer, - encodeString("\n", bufferFactory)); - }); + return Flux.from(input).map(element -> { - } + ServerSentEvent sse = element instanceof ServerSentEvent ? + (ServerSentEvent) element : ServerSentEvent.builder().data(element).build(); - private ServerSentEvent toSseEvent(Object data, ResolvableType type) { - return ServerSentEvent.class.isAssignableFrom(type.getRawClass()) - ? (ServerSentEvent) data - : ServerSentEvent.builder().data(data).build(); + StringBuilder sb = new StringBuilder(); + sse.id().ifPresent(v -> writeField("id", v, sb)); + sse.event().ifPresent(v -> writeField("event", v, sb)); + sse.retry().ifPresent(v -> writeField("retry", v.toMillis(), sb)); + sse.comment().ifPresent(v -> sb.append(':').append(v.replaceAll("\\n", "\n:")).append("\n")); + sse.data().ifPresent(v -> sb.append("data:")); + + return Flux.concat(encodeText(sb, factory), + encodeData(sse, valueType, factory, hints), + encodeText("\n", factory)); + }); } private void writeField(String fieldName, Object fieldValue, StringBuilder stringBuilder) { @@ -146,40 +115,50 @@ public class ServerSentEventHttpMessageWriter implements ServerHttpMessageWriter } @SuppressWarnings("unchecked") - private Flux applyEncoder(Object data, DataBufferFactory bufferFactory, Map hints) { - ResolvableType elementType = ResolvableType.forClass(data.getClass()); - Optional> encoder = dataEncoders - .stream() - .filter(e -> e.canEncode(elementType, MimeTypeUtils.APPLICATION_JSON)) - .findFirst(); - return ((Encoder) encoder.orElseThrow(() -> new CodecException("No suitable encoder found!"))) - .encode(Mono.just((T) data), bufferFactory, elementType, MimeTypeUtils.APPLICATION_JSON, hints) - .concatWith(encodeString("\n", bufferFactory)); + private Flux encodeData(ServerSentEvent event, ResolvableType valueType, + DataBufferFactory factory, Map hints) { + + Object data = event.data().orElse(null); + if (data == null) { + return Flux.empty(); + } + + if (data instanceof String) { + String text = (String) data; + return Flux.from(encodeText(text.replaceAll("\\n", "\ndata:") + "\n", factory)); + } + + return ((Encoder) this.encoder) + .encode(Mono.just((T) data), factory, valueType, MediaType.TEXT_EVENT_STREAM, hints) + .concatWith(encodeText("\n", factory)); } - private Mono encodeString(String str, DataBufferFactory bufferFactory) { - byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + private Mono encodeText(CharSequence text, DataBufferFactory bufferFactory) { + byte[] bytes = text.toString().getBytes(StandardCharsets.UTF_8); DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length).write(bytes); return Mono.just(buffer); } @Override - public Mono write(Publisher inputStream, ResolvableType actualType, ResolvableType elementType, + public Mono write(Publisher input, ResolvableType actualType, ResolvableType elementType, MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response, Map hints) { - Map allHints = this.dataEncoders.stream() - .filter(encoder -> encoder instanceof ServerHttpEncoder) - .map(encoder -> (ServerHttpEncoder) encoder) - .map(encoder -> encoder.getEncodeHints(actualType, elementType, mediaType, request, response)) - .reduce(new HashMap<>(), (t, u) -> { - t.putAll(u); - return t; - }); - + Map allHints = new HashMap<>(); + allHints.putAll(getEncodeHints(actualType, elementType, mediaType, request, response)); allHints.putAll(hints); - return write(inputStream, elementType, mediaType, response, allHints); + return write(input, elementType, mediaType, response, allHints); + } + + private Map getEncodeHints(ResolvableType actualType, ResolvableType elementType, + MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response) { + + if (this.encoder instanceof ServerHttpEncoder) { + ServerHttpEncoder httpEncoder = (ServerHttpEncoder) this.encoder; + return httpEncoder.getEncodeHints(actualType, elementType, mediaType, request, response); + } + return Collections.emptyMap(); } } diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonEncoder.java index 10741e82e41..06fd3f30274 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonEncoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonEncoder.java @@ -28,7 +28,6 @@ import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.SerializationConfig; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.type.TypeFactory; import org.reactivestreams.Publisher; @@ -42,7 +41,6 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerHttpEncoder; -import org.springframework.http.codec.ServerSentEventHttpMessageWriter; import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; @@ -97,20 +95,24 @@ public class Jackson2JsonEncoder extends Jackson2CodecSupport implements ServerH Assert.notNull(elementType, "'elementType' must not be null"); if (inputStream instanceof Mono) { - return Flux.from(inputStream).map(value -> encodeValue(value, bufferFactory, elementType, hints)); + return Flux.from(inputStream).map(value -> + encodeValue(value, mimeType, bufferFactory, elementType, hints)); } else if (APPLICATION_STREAM_JSON.isCompatibleWith(mimeType)) { return Flux.from(inputStream).map(value -> { - DataBuffer buffer = encodeValue(value, bufferFactory, elementType, hints); + DataBuffer buffer = encodeValue(value, mimeType, bufferFactory, elementType, hints); buffer.write(new byte[]{'\n'}); return buffer; }); } - ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); - return Flux.from(inputStream).collectList().map(list -> encodeValue(list, bufferFactory, listType, hints)).flux(); + else { + ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); + return Flux.from(inputStream).collectList().map(list -> + encodeValue(list, mimeType, bufferFactory, listType, hints)).flux(); + } } - private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, + private DataBuffer encodeValue(Object value, MimeType mimeType, DataBufferFactory bufferFactory, ResolvableType elementType, Map hints) { TypeFactory typeFactory = this.mapper.getTypeFactory(); @@ -126,9 +128,9 @@ public class Jackson2JsonEncoder extends Jackson2CodecSupport implements ServerH writer = writer.forType(javaType); } - SerializationConfig config = writer.getConfig(); - Boolean sseHint = (Boolean) hints.get(ServerSentEventHttpMessageWriter.SSE_CONTENT_HINT); - if (Boolean.TRUE.equals(sseHint) && config.isEnabled(SerializationFeature.INDENT_OUTPUT)) { + if (MediaType.TEXT_EVENT_STREAM.isCompatibleWith(mimeType) && + writer.getConfig().isEnabled(SerializationFeature.INDENT_OUTPUT)) { + writer = writer.with(this.ssePrettyPrinter); } diff --git a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageReaderTests.java b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageReaderTests.java index 56dd5a1eea4..11182584bee 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageReaderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageReaderTests.java @@ -21,7 +21,6 @@ import java.util.Collections; import org.junit.Test; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import org.springframework.core.ResolvableType; @@ -39,8 +38,8 @@ import static org.junit.Assert.assertTrue; */ public class ServerSentEventHttpMessageReaderTests extends AbstractDataBufferAllocatingTestCase { - private ServerSentEventHttpMessageReader messageReader = new ServerSentEventHttpMessageReader( - Collections.singletonList(new Jackson2JsonDecoder())); + private ServerSentEventHttpMessageReader messageReader = + new ServerSentEventHttpMessageReader(new Jackson2JsonDecoder()); @Test public void cantRead() { diff --git a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java index d7ca424ce5e..34eb086535f 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,14 +18,15 @@ package org.springframework.http.codec; import java.time.Duration; import java.util.Collections; +import java.util.Map; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Test; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.http.MediaType; import org.springframework.http.codec.json.Jackson2JsonEncoder; @@ -34,49 +35,45 @@ import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.springframework.core.ResolvableType.forClass; /** + * Unit tests for {@link ServerSentEventHttpMessageWriter}. * @author Sebastien Deleuze + * @author Rossen Stoyanchev */ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAllocatingTestCase { - private ServerSentEventHttpMessageWriter messageWriter = new ServerSentEventHttpMessageWriter( - Collections.singletonList(new Jackson2JsonEncoder())); + public static final Map HINTS = Collections.emptyMap(); + + private ServerSentEventHttpMessageWriter messageWriter = + new ServerSentEventHttpMessageWriter(new Jackson2JsonEncoder()); + @Test - public void cantRead() { - assertFalse(messageWriter.canWrite(ResolvableType.forClass(Object.class), - new MediaType("foo", "bar"))); + public void canWrite() { + assertTrue(this.messageWriter.canWrite(forClass(Object.class), null)); + assertTrue(this.messageWriter.canWrite(null, MediaType.TEXT_EVENT_STREAM)); + assertTrue(this.messageWriter.canWrite(forClass(ServerSentEvent.class), new MediaType("foo", "bar"))); } @Test - public void canRead() { - assertTrue(messageWriter.canWrite(ResolvableType.forClass(Object.class), null)); - assertTrue(messageWriter.canWrite(ResolvableType.forClass(Object.class), - new MediaType("text", "event-stream"))); - assertTrue(messageWriter.canWrite(ResolvableType.forClass(ServerSentEvent.class), - new MediaType("bar", "bar"))); + public void canNotWrite() { + assertFalse(this.messageWriter.canWrite(forClass(Object.class), new MediaType("foo", "bar"))); } @Test public void writeServerSentEvent() { - ServerSentEvent event = ServerSentEvent.builder(). - data("bar").id("c42").event("foo").comment("bla\nbla bla\nbla bla bla") - .retry(Duration.ofMillis(123L)).build(); - Mono> source = Mono.just(event); + ServerSentEvent event = ServerSentEvent.builder().data("bar").id("c42").event("foo") + .comment("bla\nbla bla\nbla bla bla").retry(Duration.ofMillis(123L)).build(); + + Mono source = Mono.just(event); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); - messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class), - new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); + testWrite(source, outputMessage, ServerSentEvent.class); StepVerifier.create(outputMessage.getBodyAsString()) - .expectNext("id:c42\n" + - "event:foo\n" + - "retry:123\n" + - ":bla\n" + - ":bla bla\n" + - ":bla bla bla\n" + - "data:bar\n\n") + .expectNext("id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:bar\n\n") .expectComplete() .verify(); } @@ -85,8 +82,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll public void writeString() { Flux source = Flux.just("foo", "bar"); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); - messageWriter.write(source, ResolvableType.forClass(String.class), - new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); + testWrite(source, outputMessage, String.class); StepVerifier.create(outputMessage.getBodyAsString()) .expectNext("data:foo\n\ndata:bar\n\n") @@ -98,25 +94,19 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll public void writeMultiLineString() { Flux source = Flux.just("foo\nbar", "foo\nbaz"); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); - messageWriter.write(source, ResolvableType.forClass(String.class), - new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); + testWrite(source, outputMessage, String.class); StepVerifier.create(outputMessage.getBodyAsString()) - .expectNext("data:foo\n" + - "data:bar\n\n" + - "data:foo\n" + - "data:baz\n\n") + .expectNext("data:foo\ndata:bar\n\ndata:foo\ndata:baz\n\n") .expectComplete() .verify(); } @Test public void writePojo() { - Flux source = Flux.just(new Pojo("foofoo", "barbar"), - new Pojo("foofoofoo", "barbarbar")); + Flux source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); - messageWriter.write(source, ResolvableType.forClass(Pojo.class), - MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); + testWrite(source, outputMessage, Pojo.class); StepVerifier.create(outputMessage.getBodyAsString()) .expectNext("data:{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n\n" + @@ -127,15 +117,13 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll @Test // SPR-14899 public void writePojoWithPrettyPrint() { - ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().indentOutput(true).build(); - this.messageWriter = new ServerSentEventHttpMessageWriter( - Collections.singletonList(new Jackson2JsonEncoder(mapper))); - Flux source = Flux.just(new Pojo("foofoo", "barbar"), - new Pojo("foofoofoo", "barbarbar")); + ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().indentOutput(true).build(); + this.messageWriter = new ServerSentEventHttpMessageWriter(new Jackson2JsonEncoder(mapper)); + + Flux source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); - messageWriter.write(source, ResolvableType.forClass(Pojo.class), - MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); + testWrite(source, outputMessage, Pojo.class); StepVerifier.create(outputMessage.getBodyAsString()) .expectNext("data:{\n" + @@ -148,4 +136,9 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll .verify(); } + private void testWrite(Publisher source, MockServerHttpResponse outputMessage, Class clazz) { + this.messageWriter.write(source, forClass(clazz), + MediaType.TEXT_EVENT_STREAM, outputMessage, HINTS).block(Duration.ofMillis(5000)); + } + } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurationSupport.java b/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurationSupport.java index 8916dd71574..5119b1d5206 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurationSupport.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurationSupport.java @@ -37,7 +37,6 @@ import org.springframework.core.codec.ByteBufferEncoder; import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.codec.DataBufferDecoder; import org.springframework.core.codec.DataBufferEncoder; -import org.springframework.core.codec.Encoder; import org.springframework.core.codec.ResourceDecoder; import org.springframework.core.codec.StringDecoder; import org.springframework.core.convert.converter.Converter; @@ -475,7 +474,6 @@ public class WebFluxConfigurationSupport implements ApplicationContextAware { * {@link #configureMessageWriters(List)}. */ protected final void addDefaultHttpMessageWriters(List> writers) { - List> sseDataEncoders = new ArrayList<>(); writers.add(new EncoderHttpMessageWriter<>(new ByteArrayEncoder())); writers.add(new EncoderHttpMessageWriter<>(new ByteBufferEncoder())); writers.add(new EncoderHttpMessageWriter<>(new DataBufferEncoder())); @@ -485,11 +483,10 @@ public class WebFluxConfigurationSupport implements ApplicationContextAware { writers.add(new EncoderHttpMessageWriter<>(new Jaxb2XmlEncoder())); } if (jackson2Present) { - Jackson2JsonEncoder encoder = new Jackson2JsonEncoder(); - writers.add(new EncoderHttpMessageWriter<>(encoder)); - sseDataEncoders.add(encoder); + Jackson2JsonEncoder jacksonEncoder = new Jackson2JsonEncoder(); + writers.add(new EncoderHttpMessageWriter<>(jacksonEncoder)); + writers.add(new ServerSentEventHttpMessageWriter(jacksonEncoder)); } - writers.add(new ServerSentEventHttpMessageWriter(sseDataEncoders)); } /** diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultExchangeStrategiesBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultExchangeStrategiesBuilder.java index c87695aaed7..f0519dd1f2b 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultExchangeStrategiesBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultExchangeStrategiesBuilder.java @@ -77,7 +77,10 @@ class DefaultExchangeStrategiesBuilder implements ExchangeStrategies.Builder { private void defaultReaders() { messageReader(new DecoderHttpMessageReader<>(new ByteArrayDecoder())); messageReader(new DecoderHttpMessageReader<>(new ByteBufferDecoder())); - messageReader(new ServerSentEventHttpMessageReader(sseDecoders())); + if (jackson2Present) { + // SSE ahead of String e.g. "test/event-stream" + Flux + messageReader(new ServerSentEventHttpMessageReader(new Jackson2JsonDecoder())); + } messageReader(new DecoderHttpMessageReader<>(new StringDecoder(false))); if (jaxb2Present) { messageReader(new DecoderHttpMessageReader<>(new Jaxb2XmlDecoder())); @@ -87,11 +90,6 @@ class DefaultExchangeStrategiesBuilder implements ExchangeStrategies.Builder { } } - private List> sseDecoders() { - return jackson2Present ? Collections.singletonList(new Jackson2JsonDecoder()) : - Collections.emptyList(); - } - private void defaultWriters() { messageWriter(new EncoderHttpMessageWriter<>(new ByteArrayEncoder())); messageWriter(new EncoderHttpMessageWriter<>(new ByteBufferEncoder())); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultHandlerStrategiesBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultHandlerStrategiesBuilder.java index 378687ca53e..34e784965bb 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultHandlerStrategiesBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultHandlerStrategiesBuilder.java @@ -99,11 +99,7 @@ class DefaultHandlerStrategiesBuilder implements HandlerStrategies.Builder { messageReader(new DecoderHttpMessageReader<>(new Jackson2JsonDecoder())); Jackson2JsonEncoder jsonEncoder = new Jackson2JsonEncoder(); messageWriter(new EncoderHttpMessageWriter<>(jsonEncoder)); - messageWriter( - new ServerSentEventHttpMessageWriter(Collections.singletonList(jsonEncoder))); - } - else { - messageWriter(new ServerSentEventHttpMessageWriter()); + messageWriter(new ServerSentEventHttpMessageWriter(jsonEncoder)); } localeResolver(DEFAULT_LOCALE_RESOLVER); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java index 68a0b46bbcf..6f65964b644 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java @@ -21,7 +21,6 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,9 +58,9 @@ import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -import static java.nio.charset.StandardCharsets.*; -import static org.junit.Assert.*; -import static org.springframework.http.codec.json.Jackson2CodecSupport.*; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertArrayEquals; +import static org.springframework.http.codec.json.Jackson2CodecSupport.JSON_VIEW_HINT; /** * @author Arjen Poutsma @@ -83,8 +82,7 @@ public class BodyInsertersTests { messageWriters.add(new EncoderHttpMessageWriter<>(new Jaxb2XmlEncoder())); Jackson2JsonEncoder jsonEncoder = new Jackson2JsonEncoder(); messageWriters.add(new EncoderHttpMessageWriter<>(jsonEncoder)); - messageWriters - .add(new ServerSentEventHttpMessageWriter(Collections.singletonList(jsonEncoder))); + messageWriters.add(new ServerSentEventHttpMessageWriter(jsonEncoder)); messageWriters.add(new FormHttpMessageWriter()); this.context = new BodyInserter.Context() {