Align OutputStreamPublisher's

Align internal handling and contracts. The core copy could do without
those contracts, but it helps with alignment, and it's internal to
the implementation.

Closes gh-33592
This commit is contained in:
rstoyanchev 2024-09-25 11:41:06 +01:00
parent f6c31bb6c3
commit 7051cddcf7
3 changed files with 99 additions and 81 deletions

View File

@ -431,16 +431,17 @@ public abstract class DataBufferUtils {
* <li>Any exceptions thrown from {@code outputStreamHandler} will
* be dispatched to the {@linkplain Subscriber#onError(Throwable) Subscriber}.
* </ul>
* @param outputStreamConsumer invoked when the first buffer is requested
* @param consumer invoked when the first buffer is requested
* @param executor used to invoke the {@code outputStreamHandler}
* @return a {@code Publisher<DataBuffer>} based on bytes written by
* {@code outputStreamHandler}
* @since 6.1
*/
public static Publisher<DataBuffer> outputStreamPublisher(Consumer<OutputStream> outputStreamConsumer,
DataBufferFactory bufferFactory, Executor executor) {
public static Publisher<DataBuffer> outputStreamPublisher(
Consumer<OutputStream> consumer, DataBufferFactory bufferFactory, Executor executor) {
return new OutputStreamPublisher(outputStreamConsumer, bufferFactory, executor, null);
return new OutputStreamPublisher<>(
consumer::accept, new DataBufferMapper(bufferFactory), executor, null);
}
/**
@ -448,10 +449,11 @@ public abstract class DataBufferUtils {
* providing control over the chunk sizes to be produced by the publisher.
* @since 6.1
*/
public static Publisher<DataBuffer> outputStreamPublisher(Consumer<OutputStream> outputStreamConsumer,
DataBufferFactory bufferFactory, Executor executor, int chunkSize) {
public static Publisher<DataBuffer> outputStreamPublisher(
Consumer<OutputStream> consumer, DataBufferFactory bufferFactory, Executor executor, int chunkSize) {
return new OutputStreamPublisher(outputStreamConsumer, bufferFactory, executor, chunkSize);
return new OutputStreamPublisher<>(
consumer::accept, new DataBufferMapper(bufferFactory), executor, chunkSize);
}
@ -1256,4 +1258,29 @@ public abstract class DataBufferUtils {
private record Attachment(ByteBuffer byteBuffer, DataBuffer dataBuffer, DataBuffer.ByteBufferIterator iterator) {}
}
private static final class DataBufferMapper implements OutputStreamPublisher.ByteMapper<DataBuffer> {
private final DataBufferFactory bufferFactory;
private DataBufferMapper(DataBufferFactory bufferFactory) {
this.bufferFactory = bufferFactory;
}
@Override
public DataBuffer map(int b) {
DataBuffer buffer = this.bufferFactory.allocateBuffer(1);
buffer.write((byte) b);
return buffer;
}
@Override
public DataBuffer map(byte[] b, int off, int len) {
DataBuffer buffer = this.bufferFactory.allocateBuffer(len);
buffer.write(b, off, len);
return buffer;
}
}
}

View File

@ -24,7 +24,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@ -36,21 +35,27 @@ import org.springframework.util.Assert;
/**
* Bridges between {@link OutputStream} and {@link Publisher Publisher&lt;DataBuffer&gt;}.
*
* <p>When there is demand on the Reactive Streams subscription, any write to
* the OutputStream is mapped to a buffer and published to the subscriber.
* If there is no demand, writes block until demand materializes.
* If the subscription is cancelled, further writes raise {@code IOException}.
*
* <p>Note that this class has a near duplicate in
* {@link org.springframework.http.client.OutputStreamPublisher}.
*
* @author Oleh Dokuka
* @author Arjen Poutsma
* @since 6.1
* @param <T> the published byte buffer type
*/
final class OutputStreamPublisher implements Publisher<DataBuffer> {
final class OutputStreamPublisher<T> implements Publisher<T> {
private static final int DEFAULT_CHUNK_SIZE = 1024;
private final Consumer<OutputStream> outputStreamConsumer;
private final OutputStreamHandler outputStreamHandler;
private final DataBufferFactory bufferFactory;
private final ByteMapper<T> byteMapper;
private final Executor executor;
@ -59,50 +64,74 @@ final class OutputStreamPublisher implements Publisher<DataBuffer> {
/**
* Create an instance.
* @param outputStreamConsumer invoked when the first buffer is requested
* @param bufferFactory to create data buffers with
* @param outputStreamHandler invoked when the first buffer is requested
* @param byteMapper maps written bytes to {@code T}
* @param executor used to invoke the {@code outputStreamHandler}
* @param chunkSize the chunk sizes to be produced by the publisher
*/
OutputStreamPublisher(
Consumer<OutputStream> outputStreamConsumer, DataBufferFactory bufferFactory,
OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper,
Executor executor, @Nullable Integer chunkSize) {
Assert.notNull(outputStreamConsumer, "OutputStreamConsumer must not be null");
Assert.notNull(bufferFactory, "BufferFactory must not be null");
Assert.notNull(outputStreamHandler, "OutputStreamHandler must not be null");
Assert.notNull(byteMapper, "ByteMapper must not be null");
Assert.notNull(executor, "Executor must not be null");
Assert.isTrue(chunkSize == null || chunkSize > 0, "ChunkSize must be larger than 0");
this.outputStreamConsumer = outputStreamConsumer;
this.bufferFactory = bufferFactory;
this.outputStreamHandler = outputStreamHandler;
this.byteMapper = byteMapper;
this.executor = executor;
this.chunkSize = (chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE);
}
@Override
public void subscribe(Subscriber<? super DataBuffer> subscriber) {
public void subscribe(Subscriber<? super T> subscriber) {
// We don't use Assert.notNull(), because a NullPointerException is required
// for Reactive Streams compliance.
Objects.requireNonNull(subscriber, "Subscriber must not be null");
OutputStreamSubscription subscription = new OutputStreamSubscription(
subscriber, this.outputStreamConsumer, this.bufferFactory, this.chunkSize);
OutputStreamSubscription<T> subscription = new OutputStreamSubscription<>(
subscriber, this.outputStreamHandler, this.byteMapper, this.chunkSize);
subscriber.onSubscribe(subscription);
this.executor.execute(subscription::invokeHandler);
}
private static final class OutputStreamSubscription extends OutputStream implements Subscription {
/**
* Contract to provide callback access to the {@link OutputStream}.
*/
@FunctionalInterface
public interface OutputStreamHandler {
void handle(OutputStream outputStream) throws Exception;
}
/**
* Maps from bytes to byte buffers.
* @param <T> the type of byte buffer to map to
*/
public interface ByteMapper<T> {
T map(int b);
T map(byte[] b, int off, int len);
}
private static final class OutputStreamSubscription<T> extends OutputStream implements Subscription {
private static final Object READY = new Object();
private final Subscriber<? super DataBuffer> actual;
private final Subscriber<? super T> actual;
private final Consumer<OutputStream> outputStreamHandler;
private final OutputStreamHandler outputStreamHandler;
private final DataBufferFactory bufferFactory;
private final ByteMapper<T> byteMapper;
private final int chunkSize;
@ -116,24 +145,20 @@ final class OutputStreamPublisher implements Publisher<DataBuffer> {
private long produced;
OutputStreamSubscription(
Subscriber<? super DataBuffer> actual, Consumer<OutputStream> outputStreamConsumer,
DataBufferFactory bufferFactory, int chunkSize) {
Subscriber<? super T> actual, OutputStreamHandler outputStreamHandler,
ByteMapper<T> byteMapper, int chunkSize) {
this.actual = actual;
this.outputStreamHandler = outputStreamConsumer;
this.bufferFactory = bufferFactory;
this.outputStreamHandler = outputStreamHandler;
this.byteMapper = byteMapper;
this.chunkSize = chunkSize;
}
@Override
public void write(int b) throws IOException {
checkDemandAndAwaitIfNeeded();
DataBuffer next = this.bufferFactory.allocateBuffer(1);
next.write((byte) b);
T next = this.byteMapper.map(b);
this.actual.onNext(next);
this.produced++;
}
@ -145,12 +170,8 @@ final class OutputStreamPublisher implements Publisher<DataBuffer> {
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkDemandAndAwaitIfNeeded();
DataBuffer next = this.bufferFactory.allocateBuffer(len);
next.write(b, off, len);
T next = this.byteMapper.map(b, off, len);
this.actual.onNext(next);
this.produced++;
}
@ -190,7 +211,7 @@ final class OutputStreamPublisher implements Publisher<DataBuffer> {
// use BufferedOutputStream, so that written bytes are buffered
// before publishing as byte buffer
try (OutputStream outputStream = new BufferedOutputStream(this, this.chunkSize)) {
this.outputStreamHandler.accept(outputStream);
this.outputStreamHandler.handle(outputStream);
}
catch (Exception ex) {
long previousState = tryTerminate();

View File

@ -32,6 +32,11 @@ import org.springframework.util.Assert;
/**
* Bridges between {@link OutputStream} and {@link Flow.Publisher Flow.Publisher&lt;T&gt;}.
*
* <p>When there is demand on the Reactive Streams subscription, any write to
* the OutputStream is mapped to a buffer and published to the subscriber.
* If there is no demand, writes block until demand materializes.
* If the subscription is cancelled, further writes raise {@code IOException}.
*
* <p>Note that this class has a near duplicate in
* {@link org.springframework.core.io.buffer.OutputStreamPublisher}.
*
@ -92,53 +97,24 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
/**
* Defines the contract for handling the {@code OutputStream} provided by
* the {@code OutputStreamPublisher}.
* Contract to provide callback access to the {@link OutputStream}.
*/
@FunctionalInterface
public interface OutputStreamHandler {
/**
* Use the given stream for writing.
* <ul>
* <li>If the linked subscription has
* {@linkplain Flow.Subscription#request(long) demand}, any
* {@linkplain OutputStream#write(byte[], int, int) written} bytes
* will be {@linkplain ByteMapper#map(byte[], int, int) mapped}
* and {@linkplain Flow.Subscriber#onNext(Object) published} to the
* {@link Flow.Subscriber Subscriber}.</li>
* <li>If there is no demand, any
* {@link OutputStream#write(byte[], int, int) write()} invocations will
* block until there is demand.</li>
* <li>If the linked subscription is
* {@linkplain Flow.Subscription#cancel() cancelled},
* {@link OutputStream#write(byte[], int, int) write()} invocations will
* result in a {@code IOException}.</li>
* </ul>
* @param outputStream the stream to write to
* @throws IOException any thrown I/O errors will be dispatched to the
* {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}
*/
void handle(OutputStream outputStream) throws IOException;
void handle(OutputStream outputStream) throws Exception;
}
/**
* Maps bytes written to in {@link OutputStreamHandler#handle(OutputStream)}
* to published items.
* @param <T> the type to map to
* Maps from bytes to byte buffers.
* @param <T> the type of byte buffer to map to
*/
public interface ByteMapper<T> {
/**
* Maps a single byte to {@code T}.
*/
T map(int b);
/**
* Maps a byte array to {@code T}.
*/
T map(byte[] b, int off, int len);
}
@ -146,7 +122,7 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
private static final class OutputStreamSubscription<T> extends OutputStream implements Flow.Subscription {
static final Object READY = new Object();
private static final Object READY = new Object();
private final Flow.Subscriber<? super T> actual;
@ -178,11 +154,8 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
@Override
public void write(int b) throws IOException {
checkDemandAndAwaitIfNeeded();
T next = this.byteMapper.map(b);
this.actual.onNext(next);
this.produced++;
}
@ -194,11 +167,8 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkDemandAndAwaitIfNeeded();
T next = this.byteMapper.map(b, off, len);
this.actual.onNext(next);
this.produced++;
}
@ -240,7 +210,7 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
try (OutputStream outputStream = new BufferedOutputStream(this, this.chunkSize)) {
this.outputStreamHandler.handle(outputStream);
}
catch (IOException ex) {
catch (Exception ex) {
long previousState = tryTerminate();
if (isCancelled(previousState)) {
return;