Add codecs for JSON support with Protobuf

Prior to this commit, WebFlux had Protobuf codecs for managing the
`Message`/`"application/x-protobuf"` encoding and decoding.
The `com.google.protobuf:protobuf-java-util` library has additional
support for JSON (de)serialization, but this is not supported by
existing codecs.

This commit adds the new `ProtobufJsonEncode` and `ProtobufJsonDecoder`
classes that support this use case. Note, the `ProtobufJsonDecoder` has
a significant limitation: it cannot decode JSON arrays as
`Flux<Message>` because there is no available non-blocking parser able
to tokenize JSON arrays into streams of `Databuffer`. Instead,
applications should decode to `Mono<List<Message>>` which causes
additional buffering but is properly supported.

Closes gh-25457
This commit is contained in:
Brian Clozel 2024-06-18 15:48:46 +02:00
parent 24bbc6d80d
commit 17abb4d25d
6 changed files with 560 additions and 5 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -47,7 +47,7 @@ import org.springframework.util.MimeType;
*
* <p>To generate {@code Message} Java classes, you need to install the {@code protoc} binary.
*
* <p>This encoder requires Protobuf 3 or higher, and supports
* <p>This encoder requires Protobuf 3.29 or higher, and supports
* {@code "application/x-protobuf"} and {@code "application/octet-stream"} with the official
* {@code "com.google.protobuf:protobuf-java"} library.
*

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -28,8 +28,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.EncodingException;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.EncoderHttpMessageWriter;
@ -97,7 +97,7 @@ public class ProtobufHttpMessageWriter extends EncoderHttpMessageWriter<Message>
return super.write(inputStream, elementType, mediaType, message, hints);
}
catch (Exception ex) {
return Mono.error(new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex));
return Mono.error(new EncodingException("Could not write Protobuf message: " + ex.getMessage(), ex));
}
}

View File

@ -0,0 +1,173 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.protobuf;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.lang.Nullable;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.MimeType;
/**
* A {@code Decoder} that reads a JSON byte stream and converts it to
* <a href="https://developers.google.com/protocol-buffers/">Google Protocol Buffers</a>
* {@link com.google.protobuf.Message}s.
*
* <p>Flux deserialized via
* {@link #decode(Publisher, ResolvableType, MimeType, Map)} are not supported because
* the Protobuf Java Util library does not provide a non-blocking parser
* that splits a JSON stream into tokens.
* Applications should consider decoding to {@code Mono<Message>} or
* {@code Mono<List<Message>>}, which will use the supported
* {@link #decodeToMono(Publisher, ResolvableType, MimeType, Map)}.
*
* <p>To generate {@code Message} Java classes, you need to install the
* {@code protoc} binary.
*
* <p>This decoder requires Protobuf 3.29 or higher, and supports
* {@code "application/json"} and {@code "application/*+json"} with
* the official {@code "com.google.protobuf:protobuf-java-util"} library.
*
* @author Brian Clozel
* @since 6.2
* @see ProtobufJsonEncoder
*/
public class ProtobufJsonDecoder implements Decoder<Message> {
/** The default max size for aggregating messages. */
protected static final int DEFAULT_MESSAGE_MAX_SIZE = 256 * 1024;
private static final List<MimeType> defaultMimeTypes = List.of(MediaType.APPLICATION_JSON,
new MediaType("application", "*+json"));
private static final ConcurrentMap<Class<?>, Method> methodCache = new ConcurrentReferenceHashMap<>();
private final JsonFormat.Parser parser;
private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE;
/**
* Construct a new {@link ProtobufJsonDecoder} using a default {@link JsonFormat.Parser} instance.
*/
public ProtobufJsonDecoder() {
this(JsonFormat.parser());
}
/**
* Construct a new {@link ProtobufJsonDecoder} using the given {@link JsonFormat.Parser} instance.
*/
public ProtobufJsonDecoder(JsonFormat.Parser parser) {
this.parser = parser;
}
/**
* Return the {@link #setMaxMessageSize configured} message size limit.
*/
public int getMaxMessageSize() {
return this.maxMessageSize;
}
/**
* The max size allowed per message.
* <p>By default, this is set to 256K.
* @param maxMessageSize the max size per message, or -1 for unlimited
*/
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
@Override
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
}
private static boolean supportsMimeType(@Nullable MimeType mimeType) {
if (mimeType == null) {
return false;
}
for (MimeType m : defaultMimeTypes) {
if (m.isCompatibleWith(mimeType)) {
return true;
}
}
return false;
}
@Override
public List<MimeType> getDecodableMimeTypes() {
return defaultMimeTypes;
}
@Override
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux.error(new UnsupportedOperationException("Protobuf decoder does not support Flux, use Mono<List<...>> instead."));
}
@Override
public Message decode(DataBuffer dataBuffer, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException {
try {
Message.Builder builder = getMessageBuilder(targetType.toClass());
this.parser.merge(new InputStreamReader(dataBuffer.asInputStream()), builder);
return builder.build();
}
catch (Exception ex) {
throw new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex);
}
finally {
DataBufferUtils.release(dataBuffer);
}
}
/**
* Create a new {@code Message.Builder} instance for the given class.
* <p>This method uses a ConcurrentHashMap for caching method lookups.
*/
private static Message.Builder getMessageBuilder(Class<?> clazz) throws Exception {
Method method = methodCache.get(clazz);
if (method == null) {
method = clazz.getMethod("newBuilder");
methodCache.put(clazz, method);
}
return (Message.Builder) method.invoke(clazz);
}
@Override
public Mono<Message> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return DataBufferUtils.join(inputStream, this.maxMessageSize)
.map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints))
.onErrorMap(DataBufferLimitException.class, exc -> new DecodingException("Could not decode JSON as Protobuf message", exc));
}
}

View File

@ -0,0 +1,173 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.protobuf;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageEncoder;
import org.springframework.lang.Nullable;
import org.springframework.util.FastByteArrayOutputStream;
import org.springframework.util.MimeType;
/**
* A {@code Encoder} that writes {@link com.google.protobuf.Message}s as JSON.
*
* <p>To generate {@code Message} Java classes, you need to install the
* {@code protoc} binary.
*
* <p>This encoder requires Protobuf 3.29 or higher, and supports
* {@code "application/json"} and {@code "application/*+json"} with
* the official {@code "com.google.protobuf:protobuf-java-util"} library.
*
* @author Brian Clozel
* @since 6.2
* @see ProtobufJsonDecoder
*/
public class ProtobufJsonEncoder implements HttpMessageEncoder<Message> {
private static final byte[] EMPTY_BYTES = new byte[0];
private static final ResolvableType MESSAGE_TYPE = ResolvableType.forClass(Message.class);
private static final List<MimeType> defaultMimeTypes = List.of(
MediaType.APPLICATION_JSON,
new MediaType("application", "*+json"));
private final JsonFormat.Printer printer;
/**
* Construct a new {@link ProtobufJsonEncoder} using a default {@link JsonFormat.Printer} instance.
*/
public ProtobufJsonEncoder() {
this(JsonFormat.printer());
}
/**
* Construct a new {@link ProtobufJsonEncoder} using the given {@link JsonFormat.Printer} instance.
*/
public ProtobufJsonEncoder(JsonFormat.Printer printer) {
this.printer = printer;
}
@Override
public List<MediaType> getStreamingMediaTypes() {
return List.of(MediaType.APPLICATION_NDJSON);
}
@Override
public List<MimeType> getEncodableMimeTypes() {
return defaultMimeTypes;
}
@Override
public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType) {
return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
}
private static boolean supportsMimeType(@Nullable MimeType mimeType) {
if (mimeType == null) {
return false;
}
for (MimeType m : defaultMimeTypes) {
if (m.isCompatibleWith(mimeType)) {
return true;
}
}
return false;
}
@Override
public Flux<DataBuffer> encode(Publisher<? extends Message> inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
if (inputStream instanceof Mono) {
return Mono.from(inputStream)
.map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
.flux();
}
JsonArrayJoinHelper helper = new JsonArrayJoinHelper();
// Do not prepend JSON array prefix until first signal is known, onNext vs onError
// Keeps response not committed for error handling
return Flux.from(inputStream)
.map(value -> {
byte[] prefix = helper.getPrefix();
byte[] delimiter = helper.getDelimiter();
DataBuffer dataBuffer = encodeValue(value, bufferFactory, MESSAGE_TYPE, mimeType, hints);
return (prefix.length > 0 ?
bufferFactory.join(List.of(bufferFactory.wrap(prefix), bufferFactory.wrap(delimiter), dataBuffer)) :
bufferFactory.join(List.of(bufferFactory.wrap(delimiter), dataBuffer)));
})
.switchIfEmpty(Mono.fromCallable(() -> bufferFactory.wrap(helper.getPrefix())))
.concatWith(Mono.fromCallable(() -> bufferFactory.wrap(helper.getSuffix())));
}
@Override
public DataBuffer encodeValue(Message message, DataBufferFactory bufferFactory, ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(bos, StandardCharsets.UTF_8);
try {
this.printer.appendTo(message, writer);
writer.flush();
byte[] bytes = bos.toByteArrayUnsafe();
return bufferFactory.wrap(bytes);
}
catch (IOException ex) {
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
}
}
private static class JsonArrayJoinHelper {
private static final byte[] COMMA_SEPARATOR = {','};
private static final byte[] OPEN_BRACKET = {'['};
private static final byte[] CLOSE_BRACKET = {']'};
private boolean firstItemEmitted;
public byte[] getDelimiter() {
if (this.firstItemEmitted) {
return COMMA_SEPARATOR;
}
this.firstItemEmitted = true;
return EMPTY_BYTES;
}
public byte[] getPrefix() {
return (this.firstItemEmitted ? EMPTY_BYTES : OPEN_BRACKET);
}
public byte[] getSuffix() {
return CLOSE_BRACKET;
}
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.protobuf;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.testfixture.codec.AbstractDecoderTests;
import org.springframework.http.MediaType;
import org.springframework.protobuf.Msg;
import org.springframework.protobuf.SecondMsg;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link ProtobufJsonDecoder}.
* @author Brian Clozel
*/
public class ProtobufJsonDecoderTests extends AbstractDecoderTests<ProtobufJsonDecoder> {
private Msg msg1 = Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(123).build()).build();
public ProtobufJsonDecoderTests() {
super(new ProtobufJsonDecoder());
}
@Test
@Override
protected void canDecode() throws Exception {
ResolvableType msgType = ResolvableType.forClass(Msg.class);
assertThat(this.decoder.canDecode(msgType, null)).isFalse();
assertThat(this.decoder.canDecode(msgType, MediaType.APPLICATION_JSON)).isTrue();
assertThat(this.decoder.canDecode(msgType, MediaType.APPLICATION_PROTOBUF)).isFalse();
assertThat(this.decoder.canDecode(ResolvableType.forClass(Object.class), MediaType.APPLICATION_JSON)).isFalse();
}
@Test
@Override
protected void decode() throws Exception {
ResolvableType msgType = ResolvableType.forClass(Msg.class);
Flux<DataBuffer> input = Flux.just(dataBuffer("[{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}"),
dataBuffer(",{\"foo\":\"Bar\",\"blah\":{\"blah\":456}}"),
dataBuffer("]"));
testDecode(input, msgType, step -> step.consumeErrorWith(error -> assertThat(error).isInstanceOf(UnsupportedOperationException.class)),
MediaType.APPLICATION_JSON, null);
}
@Test
@Override
protected void decodeToMono() throws Exception {
DataBuffer dataBuffer = dataBuffer("{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}");
testDecodeToMonoAll(Mono.just(dataBuffer), Msg.class, step -> step
.expectNext(this.msg1)
.verifyComplete());
}
@Test
void exceedMaxSize() {
this.decoder.setMaxMessageSize(1);
DataBuffer first = dataBuffer("{\"foo\":\"Foo\",");
DataBuffer second = dataBuffer("\"blah\":{\"blah\":123}}");
testDecodeToMono(Flux.just(first, second), Msg.class, step -> step.verifyError(DecodingException.class));
}
private DataBuffer dataBuffer(String json) {
return this.bufferFactory.wrap(json.getBytes());
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.protobuf;
import java.nio.charset.StandardCharsets;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.testfixture.codec.AbstractEncoderTests;
import org.springframework.core.testfixture.io.buffer.DataBufferTestUtils;
import org.springframework.http.MediaType;
import org.springframework.protobuf.Msg;
import org.springframework.protobuf.SecondMsg;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.core.ResolvableType.forClass;
/**
* Tests for {@link ProtobufJsonEncoder}.
* @author Brian Clozel
*/
class ProtobufJsonEncoderTests extends AbstractEncoderTests<ProtobufJsonEncoder> {
private Msg msg1 =
Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(123).build()).build();
private Msg msg2 =
Msg.newBuilder().setFoo("Bar").setBlah(SecondMsg.newBuilder().setBlah(456).build()).build();
public ProtobufJsonEncoderTests() {
super(new ProtobufJsonEncoder(JsonFormat.printer().omittingInsignificantWhitespace()));
}
@Override
@Test
protected void canEncode() throws Exception {
assertThat(this.encoder.canEncode(forClass(Msg.class), null)).isFalse();
assertThat(this.encoder.canEncode(forClass(Msg.class), MediaType.APPLICATION_JSON)).isTrue();
assertThat(this.encoder.canEncode(forClass(Msg.class), MediaType.APPLICATION_NDJSON)).isFalse();
assertThat(this.encoder.canEncode(forClass(Object.class), MediaType.APPLICATION_JSON)).isFalse();
}
@Override
@Test
protected void encode() throws Exception {
Mono<Message> input = Mono.just(this.msg1);
ResolvableType inputType = forClass(Msg.class);
testEncode(input, inputType, MediaType.APPLICATION_JSON, null, step -> step
.assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, "{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}"))
.verifyComplete());
testEncodeError(input, inputType, MediaType.APPLICATION_JSON, null);
testEncodeCancel(input, inputType, MediaType.APPLICATION_JSON, null);
}
@Test
void encodeEmptyMono() {
Mono<Message> input = Mono.empty();
ResolvableType inputType = forClass(Msg.class);
Flux<DataBuffer> result = this.encoder.encode(input, this.bufferFactory, inputType,
MediaType.APPLICATION_JSON, null);
StepVerifier.create(result)
.verifyComplete();
}
@Test
void encodeStream() {
Flux<Message> input = Flux.just(this.msg1, this.msg2);
ResolvableType inputType = forClass(Msg.class);
testEncode(input, inputType, MediaType.APPLICATION_JSON, null, step -> step
.assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, "[{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}"))
.assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, ",{\"foo\":\"Bar\",\"blah\":{\"blah\":456}}"))
.assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, "]"))
.verifyComplete());
}
@Test
void encodeEmptyFlux() {
Flux<Message> input = Flux.empty();
ResolvableType inputType = forClass(Msg.class);
Flux<DataBuffer> result = this.encoder.encode(input, this.bufferFactory, inputType,
MediaType.APPLICATION_JSON, null);
StepVerifier.create(result)
.assertNext(buffer -> assertBufferEqualsJson(buffer, "["))
.assertNext(buffer -> assertBufferEqualsJson(buffer, "]"))
.verifyComplete();
}
private void assertBufferEqualsJson(DataBuffer actual, String expected) {
byte[] bytes = DataBufferTestUtils.dumpBytes(actual);
String json = new String(bytes, StandardCharsets.UTF_8);
assertThat(json).isEqualTo(expected);
DataBufferUtils.release(actual);
}
}