InputStreamSubscriber compiler issues

See gh-31677
This commit is contained in:
rstoyanchev 2024-10-25 14:55:11 +01:00
parent a366ea0e15
commit d4b31fd4b2
4 changed files with 42 additions and 36 deletions

View File

@ -175,7 +175,7 @@ final class InputStreamSubscriber extends InputStream implements Subscriber<Data
}
catch (Throwable t) {
this.closed = true;
this.s.cancel();
requiredSubscriber().cancel();
cleanAndFinalize();
throw Exceptions.propagate(t);
}
@ -217,7 +217,7 @@ final class InputStreamSubscriber extends InputStream implements Subscriber<Data
return j;
}
} else if (bytes == CLOSED) {
this.s.cancel();
requiredSubscriber().cancel();
cleanAndFinalize();
return -1;
}
@ -230,7 +230,7 @@ final class InputStreamSubscriber extends InputStream implements Subscriber<Data
}
catch (Throwable t) {
this.closed = true;
this.s.cancel();
requiredSubscriber().cancel();
cleanAndFinalize();
throw Exceptions.propagate(t);
}
@ -258,7 +258,7 @@ final class InputStreamSubscriber extends InputStream implements Subscriber<Data
this.available = t;
if (consumed == this.limit) {
this.consumed = 0;
this.s.request(this.limit);
requiredSubscriber().request(this.limit);
}
break;
}
@ -315,7 +315,7 @@ final class InputStreamSubscriber extends InputStream implements Subscriber<Data
}
try {
this.s.cancel();
requiredSubscriber().cancel();
cleanAndFinalize();
}
finally {
@ -323,6 +323,11 @@ final class InputStreamSubscriber extends InputStream implements Subscriber<Data
}
}
private Subscription requiredSubscriber() {
Assert.state(this.s != null, "Subscriber must be subscribed to use InputStream");
return this.s;
}
private void await() {
Thread toUnpark = Thread.currentThread();

View File

@ -28,7 +28,6 @@ import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

View File

@ -16,9 +16,6 @@ import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import org.springframework.core.io.buffer.DataBuffer;
@ -75,26 +72,26 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
}
/**
* Subscribes to given {@link Publisher} and returns subscription
* Subscribes to given {@link Flow.Publisher} and returns subscription
* as {@link InputStream} that allows reading all propagated {@link DataBuffer} messages via its imperative API.
* Given the {@link InputStream} implementation buffers messages as per configuration.
* The returned {@link InputStream} is considered terminated when the given {@link Publisher} signaled one of the
* terminal signal ({@link Subscriber#onComplete() or {@link Subscriber#onError(Throwable)}})
* The returned {@link InputStream} is considered terminated when the given {@link Flow.Publisher} signaled one of the
* terminal signal ({@link Flow.Subscriber#onComplete() or {@link Flow.Subscriber#onError(Throwable)}})
* and all the stored {@link DataBuffer} polled from the internal buffer.
* The returned {@link InputStream} will call {@link Subscription#cancel()} and release all stored {@link DataBuffer}
* The returned {@link InputStream} will call {@link Flow.Subscription#cancel()} and release all stored {@link DataBuffer}
* when {@link InputStream#close()} is called.
* <p>
* Note: The implementation of the returned {@link InputStream} disallow concurrent call on
* any of the {@link InputStream#read} methods
* <p>
* Note: {@link Subscription#request(long)} happens eagerly for the first time upon subscription
* Note: {@link Flow.Subscription#request(long)} happens eagerly for the first time upon subscription
* and then repeats every time {@code bufferSize - (bufferSize >> 2)} consumed
*
* @param publisher the source of {@link DataBuffer} which should be represented as an {@link InputStream}
* @param mapper function to transform &lt;T&gt; element to {@code byte[]}. Note, &lt;T&gt; should be released during the mapping if needed.
* @param onDiscardHandler &lt;T&gt; element consumer if returned {@link InputStream} is closed prematurely.
* @param bufferSize the maximum amount of &lt;T&gt; elements prefetched in advance and stored inside {@link InputStream}
* @return an {@link InputStream} instance representing given {@link Publisher} messages
* @return an {@link InputStream} instance representing given {@link Flow.Publisher} messages
*/
public static <T> InputStream subscribeTo(Flow.Publisher<T> publisher, Function<T, byte[]> mapper, Consumer<T> onDiscardHandler, int bufferSize) {
@ -221,7 +218,7 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
}
catch (Throwable t) {
this.closed = true;
this.s.cancel();
requiredSubscriber().cancel();
cleanAndFinalize();
throw Exceptions.propagate(t);
}
@ -263,7 +260,7 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
return j;
}
} else if (bytes == CLOSED) {
this.s.cancel();
requiredSubscriber().cancel();
cleanAndFinalize();
return -1;
}
@ -278,7 +275,7 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
}
catch (Throwable t) {
this.closed = true;
this.s.cancel();
requiredSubscriber().cancel();
cleanAndFinalize();
throw Exceptions.propagate(t);
}
@ -305,7 +302,7 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
this.available = Objects.requireNonNull(this.mapper.apply(t));
if (consumed == this.limit) {
this.consumed = 0;
this.s.request(this.limit);
requiredSubscriber().request(this.limit);
}
break;
}
@ -367,7 +364,7 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
}
try {
this.s.cancel();
requiredSubscriber().cancel();
cleanAndFinalize();
}
finally {
@ -375,6 +372,11 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
}
}
private Flow.Subscription requiredSubscriber() {
Assert.state(this.s != null, "Subscriber must be subscribed to use InputStream");
return this.s;
}
private void await() {
Thread toUnpark = Thread.currentThread();

View File

@ -16,11 +16,6 @@
package org.springframework.http.client;
import org.junit.jupiter.api.Test;
import org.reactivestreams.FlowAdapters;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
@ -32,6 +27,11 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import org.junit.jupiter.api.Test;
import org.reactivestreams.FlowAdapters;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIOException;
@ -68,11 +68,11 @@ class InputStreamSubscriberTests {
@Test
void basic() {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
outputStream.write(FOO);
outputStream.write(BAR);
outputStream.write(BAZ);
}, this.byteMapper, this.executor);
}, this.byteMapper, this.executor, null);
Flux<String> flux = toString(flowPublisher);
StepVerifier.create(flux)
@ -82,14 +82,14 @@ class InputStreamSubscriberTests {
@Test
void flush() {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
outputStream.write(FOO);
outputStream.flush();
outputStream.write(BAR);
outputStream.flush();
outputStream.write(BAZ);
outputStream.flush();
}, this.byteMapper, this.executor);
}, this.byteMapper, this.executor, null);
Flux<String> flux = toString(flowPublisher);
try (InputStream is = InputStreamSubscriber.subscribeTo(FlowAdapters.toFlowPublisher(flux), (s) -> s.getBytes(StandardCharsets.UTF_8), (ignore) -> {}, 1)) {
@ -110,7 +110,7 @@ class InputStreamSubscriberTests {
@Test
void chunkSize() {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
outputStream.write(FOO);
outputStream.write(BAR);
outputStream.write(BAZ);
@ -146,7 +146,7 @@ class InputStreamSubscriberTests {
void cancel() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
assertThatIOException()
.isThrownBy(() -> {
outputStream.write(FOO);
@ -159,7 +159,7 @@ class InputStreamSubscriberTests {
.withMessage("Subscription has been terminated");
latch.countDown();
}, this.byteMapper, this.executor);
}, this.byteMapper, this.executor, null);
Flux<String> flux = toString(flowPublisher);
List<String> discarded = new ArrayList<>();
@ -182,14 +182,14 @@ class InputStreamSubscriberTests {
void closed() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
OutputStreamWriter writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
writer.write("foo");
writer.close();
assertThatIOException().isThrownBy(() -> writer.write("bar"))
.withMessage("Stream closed");
latch.countDown();
}, this.byteMapper, this.executor);
}, this.byteMapper, this.executor, null);
Flux<String> flux = toString(flowPublisher);
try (InputStream is = InputStreamSubscriber.subscribeTo(FlowAdapters.toFlowPublisher(flux), (s) -> s.getBytes(StandardCharsets.UTF_8), ig -> {}, 1)) {
@ -211,7 +211,7 @@ class InputStreamSubscriberTests {
void mapperThrowsException() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
outputStream.write(FOO);
outputStream.flush();
assertThatIOException().isThrownBy(() -> {
@ -219,7 +219,7 @@ class InputStreamSubscriberTests {
outputStream.flush();
}).withMessage("Subscription has been terminated");
latch.countDown();
}, this.byteMapper, this.executor);
}, this.byteMapper, this.executor, null);
Throwable ex = null;
StringBuilder stringBuilder = new StringBuilder();