EncoderHttpMessageWriter supports streaming MediaTypes
Support for flushing in EncoderHttpMessageWriter is now driven from a configurable list of "streaming" media types with the list including "application/stream+json" by default. As a result Jackson2ServerHttpMessageWriter is no longer needed.
This commit is contained in:
parent
f65544c192
commit
c8671041f1
|
|
@ -31,13 +31,6 @@ import org.springframework.util.MimeType;
|
|||
*/
|
||||
public abstract class AbstractEncoder<T> implements Encoder<T> {
|
||||
|
||||
/**
|
||||
* Hint key to use with a {@link FlushingStrategy} value.
|
||||
*/
|
||||
public static final String FLUSHING_STRATEGY_HINT = AbstractEncoder.class.getName() + ".flushingStrategy";
|
||||
|
||||
public enum FlushingStrategy { AUTO, AFTER_EACH_ELEMENT }
|
||||
|
||||
private final List<MimeType> encodableMimeTypes;
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
package org.springframework.http.codec;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
|
@ -35,9 +36,6 @@ import org.springframework.http.server.reactive.ServerHttpRequest;
|
|||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import static org.springframework.core.codec.AbstractEncoder.FLUSHING_STRATEGY_HINT;
|
||||
import static org.springframework.core.codec.AbstractEncoder.FlushingStrategy.AFTER_EACH_ELEMENT;
|
||||
|
||||
/**
|
||||
* {@code HttpMessageWriter} that wraps and delegates to a {@link Encoder}.
|
||||
*
|
||||
|
|
@ -52,12 +50,22 @@ import static org.springframework.core.codec.AbstractEncoder.FlushingStrategy.AF
|
|||
*/
|
||||
public class EncoderHttpMessageWriter<T> implements ServerHttpMessageWriter<T> {
|
||||
|
||||
/**
|
||||
* Default list of media types that signify a "streaming" scenario such that
|
||||
* there may be a time lag between items written and hence requires flushing.
|
||||
*/
|
||||
public static final List<MediaType> DEFAULT_STREAMING_MEDIA_TYPES =
|
||||
Collections.singletonList(MediaType.APPLICATION_STREAM_JSON);
|
||||
|
||||
|
||||
private final Encoder<T> encoder;
|
||||
|
||||
private final List<MediaType> mediaTypes;
|
||||
|
||||
private final MediaType defaultMediaType;
|
||||
|
||||
private final List<MediaType> streamingMediaTypes = new ArrayList<>(1);
|
||||
|
||||
|
||||
/**
|
||||
* Create an instance wrapping the given {@link Encoder}.
|
||||
|
|
@ -67,6 +75,7 @@ public class EncoderHttpMessageWriter<T> implements ServerHttpMessageWriter<T> {
|
|||
this.encoder = encoder;
|
||||
this.mediaTypes = MediaType.asMediaTypes(encoder.getEncodableMimeTypes());
|
||||
this.defaultMediaType = initDefaultMediaType(this.mediaTypes);
|
||||
this.streamingMediaTypes.addAll(DEFAULT_STREAMING_MEDIA_TYPES);
|
||||
}
|
||||
|
||||
private static MediaType initDefaultMediaType(List<MediaType> mediaTypes) {
|
||||
|
|
@ -86,6 +95,23 @@ public class EncoderHttpMessageWriter<T> implements ServerHttpMessageWriter<T> {
|
|||
return this.mediaTypes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure "streaming" media types for which flushing should be performed
|
||||
* automatically vs at the end of the input stream.
|
||||
* <p>By default this is set to {@link #DEFAULT_STREAMING_MEDIA_TYPES}.
|
||||
* @param mediaTypes one or more media types to add to the list
|
||||
*/
|
||||
public void setStreamingMediaTypes(List<MediaType> mediaTypes) {
|
||||
this.streamingMediaTypes.addAll(mediaTypes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configured list of "streaming" media types.
|
||||
*/
|
||||
public List<MediaType> getStreamingMediaTypes() {
|
||||
return Collections.unmodifiableList(this.streamingMediaTypes);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean canWrite(ResolvableType elementType, MediaType mediaType) {
|
||||
|
|
@ -111,8 +137,9 @@ public class EncoderHttpMessageWriter<T> implements ServerHttpMessageWriter<T> {
|
|||
Flux<DataBuffer> body = this.encoder.encode(inputStream,
|
||||
outputMessage.bufferFactory(), elementType, headers.getContentType(), hints);
|
||||
|
||||
return (hints.get(FLUSHING_STRATEGY_HINT) == AFTER_EACH_ELEMENT ?
|
||||
outputMessage.writeAndFlushWith(body.map(Flux::just)) : outputMessage.writeWith(body));
|
||||
return isStreamingMediaType(headers.getContentType()) ?
|
||||
outputMessage.writeAndFlushWith(body.map(Flux::just)) :
|
||||
outputMessage.writeWith(body);
|
||||
}
|
||||
|
||||
private static boolean useFallback(MediaType main, MediaType fallback) {
|
||||
|
|
@ -127,6 +154,10 @@ public class EncoderHttpMessageWriter<T> implements ServerHttpMessageWriter<T> {
|
|||
return main;
|
||||
}
|
||||
|
||||
private boolean isStreamingMediaType(MediaType contentType) {
|
||||
return this.streamingMediaTypes.stream().anyMatch(contentType::isCompatibleWith);
|
||||
}
|
||||
|
||||
|
||||
// ServerHttpMessageWriter...
|
||||
|
||||
|
|
|
|||
|
|
@ -1,77 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.codec;
|
||||
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.codec.AbstractEncoder;
|
||||
import org.springframework.core.codec.Encoder;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ReactiveHttpOutputMessage;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
|
||||
import static org.springframework.core.codec.AbstractEncoder.FLUSHING_STRATEGY_HINT;
|
||||
import static org.springframework.core.codec.AbstractEncoder.FlushingStrategy.AFTER_EACH_ELEMENT;
|
||||
|
||||
/**
|
||||
* Jackson {@link ServerHttpMessageWriter} that resolves {@code @JsonView} annotated handler
|
||||
* method and deals with {@link AbstractEncoder#FLUSHING_STRATEGY_HINT}.
|
||||
*
|
||||
* @author Sebastien Deleuze
|
||||
* @since 5.0
|
||||
* @see com.fasterxml.jackson.annotation.JsonView
|
||||
*/
|
||||
public class Jackson2ServerHttpMessageWriter extends EncoderHttpMessageWriter<Object> {
|
||||
|
||||
|
||||
public Jackson2ServerHttpMessageWriter(Encoder<Object> encoder) {
|
||||
super(encoder);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<Void> write(Publisher<?> inputStream, ResolvableType elementType, MediaType mediaType,
|
||||
ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {
|
||||
|
||||
if ((mediaType != null) && mediaType.isCompatibleWith(MediaType.APPLICATION_STREAM_JSON)) {
|
||||
Map<String, Object> hintsWithFlush = new HashMap<>(hints);
|
||||
hintsWithFlush.put(FLUSHING_STRATEGY_HINT, AFTER_EACH_ELEMENT);
|
||||
return super.write(inputStream, elementType, mediaType, outputMessage, hintsWithFlush);
|
||||
}
|
||||
return super.write(inputStream, elementType, mediaType, outputMessage, hints);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> write(Publisher<?> inputStream, ResolvableType streamType, ResolvableType elementType,
|
||||
MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response, Map<String, Object> hints) {
|
||||
|
||||
if ((mediaType != null) && mediaType.isCompatibleWith(MediaType.APPLICATION_STREAM_JSON)) {
|
||||
Map<String, Object> hintsWithFlush = new HashMap<>(hints);
|
||||
hintsWithFlush.put(FLUSHING_STRATEGY_HINT, AFTER_EACH_ELEMENT);
|
||||
return super.write(inputStream, streamType, elementType, mediaType, request, response, hintsWithFlush);
|
||||
}
|
||||
return super.write(inputStream, streamType, elementType, mediaType, request, response, hints);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -50,7 +50,6 @@ import org.springframework.http.codec.DecoderHttpMessageReader;
|
|||
import org.springframework.http.codec.EncoderHttpMessageWriter;
|
||||
import org.springframework.http.codec.HttpMessageReader;
|
||||
import org.springframework.http.codec.HttpMessageWriter;
|
||||
import org.springframework.http.codec.Jackson2ServerHttpMessageWriter;
|
||||
import org.springframework.http.codec.ResourceHttpMessageWriter;
|
||||
import org.springframework.http.codec.ServerSentEventHttpMessageWriter;
|
||||
import org.springframework.http.codec.json.Jackson2JsonDecoder;
|
||||
|
|
@ -487,7 +486,7 @@ public class WebFluxConfigurationSupport implements ApplicationContextAware {
|
|||
}
|
||||
if (jackson2Present) {
|
||||
Jackson2JsonEncoder encoder = new Jackson2JsonEncoder();
|
||||
writers.add(new Jackson2ServerHttpMessageWriter(encoder));
|
||||
writers.add(new EncoderHttpMessageWriter<>(encoder));
|
||||
sseDataEncoders.add(encoder);
|
||||
}
|
||||
writers.add(new ServerSentEventHttpMessageWriter(sseDataEncoders));
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@ import org.springframework.http.codec.EncoderHttpMessageWriter;
|
|||
import org.springframework.http.codec.FormHttpMessageReader;
|
||||
import org.springframework.http.codec.HttpMessageReader;
|
||||
import org.springframework.http.codec.HttpMessageWriter;
|
||||
import org.springframework.http.codec.Jackson2ServerHttpMessageWriter;
|
||||
import org.springframework.http.codec.ResourceHttpMessageWriter;
|
||||
import org.springframework.http.codec.ServerSentEventHttpMessageWriter;
|
||||
import org.springframework.http.codec.json.Jackson2JsonDecoder;
|
||||
|
|
@ -99,7 +98,7 @@ class DefaultHandlerStrategiesBuilder implements HandlerStrategies.Builder {
|
|||
if (jackson2Present) {
|
||||
messageReader(new DecoderHttpMessageReader<>(new Jackson2JsonDecoder()));
|
||||
Jackson2JsonEncoder jsonEncoder = new Jackson2JsonEncoder();
|
||||
messageWriter(new Jackson2ServerHttpMessageWriter(jsonEncoder));
|
||||
messageWriter(new EncoderHttpMessageWriter<>(jsonEncoder));
|
||||
messageWriter(
|
||||
new ServerSentEventHttpMessageWriter(Collections.singletonList(jsonEncoder)));
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue