parent
f89d2ac148
commit
181482fa15
|
|
@ -52,15 +52,21 @@ public class ByteArrayEncoder extends AbstractEncoder<byte[]> {
|
|||
DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
|
||||
@Nullable Map<String, Object> hints) {
|
||||
|
||||
// The following (byte[] bytes) lambda signature declaration is necessary for Eclipse.
|
||||
return Flux.from(inputStream).map((byte[] bytes) -> {
|
||||
DataBuffer dataBuffer = bufferFactory.wrap(bytes);
|
||||
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
|
||||
String logPrefix = Hints.getLogPrefix(hints);
|
||||
logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes");
|
||||
}
|
||||
return dataBuffer;
|
||||
});
|
||||
// Use (byte[] bytes) for Eclipse
|
||||
return Flux.from(inputStream).map((byte[] bytes) ->
|
||||
encodeValue(bytes, bufferFactory, elementType, mimeType, hints));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer encodeValue(byte[] bytes, DataBufferFactory bufferFactory,
|
||||
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
DataBuffer dataBuffer = bufferFactory.wrap(bytes);
|
||||
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
|
||||
String logPrefix = Hints.getLogPrefix(hints);
|
||||
logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes");
|
||||
}
|
||||
return dataBuffer;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -53,14 +53,20 @@ public class ByteBufferEncoder extends AbstractEncoder<ByteBuffer> {
|
|||
DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
|
||||
@Nullable Map<String, Object> hints) {
|
||||
|
||||
return Flux.from(inputStream).map(byteBuffer -> {
|
||||
DataBuffer dataBuffer = bufferFactory.wrap(byteBuffer);
|
||||
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
|
||||
String logPrefix = Hints.getLogPrefix(hints);
|
||||
logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes");
|
||||
}
|
||||
return dataBuffer;
|
||||
});
|
||||
return Flux.from(inputStream).map(byteBuffer ->
|
||||
encodeValue(byteBuffer, bufferFactory, elementType, mimeType, hints));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer encodeValue(ByteBuffer byteBuffer, DataBufferFactory bufferFactory,
|
||||
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
DataBuffer dataBuffer = bufferFactory.wrap(byteBuffer);
|
||||
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
|
||||
String logPrefix = Hints.getLogPrefix(hints);
|
||||
logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes");
|
||||
}
|
||||
return dataBuffer;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -71,32 +71,37 @@ public final class CharSequenceEncoder extends AbstractEncoder<CharSequence> {
|
|||
DataBufferFactory bufferFactory, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
Charset charset = getCharset(mimeType);
|
||||
return Flux.from(inputStream).map(charSequence ->
|
||||
encodeValue(charSequence, bufferFactory, elementType, mimeType, hints));
|
||||
}
|
||||
|
||||
return Flux.from(inputStream).map(charSequence -> {
|
||||
if (!Hints.isLoggingSuppressed(hints)) {
|
||||
LogFormatUtils.traceDebug(logger, traceOn -> {
|
||||
String formatted = LogFormatUtils.formatValue(charSequence, !traceOn);
|
||||
return Hints.getLogPrefix(hints) + "Writing " + formatted;
|
||||
});
|
||||
@Override
|
||||
public DataBuffer encodeValue(CharSequence charSequence, DataBufferFactory bufferFactory,
|
||||
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
if (!Hints.isLoggingSuppressed(hints)) {
|
||||
LogFormatUtils.traceDebug(logger, traceOn -> {
|
||||
String formatted = LogFormatUtils.formatValue(charSequence, !traceOn);
|
||||
return Hints.getLogPrefix(hints) + "Writing " + formatted;
|
||||
});
|
||||
}
|
||||
boolean release = true;
|
||||
Charset charset = getCharset(mimeType);
|
||||
int capacity = calculateCapacity(charSequence, charset);
|
||||
DataBuffer dataBuffer = bufferFactory.allocateBuffer(capacity);
|
||||
try {
|
||||
dataBuffer.write(charSequence, charset);
|
||||
release = false;
|
||||
}
|
||||
catch (CoderMalfunctionError ex) {
|
||||
throw new EncodingException("String encoding error: " + ex.getMessage(), ex);
|
||||
}
|
||||
finally {
|
||||
if (release) {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
boolean release = true;
|
||||
int capacity = calculateCapacity(charSequence, charset);
|
||||
DataBuffer dataBuffer = bufferFactory.allocateBuffer(capacity);
|
||||
try {
|
||||
dataBuffer.write(charSequence, charset);
|
||||
release = false;
|
||||
}
|
||||
catch (CoderMalfunctionError ex) {
|
||||
throw new EncodingException("String encoding error: " + ex.getMessage(), ex);
|
||||
}
|
||||
finally {
|
||||
if (release) {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
}
|
||||
return dataBuffer;
|
||||
});
|
||||
}
|
||||
return dataBuffer;
|
||||
}
|
||||
|
||||
int calculateCapacity(CharSequence sequence, Charset charset) {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -53,15 +53,25 @@ public class DataBufferEncoder extends AbstractEncoder<DataBuffer> {
|
|||
@Nullable Map<String, Object> hints) {
|
||||
|
||||
Flux<DataBuffer> flux = Flux.from(inputStream);
|
||||
|
||||
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
|
||||
flux = flux.doOnNext(buffer -> {
|
||||
String logPrefix = Hints.getLogPrefix(hints);
|
||||
logger.debug(logPrefix + "Writing " + buffer.readableByteCount() + " bytes");
|
||||
});
|
||||
flux = flux.doOnNext(buffer -> logValue(buffer, hints));
|
||||
}
|
||||
|
||||
return flux;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer encodeValue(DataBuffer buffer, DataBufferFactory bufferFactory,
|
||||
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
|
||||
logValue(buffer, hints);
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private void logValue(DataBuffer buffer, @Nullable Map<String, Object> hints) {
|
||||
String logPrefix = Hints.getLogPrefix(hints);
|
||||
logger.debug(logPrefix + "Writing " + buffer.readableByteCount() + " bytes");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,13 +60,35 @@ public interface Encoder<T> {
|
|||
* @param elementType the expected type of elements in the input stream;
|
||||
* this type must have been previously passed to the {@link #canEncode}
|
||||
* method and it must have returned {@code true}.
|
||||
* @param mimeType the MIME type for the output stream (optional)
|
||||
* @param hints additional information about how to do encode
|
||||
* @param mimeType the MIME type for the output content (optional)
|
||||
* @param hints additional information about how to encode
|
||||
* @return the output stream
|
||||
*/
|
||||
Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
|
||||
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints);
|
||||
|
||||
/**
|
||||
* Encode an Object of type T to a data buffer. This is useful for scenarios
|
||||
* that produce a stream of discrete messages (or events) and the
|
||||
* content for each is encoded individually.
|
||||
* <p>By default this method raises {@link UnsupportedOperationException}
|
||||
* and it is expected that some encoders cannot produce a single buffer or
|
||||
* cannot do so synchronously (e.g. encoding a {@code Resource}).
|
||||
* @param value the value to be encoded
|
||||
* @param bufferFactory for creating the output {@code DataBuffer}
|
||||
* @param valueType the type for the value being encoded
|
||||
* @param mimeType the MIME type for the output content (optional)
|
||||
* @param hints additional information about how to encode
|
||||
* @return the encoded content
|
||||
* @since 5.2
|
||||
*/
|
||||
default DataBuffer encodeValue(T value, DataBufferFactory bufferFactory,
|
||||
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
// It may not be possible to produce a single DataBuffer synchronously
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the list of mime types this encoder supports.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -441,6 +441,10 @@ public abstract class DataBufferUtils {
|
|||
public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
|
||||
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
|
||||
|
||||
if (dataBuffers instanceof Mono) {
|
||||
return (Mono<DataBuffer>) dataBuffers;
|
||||
}
|
||||
|
||||
return Flux.from(dataBuffers)
|
||||
.collectList()
|
||||
.filter(list -> !list.isEmpty())
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
|
|||
|
||||
if (inputStream instanceof Mono) {
|
||||
return Mono.from(inputStream).map(value ->
|
||||
encodeValue(value, mimeType, bufferFactory, elementType, hints, encoding)).flux();
|
||||
encodeValue(value, bufferFactory, elementType, mimeType, hints, encoding)).flux();
|
||||
}
|
||||
else {
|
||||
return this.streamingMediaTypes.stream()
|
||||
|
|
@ -129,7 +129,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
|
|||
byte[] separator = STREAM_SEPARATORS.getOrDefault(mediaType, NEWLINE_SEPARATOR);
|
||||
return Flux.from(inputStream).map(value -> {
|
||||
DataBuffer buffer = encodeValue(
|
||||
value, mimeType, bufferFactory, elementType, hints, encoding);
|
||||
value, bufferFactory, elementType, mimeType, hints, encoding);
|
||||
if (separator != null) {
|
||||
buffer.write(separator);
|
||||
}
|
||||
|
|
@ -139,13 +139,20 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
|
|||
.orElseGet(() -> {
|
||||
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
|
||||
return Flux.from(inputStream).collectList().map(list ->
|
||||
encodeValue(list, mimeType, bufferFactory, listType, hints, encoding)).flux();
|
||||
encodeValue(list, bufferFactory, listType, mimeType, hints, encoding)).flux();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private DataBuffer encodeValue(Object value, @Nullable MimeType mimeType, DataBufferFactory bufferFactory,
|
||||
ResolvableType elementType, @Nullable Map<String, Object> hints, JsonEncoding encoding) {
|
||||
@Override
|
||||
public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory,
|
||||
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return encodeValue(value, bufferFactory, valueType, mimeType, hints, getJsonEncoding(mimeType));
|
||||
}
|
||||
|
||||
private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, ResolvableType valueType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints, JsonEncoding encoding) {
|
||||
|
||||
if (!Hints.isLoggingSuppressed(hints)) {
|
||||
LogFormatUtils.traceDebug(logger, traceOn -> {
|
||||
|
|
@ -154,7 +161,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
|
|||
});
|
||||
}
|
||||
|
||||
JavaType javaType = getJavaType(elementType.getType(), null);
|
||||
JavaType javaType = getJavaType(valueType.getType(), null);
|
||||
Class<?> jsonView = (hints != null ? (Class<?>) hints.get(Jackson2CodecSupport.JSON_VIEW_HINT) : null);
|
||||
ObjectWriter writer = (jsonView != null ?
|
||||
getObjectMapper().writerWithView(jsonView) : getObjectMapper().writer());
|
||||
|
|
@ -163,7 +170,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
|
|||
writer = writer.forType(javaType);
|
||||
}
|
||||
|
||||
writer = customizeWriter(writer, mimeType, elementType, hints);
|
||||
writer = customizeWriter(writer, mimeType, valueType, hints);
|
||||
|
||||
DataBuffer buffer = bufferFactory.allocateBuffer();
|
||||
boolean release = true;
|
||||
|
|
|
|||
|
|
@ -73,29 +73,39 @@ public class ProtobufEncoder extends ProtobufCodecSupport implements HttpMessage
|
|||
public Flux<DataBuffer> encode(Publisher<? extends Message> inputStream, DataBufferFactory bufferFactory,
|
||||
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return Flux.from(inputStream)
|
||||
.map(message -> {
|
||||
DataBuffer buffer = bufferFactory.allocateBuffer();
|
||||
boolean release = true;
|
||||
try {
|
||||
if (!(inputStream instanceof Mono)) {
|
||||
message.writeDelimitedTo(buffer.asOutputStream());
|
||||
}
|
||||
else {
|
||||
message.writeTo(buffer.asOutputStream());
|
||||
}
|
||||
release = false;
|
||||
return buffer;
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
|
||||
}
|
||||
finally {
|
||||
if (release) {
|
||||
DataBufferUtils.release(buffer);
|
||||
}
|
||||
}
|
||||
});
|
||||
return Flux.from(inputStream).map(message ->
|
||||
encodeValue(message, bufferFactory, !(inputStream instanceof Mono)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer encodeValue(Message message, DataBufferFactory bufferFactory,
|
||||
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return encodeValue(message, bufferFactory, false);
|
||||
}
|
||||
|
||||
private DataBuffer encodeValue(Message message, DataBufferFactory bufferFactory, boolean delimited) {
|
||||
|
||||
DataBuffer buffer = bufferFactory.allocateBuffer();
|
||||
boolean release = true;
|
||||
try {
|
||||
if (delimited) {
|
||||
message.writeDelimitedTo(buffer.asOutputStream());
|
||||
}
|
||||
else {
|
||||
message.writeTo(buffer.asOutputStream());
|
||||
}
|
||||
release = false;
|
||||
return buffer;
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
|
||||
}
|
||||
finally {
|
||||
if (release) {
|
||||
DataBufferUtils.release(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -99,7 +99,15 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder<Object> {
|
|||
|
||||
@Override
|
||||
protected Flux<DataBuffer> encode(Object value, DataBufferFactory bufferFactory,
|
||||
ResolvableType type, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
// we're relying on doOnDiscard in base class
|
||||
return Mono.fromCallable(() -> encodeValue(value, bufferFactory, valueType, mimeType, hints)).flux();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory,
|
||||
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
if (!Hints.isLoggingSuppressed(hints)) {
|
||||
LogFormatUtils.traceDebug(logger, traceOn -> {
|
||||
|
|
@ -108,30 +116,27 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder<Object> {
|
|||
});
|
||||
}
|
||||
|
||||
return Flux.defer(() -> {
|
||||
boolean release = true;
|
||||
DataBuffer buffer = bufferFactory.allocateBuffer(1024);
|
||||
try {
|
||||
OutputStream outputStream = buffer.asOutputStream();
|
||||
Class<?> clazz = ClassUtils.getUserClass(value);
|
||||
Marshaller marshaller = initMarshaller(clazz);
|
||||
marshaller.marshal(value, outputStream);
|
||||
release = false;
|
||||
return Mono.fromCallable(() -> buffer); // relying on doOnDiscard in base class
|
||||
boolean release = true;
|
||||
DataBuffer buffer = bufferFactory.allocateBuffer(1024);
|
||||
try {
|
||||
OutputStream outputStream = buffer.asOutputStream();
|
||||
Class<?> clazz = ClassUtils.getUserClass(value);
|
||||
Marshaller marshaller = initMarshaller(clazz);
|
||||
marshaller.marshal(value, outputStream);
|
||||
release = false;
|
||||
return buffer;
|
||||
}
|
||||
catch (MarshalException ex) {
|
||||
throw new EncodingException("Could not marshal " + value.getClass() + " to XML", ex);
|
||||
}
|
||||
catch (JAXBException ex) {
|
||||
throw new CodecException("Invalid JAXB configuration", ex);
|
||||
}
|
||||
finally {
|
||||
if (release) {
|
||||
DataBufferUtils.release(buffer);
|
||||
}
|
||||
catch (MarshalException ex) {
|
||||
return Flux.error(new EncodingException(
|
||||
"Could not marshal " + value.getClass() + " to XML", ex));
|
||||
}
|
||||
catch (JAXBException ex) {
|
||||
return Flux.error(new CodecException("Invalid JAXB configuration", ex));
|
||||
}
|
||||
finally {
|
||||
if (release) {
|
||||
DataBufferUtils.release(buffer);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private Marshaller initMarshaller(Class<?> clazz) throws JAXBException {
|
||||
|
|
|
|||
Loading…
Reference in New Issue