From df706f4c7cc2c577f44ddf02019dca98fccf5a72 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 7 Feb 2020 21:44:44 +0000 Subject: [PATCH] Restore use of Flux-based encode method in HTTP Closes gh-24441 --- .../springframework/core/codec/Decoder.java | 8 +-- .../springframework/core/codec/Encoder.java | 8 +-- .../http/codec/EncoderHttpMessageWriter.java | 16 +++--- .../ServerSentEventHttpMessageWriter.java | 57 +++++++++---------- .../codec/EncoderHttpMessageWriterTests.java | 4 +- 5 files changed, 45 insertions(+), 48 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/Decoder.java b/spring-core/src/main/java/org/springframework/core/codec/Decoder.java index 219e813f02..5fd981d6ac 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/Decoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/Decoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,9 +78,9 @@ public interface Decoder { @Nullable MimeType mimeType, @Nullable Map hints); /** - * Decode a data buffer to an Object of type T. This is useful when the input - * stream consists of discrete messages (or events) and the content for each - * can be decoded on its own. + * Decode a data buffer to an Object of type T. This is useful for scenarios, + * that distinct messages (or events) are decoded and handled individually, + * in fully aggregated form. * @param buffer the {@code DataBuffer} to decode * @param targetType the expected output type * @param mimeType the MIME type associated with the data diff --git a/spring-core/src/main/java/org/springframework/core/codec/Encoder.java b/spring-core/src/main/java/org/springframework/core/codec/Encoder.java index 00f42009ae..aa4f5d8aff 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/Encoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/Encoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,9 +68,9 @@ public interface Encoder { ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints); /** - * Encode an Object of type T to a data buffer. This is useful for scenarios - * that produce a stream of discrete messages (or events) and the - * content for each is encoded individually. + * Encode an Object of type T to a data buffer. This is useful for scenarios, + * that distinct messages (or events) are encoded and handled individually, + * in fully aggregated form. *

By default this method raises {@link UnsupportedOperationException} * and it is expected that some encoders cannot produce a single buffer or * cannot do so synchronously (e.g. encoding a {@code Resource}). diff --git a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java index bd6c98ba64..a5ed282741 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,6 @@ import org.springframework.core.codec.AbstractEncoder; import org.springframework.core.codec.Encoder; import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpLogging; import org.springframework.http.MediaType; @@ -114,24 +113,23 @@ public class EncoderHttpMessageWriter implements HttpMessageWriter { MediaType contentType = updateContentType(message, mediaType); + Flux body = this.encoder.encode( + inputStream, message.bufferFactory(), elementType, contentType, hints); + if (inputStream instanceof Mono) { - return Mono.from(inputStream) + return body + .singleOrEmpty() .switchIfEmpty(Mono.defer(() -> { message.getHeaders().setContentLength(0); return message.setComplete().then(Mono.empty()); })) - .flatMap(value -> { - DataBufferFactory factory = message.bufferFactory(); - DataBuffer buffer = this.encoder.encodeValue(value, factory, elementType, contentType, hints); + .flatMap(buffer -> { message.getHeaders().setContentLength(buffer.readableByteCount()); return message.writeWith(Mono.just(buffer) .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)); }); } - Flux body = this.encoder.encode( - inputStream, message.bufferFactory(), elementType, contentType, hints); - if (isStreamingMediaType(contentType)) { return message.writeAndFlushWith(body.map(buffer -> Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release))); diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java index 3840bae308..aeb536cd6c 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package org.springframework.http.codec; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -112,7 +112,7 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter> encode(Publisher input, ResolvableType elementType, - MediaType mediaType, DataBufferFactory bufferFactory, Map hints) { + MediaType mediaType, DataBufferFactory factory, Map hints) { ResolvableType dataType = (ServerSentEvent.class.isAssignableFrom(elementType.toClass()) ? elementType.getGeneric() : elementType); @@ -144,13 +144,35 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter bufferMono = Mono.fromCallable(() -> - bufferFactory.join(encodeEvent(sb, data, dataType, mediaType, bufferFactory, hints))); + Flux result; + if (data == null) { + result = Flux.just(encodeText(sb + "\n", mediaType, factory)); + } + else if (data instanceof String) { + data = StringUtils.replace((String) data, "\n", "\ndata:"); + result = Flux.just(encodeText(sb + (String) data + "\n\n", mediaType, factory)); + } + else { + result = encodeEvent(sb.toString(), data, dataType, mediaType, factory, hints); + } - return bufferMono.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); }); } + @SuppressWarnings("unchecked") + private Flux encodeEvent(String eventContent, T data, ResolvableType dataType, + MediaType mediaType, DataBufferFactory factory, Map hints) { + + if (this.encoder == null) { + throw new CodecException("No SSE encoder configured and the data is not String."); + } + return Flux.just(factory.join(Arrays.asList( + encodeText(eventContent, mediaType, factory), + ((Encoder) this.encoder).encodeValue(data, factory, dataType, mediaType, hints), + encodeText("\n\n", mediaType, factory)))); + } + private void writeField(String fieldName, Object fieldValue, StringBuilder sb) { sb.append(fieldName); sb.append(':'); @@ -158,29 +180,6 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter List encodeEvent(CharSequence markup, @Nullable T data, ResolvableType dataType, - MediaType mediaType, DataBufferFactory factory, Map hints) { - - List result = new ArrayList<>(4); - result.add(encodeText(markup, mediaType, factory)); - if (data != null) { - if (data instanceof String) { - String dataLine = StringUtils.replace((String) data, "\n", "\ndata:") + "\n"; - result.add(encodeText(dataLine, mediaType, factory)); - } - else if (this.encoder == null) { - throw new CodecException("No SSE encoder configured and the data is not String."); - } - else { - result.add(((Encoder) this.encoder).encodeValue(data, factory, dataType, mediaType, hints)); - result.add(encodeText("\n", mediaType, factory)); - } - } - result.add(encodeText("\n", mediaType, factory)); - return result; - } - private DataBuffer encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) { Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset"); byte[] bytes = text.toString().getBytes(mediaType.getCharset()); diff --git a/spring-web/src/test/java/org/springframework/http/codec/EncoderHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/EncoderHttpMessageWriterTests.java index c7133f3f28..39e831b5fc 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/EncoderHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/EncoderHttpMessageWriterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -157,7 +157,7 @@ class EncoderHttpMessageWriterTests { void setContentLengthForMonoBody() { DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); DataBuffer buffer = factory.wrap("body".getBytes(StandardCharsets.UTF_8)); - configureEncoder(buffer, MimeTypeUtils.TEXT_PLAIN); + configureEncoder(Flux.just(buffer), MimeTypeUtils.TEXT_PLAIN); HttpMessageWriter writer = new EncoderHttpMessageWriter<>(this.encoder); writer.write(Mono.just("body"), forClass(String.class), TEXT_PLAIN, this.response, NO_HINTS).block();