From 11898daed726cee7c3ea189e058f48fd611f4848 Mon Sep 17 00:00:00 2001 From: Felipe Date: Sat, 20 Jan 2024 14:36:51 +0100 Subject: [PATCH] Add support for JSON streams to Kotlin Serialization Closes gh-32074 --- .../KotlinSerializationStringDecoder.java | 14 +++++++--- .../KotlinSerializationStringEncoder.java | 25 ++++++++++++----- .../json/KotlinSerializationJsonDecoder.java | 3 +- .../json/KotlinSerializationJsonEncoder.java | 6 +++- ...stomKotlinSerializationJsonDecoderTests.kt | 13 ++++++--- .../KotlinSerializationJsonDecoderTests.kt | 19 +++++++++++++ .../KotlinSerializationJsonEncoderTests.kt | 28 +++++++++++++++++-- 7 files changed, 88 insertions(+), 20 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/KotlinSerializationStringDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/KotlinSerializationStringDecoder.java index ef682563395..62e23d203e5 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/KotlinSerializationStringDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/KotlinSerializationStringDecoder.java @@ -16,8 +16,7 @@ package org.springframework.http.codec; -import java.util.List; -import java.util.Map; +import java.util.*; import kotlinx.serialization.KSerializer; import kotlinx.serialization.StringFormat; @@ -95,13 +94,20 @@ public abstract class KotlinSerializationStringDecoder e public Flux decode(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Flux.error(new UnsupportedOperationException()); + return Flux.defer(() -> { + KSerializer serializer = serializer(elementType); + if (serializer == null) { + return Mono.error(new DecodingException("Could not find KSerializer for " + elementType)); + } + return this.stringDecoder + .decode(inputStream, elementType, mimeType, hints) + .map(string -> format().decodeFromString(serializer, string)); + }); } @Override public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Mono.defer(() -> { KSerializer serializer = serializer(elementType); if (serializer == null) { diff --git a/spring-web/src/main/java/org/springframework/http/codec/KotlinSerializationStringEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/KotlinSerializationStringEncoder.java index e5a360ac0df..8cdaef51018 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/KotlinSerializationStringEncoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/KotlinSerializationStringEncoder.java @@ -16,12 +16,13 @@ package org.springframework.http.codec; -import java.util.List; -import java.util.Map; +import java.util.*; +import kotlin.text.Charsets; import kotlinx.serialization.KSerializer; import kotlinx.serialization.StringFormat; import org.reactivestreams.Publisher; +import org.springframework.http.MediaType; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -49,11 +50,17 @@ public abstract class KotlinSerializationStringEncoder e // CharSequence encoding needed for now, see https://github.com/Kotlin/kotlinx.serialization/issues/204 for more details private final CharSequenceEncoder charSequenceEncoder = CharSequenceEncoder.allMimeTypes(); + private final Set streamingMediaTypes = new HashSet<>(); protected KotlinSerializationStringEncoder(T format, MimeType... supportedMimeTypes) { super(format, supportedMimeTypes); } + public void setStreamingMediaTypes(Collection streamingMediaTypes) { + this.streamingMediaTypes.clear(); + this.streamingMediaTypes.addAll(streamingMediaTypes); + } + @Override public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType) { return canSerialize(elementType, mimeType); @@ -79,13 +86,17 @@ public abstract class KotlinSerializationStringEncoder e .map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints)) .flux(); } - else { - ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); + + if (mimeType != null && streamingMediaTypes.contains(mimeType)) { return Flux.from(inputStream) - .collectList() - .map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints)) - .flux(); + .map(list -> encodeValue(list, bufferFactory, elementType, mimeType, hints).write("\n", Charsets.UTF_8)); } + + ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); + return Flux.from(inputStream) + .collectList() + .map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints)) + .flux(); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/KotlinSerializationJsonDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/KotlinSerializationJsonDecoder.java index 74e10f76ebb..8c6ea40efd7 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/KotlinSerializationJsonDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/KotlinSerializationJsonDecoder.java @@ -46,7 +46,8 @@ public class KotlinSerializationJsonDecoder extends KotlinSerializationStringDec } public KotlinSerializationJsonDecoder(Json json) { - super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json")); + super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"), + MediaType.APPLICATION_NDJSON); } } diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/KotlinSerializationJsonEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/KotlinSerializationJsonEncoder.java index 45f7972de5b..afa1b5eb41b 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/KotlinSerializationJsonEncoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/KotlinSerializationJsonEncoder.java @@ -21,6 +21,8 @@ import kotlinx.serialization.json.Json; import org.springframework.http.MediaType; import org.springframework.http.codec.KotlinSerializationStringEncoder; +import java.util.List; + /** * Encode from an {@code Object} stream to a byte stream of JSON objects using * kotlinx.serialization. @@ -42,7 +44,9 @@ public class KotlinSerializationJsonEncoder extends KotlinSerializationStringEnc } public KotlinSerializationJsonEncoder(Json json) { - super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json")); + super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"), + MediaType.APPLICATION_NDJSON); + setStreamingMediaTypes(List.of(MediaType.APPLICATION_NDJSON)); } } diff --git a/spring-web/src/test/kotlin/org/springframework/http/codec/json/CustomKotlinSerializationJsonDecoderTests.kt b/spring-web/src/test/kotlin/org/springframework/http/codec/json/CustomKotlinSerializationJsonDecoderTests.kt index 8885152820d..0f5d816eb7b 100644 --- a/spring-web/src/test/kotlin/org/springframework/http/codec/json/CustomKotlinSerializationJsonDecoderTests.kt +++ b/spring-web/src/test/kotlin/org/springframework/http/codec/json/CustomKotlinSerializationJsonDecoderTests.kt @@ -23,6 +23,7 @@ import org.springframework.core.io.buffer.DataBuffer import org.springframework.core.testfixture.codec.AbstractDecoderTests import org.springframework.http.MediaType import org.springframework.http.customJson +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.test.StepVerifier import java.math.BigDecimal @@ -45,12 +46,16 @@ class CustomKotlinSerializationJsonDecoderTests : @Test override fun decode() { - val output = decoder.decode(Mono.empty(), - ResolvableType.forClass(KotlinSerializationJsonDecoderTests.Pojo::class.java), null, emptyMap()) + val input = Flux.concat( + stringBuffer("1.0\n"), + stringBuffer("2.0\n") + ) + val output = decoder.decode(input, ResolvableType.forClass(BigDecimal::class.java), null, emptyMap()) StepVerifier .create(output) - .expectError(UnsupportedOperationException::class.java) - .verify() + .expectNext(BigDecimal.valueOf(1.0)) + .expectNext(BigDecimal.valueOf(2.0)) + .verifyComplete() } @Test diff --git a/spring-web/src/test/kotlin/org/springframework/http/codec/json/KotlinSerializationJsonDecoderTests.kt b/spring-web/src/test/kotlin/org/springframework/http/codec/json/KotlinSerializationJsonDecoderTests.kt index 7950a136f83..ffa4dfc1ea1 100644 --- a/spring-web/src/test/kotlin/org/springframework/http/codec/json/KotlinSerializationJsonDecoderTests.kt +++ b/spring-web/src/test/kotlin/org/springframework/http/codec/json/KotlinSerializationJsonDecoderTests.kt @@ -63,6 +63,8 @@ class KotlinSerializationJsonDecoderTests : AbstractDecoderTests -> + step + .expectNext(Pojo("f1", "b1")) + .expectNext(Pojo("f2", "b2")) + .expectComplete() + .verify() + }, null, null) + } + @Test override fun decodeToMono() { val input = Flux.concat( diff --git a/spring-web/src/test/kotlin/org/springframework/http/codec/json/KotlinSerializationJsonEncoderTests.kt b/spring-web/src/test/kotlin/org/springframework/http/codec/json/KotlinSerializationJsonEncoderTests.kt index ae47f580c23..843eb71d945 100644 --- a/spring-web/src/test/kotlin/org/springframework/http/codec/json/KotlinSerializationJsonEncoderTests.kt +++ b/spring-web/src/test/kotlin/org/springframework/http/codec/json/KotlinSerializationJsonEncoderTests.kt @@ -58,14 +58,16 @@ class KotlinSerializationJsonEncoderTests : AbstractEncoderTests -> step .consumeNextWith(expectString("[" + @@ -76,6 +78,26 @@ class KotlinSerializationJsonEncoderTests : AbstractEncoderTests -> + step + .consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}\n")) + .consumeNextWith(expectString("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n")) + .consumeNextWith(expectString("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n")) + .verifyComplete() + } + } @Test fun encodeMono() {