From d4b31fd4b2fde976a5a666454e36c797f040ec6c Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Fri, 25 Oct 2024 14:55:11 +0100 Subject: [PATCH] InputStreamSubscriber compiler issues See gh-31677 --- .../core/io/buffer/InputStreamSubscriber.java | 15 ++++++--- .../core/io/buffer/DataBufferUtilsTests.java | 1 - .../http/client/InputStreamSubscriber.java | 30 +++++++++-------- .../client/InputStreamSubscriberTests.java | 32 +++++++++---------- 4 files changed, 42 insertions(+), 36 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/InputStreamSubscriber.java b/spring-core/src/main/java/org/springframework/core/io/buffer/InputStreamSubscriber.java index b5a89c9bae3..0dd94fd6507 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/InputStreamSubscriber.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/InputStreamSubscriber.java @@ -175,7 +175,7 @@ final class InputStreamSubscriber extends InputStream implements Subscriber extends InputStream implements Flow.Subscri } /** - * Subscribes to given {@link Publisher} and returns subscription + * Subscribes to given {@link Flow.Publisher} and returns subscription * as {@link InputStream} that allows reading all propagated {@link DataBuffer} messages via its imperative API. * Given the {@link InputStream} implementation buffers messages as per configuration. - * The returned {@link InputStream} is considered terminated when the given {@link Publisher} signaled one of the - * terminal signal ({@link Subscriber#onComplete() or {@link Subscriber#onError(Throwable)}}) + * The returned {@link InputStream} is considered terminated when the given {@link Flow.Publisher} signaled one of the + * terminal signal ({@link Flow.Subscriber#onComplete() or {@link Flow.Subscriber#onError(Throwable)}}) * and all the stored {@link DataBuffer} polled from the internal buffer. - * The returned {@link InputStream} will call {@link Subscription#cancel()} and release all stored {@link DataBuffer} + * The returned {@link InputStream} will call {@link Flow.Subscription#cancel()} and release all stored {@link DataBuffer} * when {@link InputStream#close()} is called. *

* Note: The implementation of the returned {@link InputStream} disallow concurrent call on * any of the {@link InputStream#read} methods *

- * Note: {@link Subscription#request(long)} happens eagerly for the first time upon subscription + * Note: {@link Flow.Subscription#request(long)} happens eagerly for the first time upon subscription * and then repeats every time {@code bufferSize - (bufferSize >> 2)} consumed * * @param publisher the source of {@link DataBuffer} which should be represented as an {@link InputStream} * @param mapper function to transform <T> element to {@code byte[]}. Note, <T> should be released during the mapping if needed. * @param onDiscardHandler <T> element consumer if returned {@link InputStream} is closed prematurely. * @param bufferSize the maximum amount of <T> elements prefetched in advance and stored inside {@link InputStream} - * @return an {@link InputStream} instance representing given {@link Publisher} messages + * @return an {@link InputStream} instance representing given {@link Flow.Publisher} messages */ public static InputStream subscribeTo(Flow.Publisher publisher, Function mapper, Consumer onDiscardHandler, int bufferSize) { @@ -221,7 +218,7 @@ final class InputStreamSubscriber extends InputStream implements Flow.Subscri } catch (Throwable t) { this.closed = true; - this.s.cancel(); + requiredSubscriber().cancel(); cleanAndFinalize(); throw Exceptions.propagate(t); } @@ -263,7 +260,7 @@ final class InputStreamSubscriber extends InputStream implements Flow.Subscri return j; } } else if (bytes == CLOSED) { - this.s.cancel(); + requiredSubscriber().cancel(); cleanAndFinalize(); return -1; } @@ -278,7 +275,7 @@ final class InputStreamSubscriber extends InputStream implements Flow.Subscri } catch (Throwable t) { this.closed = true; - this.s.cancel(); + requiredSubscriber().cancel(); cleanAndFinalize(); throw Exceptions.propagate(t); } @@ -305,7 +302,7 @@ final class InputStreamSubscriber extends InputStream implements Flow.Subscri this.available = Objects.requireNonNull(this.mapper.apply(t)); if (consumed == this.limit) { this.consumed = 0; - this.s.request(this.limit); + requiredSubscriber().request(this.limit); } break; } @@ -367,7 +364,7 @@ final class InputStreamSubscriber extends InputStream implements Flow.Subscri } try { - this.s.cancel(); + requiredSubscriber().cancel(); cleanAndFinalize(); } finally { @@ -375,6 +372,11 @@ final class InputStreamSubscriber extends InputStream implements Flow.Subscri } } + private Flow.Subscription requiredSubscriber() { + Assert.state(this.s != null, "Subscriber must be subscribed to use InputStream"); + return this.s; + } + private void await() { Thread toUnpark = Thread.currentThread(); diff --git a/spring-web/src/test/java/org/springframework/http/client/InputStreamSubscriberTests.java b/spring-web/src/test/java/org/springframework/http/client/InputStreamSubscriberTests.java index 9dd635dffba..66eaf2eced6 100644 --- a/spring-web/src/test/java/org/springframework/http/client/InputStreamSubscriberTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/InputStreamSubscriberTests.java @@ -16,11 +16,6 @@ package org.springframework.http.client; -import org.junit.jupiter.api.Test; -import org.reactivestreams.FlowAdapters; -import reactor.core.publisher.Flux; -import reactor.test.StepVerifier; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStreamWriter; @@ -32,6 +27,11 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; +import org.junit.jupiter.api.Test; +import org.reactivestreams.FlowAdapters; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIOException; @@ -68,11 +68,11 @@ class InputStreamSubscriberTests { @Test void basic() { - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = new OutputStreamPublisher<>(outputStream -> { outputStream.write(FOO); outputStream.write(BAR); outputStream.write(BAZ); - }, this.byteMapper, this.executor); + }, this.byteMapper, this.executor, null); Flux flux = toString(flowPublisher); StepVerifier.create(flux) @@ -82,14 +82,14 @@ class InputStreamSubscriberTests { @Test void flush() { - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = new OutputStreamPublisher<>(outputStream -> { outputStream.write(FOO); outputStream.flush(); outputStream.write(BAR); outputStream.flush(); outputStream.write(BAZ); outputStream.flush(); - }, this.byteMapper, this.executor); + }, this.byteMapper, this.executor, null); Flux flux = toString(flowPublisher); try (InputStream is = InputStreamSubscriber.subscribeTo(FlowAdapters.toFlowPublisher(flux), (s) -> s.getBytes(StandardCharsets.UTF_8), (ignore) -> {}, 1)) { @@ -110,7 +110,7 @@ class InputStreamSubscriberTests { @Test void chunkSize() { - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = new OutputStreamPublisher<>(outputStream -> { outputStream.write(FOO); outputStream.write(BAR); outputStream.write(BAZ); @@ -146,7 +146,7 @@ class InputStreamSubscriberTests { void cancel() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = new OutputStreamPublisher<>(outputStream -> { assertThatIOException() .isThrownBy(() -> { outputStream.write(FOO); @@ -159,7 +159,7 @@ class InputStreamSubscriberTests { .withMessage("Subscription has been terminated"); latch.countDown(); - }, this.byteMapper, this.executor); + }, this.byteMapper, this.executor, null); Flux flux = toString(flowPublisher); List discarded = new ArrayList<>(); @@ -182,14 +182,14 @@ class InputStreamSubscriberTests { void closed() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = new OutputStreamPublisher<>(outputStream -> { OutputStreamWriter writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8); writer.write("foo"); writer.close(); assertThatIOException().isThrownBy(() -> writer.write("bar")) .withMessage("Stream closed"); latch.countDown(); - }, this.byteMapper, this.executor); + }, this.byteMapper, this.executor, null); Flux flux = toString(flowPublisher); try (InputStream is = InputStreamSubscriber.subscribeTo(FlowAdapters.toFlowPublisher(flux), (s) -> s.getBytes(StandardCharsets.UTF_8), ig -> {}, 1)) { @@ -211,7 +211,7 @@ class InputStreamSubscriberTests { void mapperThrowsException() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = new OutputStreamPublisher<>(outputStream -> { outputStream.write(FOO); outputStream.flush(); assertThatIOException().isThrownBy(() -> { @@ -219,7 +219,7 @@ class InputStreamSubscriberTests { outputStream.flush(); }).withMessage("Subscription has been terminated"); latch.countDown(); - }, this.byteMapper, this.executor); + }, this.byteMapper, this.executor, null); Throwable ex = null; StringBuilder stringBuilder = new StringBuilder();