Restore use of Flux-based encode method in HTTP

Closes gh-24441
This commit is contained in:
Rossen Stoyanchev 2020-02-07 21:44:44 +00:00
parent a03a116f6b
commit df706f4c7c
5 changed files with 45 additions and 48 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -78,9 +78,9 @@ public interface Decoder<T> {
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints);
/**
* Decode a data buffer to an Object of type T. This is useful when the input
* stream consists of discrete messages (or events) and the content for each
* can be decoded on its own.
* Decode a data buffer to an Object of type T. This is useful for scenarios,
* that distinct messages (or events) are decoded and handled individually,
* in fully aggregated form.
* @param buffer the {@code DataBuffer} to decode
* @param targetType the expected output type
* @param mimeType the MIME type associated with the data

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -68,9 +68,9 @@ public interface Encoder<T> {
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints);
/**
* Encode an Object of type T to a data buffer. This is useful for scenarios
* that produce a stream of discrete messages (or events) and the
* content for each is encoded individually.
* Encode an Object of type T to a data buffer. This is useful for scenarios,
* that distinct messages (or events) are encoded and handled individually,
* in fully aggregated form.
* <p>By default this method raises {@link UnsupportedOperationException}
* and it is expected that some encoders cannot produce a single buffer or
* cannot do so synchronously (e.g. encoding a {@code Resource}).

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,7 +29,6 @@ import org.springframework.core.codec.AbstractEncoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpLogging;
import org.springframework.http.MediaType;
@ -114,24 +113,23 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
MediaType contentType = updateContentType(message, mediaType);
Flux<DataBuffer> body = this.encoder.encode(
inputStream, message.bufferFactory(), elementType, contentType, hints);
if (inputStream instanceof Mono) {
return Mono.from(inputStream)
return body
.singleOrEmpty()
.switchIfEmpty(Mono.defer(() -> {
message.getHeaders().setContentLength(0);
return message.setComplete().then(Mono.empty());
}))
.flatMap(value -> {
DataBufferFactory factory = message.bufferFactory();
DataBuffer buffer = this.encoder.encodeValue(value, factory, elementType, contentType, hints);
.flatMap(buffer -> {
message.getHeaders().setContentLength(buffer.readableByteCount());
return message.writeWith(Mono.just(buffer)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
});
}
Flux<DataBuffer> body = this.encoder.encode(
inputStream, message.bufferFactory(), elementType, contentType, hints);
if (isStreamingMediaType(contentType)) {
return message.writeAndFlushWith(body.map(buffer ->
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,7 +18,7 @@ package org.springframework.http.codec;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -112,7 +112,7 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Objec
}
private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, ResolvableType elementType,
MediaType mediaType, DataBufferFactory bufferFactory, Map<String, Object> hints) {
MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
ResolvableType dataType = (ServerSentEvent.class.isAssignableFrom(elementType.toClass()) ?
elementType.getGeneric() : elementType);
@ -144,13 +144,35 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Objec
sb.append("data:");
}
Mono<DataBuffer> bufferMono = Mono.fromCallable(() ->
bufferFactory.join(encodeEvent(sb, data, dataType, mediaType, bufferFactory, hints)));
Flux<DataBuffer> result;
if (data == null) {
result = Flux.just(encodeText(sb + "\n", mediaType, factory));
}
else if (data instanceof String) {
data = StringUtils.replace((String) data, "\n", "\ndata:");
result = Flux.just(encodeText(sb + (String) data + "\n\n", mediaType, factory));
}
else {
result = encodeEvent(sb.toString(), data, dataType, mediaType, factory, hints);
}
return bufferMono.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
});
}
@SuppressWarnings("unchecked")
private <T> Flux<DataBuffer> encodeEvent(String eventContent, T data, ResolvableType dataType,
MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
if (this.encoder == null) {
throw new CodecException("No SSE encoder configured and the data is not String.");
}
return Flux.just(factory.join(Arrays.asList(
encodeText(eventContent, mediaType, factory),
((Encoder<T>) this.encoder).encodeValue(data, factory, dataType, mediaType, hints),
encodeText("\n\n", mediaType, factory))));
}
private void writeField(String fieldName, Object fieldValue, StringBuilder sb) {
sb.append(fieldName);
sb.append(':');
@ -158,29 +180,6 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Objec
sb.append("\n");
}
@SuppressWarnings("unchecked")
private <T> List<DataBuffer> encodeEvent(CharSequence markup, @Nullable T data, ResolvableType dataType,
MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
List<DataBuffer> result = new ArrayList<>(4);
result.add(encodeText(markup, mediaType, factory));
if (data != null) {
if (data instanceof String) {
String dataLine = StringUtils.replace((String) data, "\n", "\ndata:") + "\n";
result.add(encodeText(dataLine, mediaType, factory));
}
else if (this.encoder == null) {
throw new CodecException("No SSE encoder configured and the data is not String.");
}
else {
result.add(((Encoder<T>) this.encoder).encodeValue(data, factory, dataType, mediaType, hints));
result.add(encodeText("\n", mediaType, factory));
}
}
result.add(encodeText("\n", mediaType, factory));
return result;
}
private DataBuffer encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
byte[] bytes = text.toString().getBytes(mediaType.getCharset());

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -157,7 +157,7 @@ class EncoderHttpMessageWriterTests {
void setContentLengthForMonoBody() {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DataBuffer buffer = factory.wrap("body".getBytes(StandardCharsets.UTF_8));
configureEncoder(buffer, MimeTypeUtils.TEXT_PLAIN);
configureEncoder(Flux.just(buffer), MimeTypeUtils.TEXT_PLAIN);
HttpMessageWriter<String> writer = new EncoderHttpMessageWriter<>(this.encoder);
writer.write(Mono.just("body"), forClass(String.class), TEXT_PLAIN, this.response, NO_HINTS).block();