Change OutputStreamPublisher default chunk size

This commit set the default chunk size to 1024 (from 8192).
This commit is contained in:
Arjen Poutsma 2023-07-06 11:55:17 +02:00
parent 3d2befc84a
commit 68b5eedde1
2 changed files with 119 additions and 48 deletions

View File

@ -41,17 +41,23 @@ import org.springframework.util.Assert;
*/ */
final class OutputStreamPublisher<T> implements Flow.Publisher<T> { final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
private static final int DEFAULT_CHUNK_SIZE = 1024;
private final OutputStreamHandler outputStreamHandler; private final OutputStreamHandler outputStreamHandler;
private final ByteMapper<T> byteMapper; private final ByteMapper<T> byteMapper;
private final Executor executor; private final Executor executor;
private final int chunkSize;
private OutputStreamPublisher(OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper, Executor executor) {
private OutputStreamPublisher(OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper, Executor executor, int chunkSize) {
this.outputStreamHandler = outputStreamHandler; this.outputStreamHandler = outputStreamHandler;
this.byteMapper = byteMapper; this.byteMapper = byteMapper;
this.executor = executor; this.executor = executor;
this.chunkSize = chunkSize;
} }
@ -64,16 +70,18 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
* subscription of the returned {@code Publisher}, when the first * subscription of the returned {@code Publisher}, when the first
* item is * item is
* {@linkplain Flow.Subscription#request(long) requested}.</li> * {@linkplain Flow.Subscription#request(long) requested}.</li>
* <li>Each {@link OutputStream#write(byte[], int, int) OutputStream.write()} * <li>{@link OutputStream#write(byte[], int, int) OutputStream.write()}
* invocation that {@code outputStreamHandler} makes will result in a * invocations made by {@code outputStreamHandler} are buffered until they
* exceed the default chunk size of 1024, and then result in a
* {@linkplain Flow.Subscriber#onNext(Object) published} item * {@linkplain Flow.Subscriber#onNext(Object) published} item
* if there is {@linkplain Flow.Subscription#request(long) demand}.</li> * if there is {@linkplain Flow.Subscription#request(long) demand}.</li>
* <li>If there is <em>no demand</em>, {@code OutputStream.write()} will block * <li>If there is <em>no demand</em>, {@code OutputStream.write()} will block
* until there is.</li> * until there is.</li>
* <li>If the subscription is {@linkplain Flow.Subscription#cancel() cancelled}, * <li>If the subscription is {@linkplain Flow.Subscription#cancel() cancelled},
* {@code OutputStream.write()} will throw a {@code IOException}.</li> * {@code OutputStream.write()} will throw a {@code IOException}.</li>
* <li>{@linkplain OutputStream#close() Closing} the {@code OutputStream} * <li>The subscription is
* will result in a {@linkplain Flow.Subscriber#onComplete() complete} signal.</li> * {@linkplain Flow.Subscriber#onComplete() completed} when
* {@code outputStreamHandler} completes.</li>
* <li>Any {@code IOException}s thrown from {@code outputStreamHandler} will * <li>Any {@code IOException}s thrown from {@code outputStreamHandler} will
* be dispatched to the {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}. * be dispatched to the {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}.
* </ul> * </ul>
@ -91,15 +99,58 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
Assert.notNull(byteMapper, "ByteMapper must not be null"); Assert.notNull(byteMapper, "ByteMapper must not be null");
Assert.notNull(executor, "Executor must not be null"); Assert.notNull(executor, "Executor must not be null");
return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor); return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor, DEFAULT_CHUNK_SIZE);
} }
/**
* Creates a new {@code Publisher<T>} based on bytes written to a
* {@code OutputStream}. The parameter {@code byteMapper} is used to map
* from written bytes to the published type.
* <ul>
* <li>The parameter {@code outputStreamHandler} is invoked once per
* subscription of the returned {@code Publisher}, when the first
* item is
* {@linkplain Flow.Subscription#request(long) requested}.</li>
* <li>{@link OutputStream#write(byte[], int, int) OutputStream.write()}
* invocations made by {@code outputStreamHandler} are buffered until they
* exceed {@code chunkSize}, and then result in a
* {@linkplain Flow.Subscriber#onNext(Object) published} item
* if there is {@linkplain Flow.Subscription#request(long) demand}.</li>
* <li>If there is <em>no demand</em>, {@code OutputStream.write()} will block
* until there is.</li>
* <li>If the subscription is {@linkplain Flow.Subscription#cancel() cancelled},
* {@code OutputStream.write()} will throw a {@code IOException}.</li>
* <li>The subscription is
* {@linkplain Flow.Subscriber#onComplete() completed} when
* {@code outputStreamHandler} completes.</li>
* <li>Any {@code IOException}s thrown from {@code outputStreamHandler} will
* be dispatched to the {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}.
* </ul>
* @param outputStreamHandler invoked when the first buffer is requested
* @param byteMapper maps written bytes to {@code T}
* @param executor used to invoke the {@code outputStreamHandler}
* @param <T> the publisher type
* @return a {@code Publisher<T>} based on bytes written by
* {@code outputStreamHandler} mapped by {@code byteMapper}
*/
public static <T> Flow.Publisher<T> create(OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper,
Executor executor, int chunkSize) {
Assert.notNull(outputStreamHandler, "OutputStreamHandler must not be null");
Assert.notNull(byteMapper, "ByteMapper must not be null");
Assert.notNull(executor, "Executor must not be null");
Assert.isTrue(chunkSize > 0, "ChunkSize must be larger than 0");
return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor, chunkSize);
}
@Override @Override
public void subscribe(Flow.Subscriber<? super T> subscriber) { public void subscribe(Flow.Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "Subscriber must not be null"); Objects.requireNonNull(subscriber, "Subscriber must not be null");
OutputStreamSubscription<T> subscription = new OutputStreamSubscription<>(subscriber, this.outputStreamHandler, OutputStreamSubscription<T> subscription = new OutputStreamSubscription<>(subscriber, this.outputStreamHandler,
this.byteMapper); this.byteMapper, this.chunkSize);
subscriber.onSubscribe(subscription); subscriber.onSubscribe(subscription);
this.executor.execute(subscription::invokeHandler); this.executor.execute(subscription::invokeHandler);
} }
@ -162,16 +213,18 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
static final Object READY = new Object(); static final Object READY = new Object();
private final Flow.Subscriber<? super T> actual;
private final Flow.Subscriber<? super T> actual;
private final OutputStreamHandler outputStreamHandler; private final OutputStreamHandler outputStreamHandler;
private final ByteMapper<T> byteMapper; private final ByteMapper<T> byteMapper;
private final int chunkSize;
private final AtomicLong requested = new AtomicLong(); private final AtomicLong requested = new AtomicLong();
private final AtomicReference<Object> parkedThreadAtomic = new AtomicReference<>(); private final AtomicReference<Object> parkedThread = new AtomicReference<>();
@Nullable @Nullable
private volatile Throwable error; private volatile Throwable error;
@ -180,10 +233,11 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
public OutputStreamSubscription(Flow.Subscriber<? super T> actual, OutputStreamHandler outputStreamHandler, public OutputStreamSubscription(Flow.Subscriber<? super T> actual, OutputStreamHandler outputStreamHandler,
ByteMapper<T> byteMapper) { ByteMapper<T> byteMapper, int chunkSize) {
this.actual = actual; this.actual = actual;
this.byteMapper = byteMapper; this.byteMapper = byteMapper;
this.outputStreamHandler = outputStreamHandler; this.outputStreamHandler = outputStreamHandler;
this.chunkSize = chunkSize;
} }
@Override @Override
@ -248,7 +302,7 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
// use BufferedOutputStream, so that written bytes are buffered // use BufferedOutputStream, so that written bytes are buffered
// before publishing as byte buffer // before publishing as byte buffer
try (OutputStream outputStream = new BufferedOutputStream(this)) { try (OutputStream outputStream = new BufferedOutputStream(this, this.chunkSize)) {
this.outputStreamHandler.handle(outputStream); this.outputStreamHandler.handle(outputStream);
} }
catch (IOException ex) { catch (IOException ex) {
@ -323,7 +377,7 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
Thread toUnpark = Thread.currentThread(); Thread toUnpark = Thread.currentThread();
while (true) { while (true) {
Object current = this.parkedThreadAtomic.get(); Object current = this.parkedThread.get();
if (current == READY) { if (current == READY) {
break; break;
} }
@ -332,19 +386,19 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
throw new IllegalStateException("Only one (Virtual)Thread can await!"); throw new IllegalStateException("Only one (Virtual)Thread can await!");
} }
if (this.parkedThreadAtomic.compareAndSet(null, toUnpark)) { if (this.parkedThread.compareAndSet(null, toUnpark)) {
LockSupport.park(); LockSupport.park();
// we don't just break here because park() can wake up spuriously // we don't just break here because park() can wake up spuriously
// if we got a proper resume, get() == READY and the loop will quit above // if we got a proper resume, get() == READY and the loop will quit above
} }
} }
// clear the resume indicator so that the next await call will park without a resume() // clear the resume indicator so that the next await call will park without a resume()
this.parkedThreadAtomic.lazySet(null); this.parkedThread.lazySet(null);
} }
private void resume() { private void resume() {
if (this.parkedThreadAtomic.get() != READY) { if (this.parkedThread.get() != READY) {
Object old = this.parkedThreadAtomic.getAndSet(READY); Object old = this.parkedThread.getAndSet(READY);
if (old != READY) { if (old != READY) {
LockSupport.unpark((Thread)old); LockSupport.unpark((Thread)old);
} }

View File

@ -17,7 +17,6 @@
package org.springframework.http.client; package org.springframework.http.client;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -38,6 +37,13 @@ import static org.assertj.core.api.Assertions.assertThatIOException;
*/ */
class OutputStreamPublisherTests { class OutputStreamPublisherTests {
private static final byte[] FOO = "foo".getBytes(StandardCharsets.UTF_8);
private static final byte[] BAR = "bar".getBytes(StandardCharsets.UTF_8);
private static final byte[] BAZ = "baz".getBytes(StandardCharsets.UTF_8);
private final Executor executor = Executors.newSingleThreadExecutor(); private final Executor executor = Executors.newSingleThreadExecutor();
private final OutputStreamPublisher.ByteMapper<byte[]> byteMapper = private final OutputStreamPublisher.ByteMapper<byte[]> byteMapper =
@ -59,11 +65,9 @@ class OutputStreamPublisherTests {
@Test @Test
void basic() { void basic() {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> { Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { outputStream.write(FOO);
writer.write("foo"); outputStream.write(BAR);
writer.write("bar"); outputStream.write(BAZ);
writer.write("baz");
}
}, this.byteMapper, this.executor); }, this.byteMapper, this.executor);
Flux<String> flux = toString(flowPublisher); Flux<String> flux = toString(flowPublisher);
@ -75,14 +79,12 @@ class OutputStreamPublisherTests {
@Test @Test
void flush() { void flush() {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> { Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { outputStream.write(FOO);
writer.write("foo"); outputStream.flush();
writer.flush(); outputStream.write(BAR);
writer.write("bar"); outputStream.flush();
writer.flush(); outputStream.write(BAZ);
writer.write("baz"); outputStream.flush();
writer.flush();
}
}, this.byteMapper, this.executor); }, this.byteMapper, this.executor);
Flux<String> flux = toString(flowPublisher); Flux<String> flux = toString(flowPublisher);
@ -93,22 +95,37 @@ class OutputStreamPublisherTests {
.verifyComplete(); .verifyComplete();
} }
@Test
void chunkSize() {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
outputStream.write(FOO);
outputStream.write(BAR);
outputStream.write(BAZ);
}, this.byteMapper, this.executor, 3);
Flux<String> flux = toString(flowPublisher);
StepVerifier.create(flux)
.assertNext(s -> assertThat(s).isEqualTo("foo"))
.assertNext(s -> assertThat(s).isEqualTo("bar"))
.assertNext(s -> assertThat(s).isEqualTo("baz"))
.verifyComplete();
}
@Test @Test
void cancel() throws InterruptedException { void cancel() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> { Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { assertThatIOException()
assertThatIOException() .isThrownBy(() -> {
.isThrownBy(() -> { outputStream.write(FOO);
writer.write("foo"); outputStream.flush();
writer.flush(); outputStream.write(BAR);
writer.write("bar"); outputStream.flush();
writer.flush(); })
}) .withMessage("Subscription has been terminated");
.withMessage("Subscription has been terminated"); latch.countDown();
latch.countDown();
}
}, this.byteMapper, this.executor); }, this.byteMapper, this.executor);
Flux<String> flux = toString(flowPublisher); Flux<String> flux = toString(flowPublisher);
@ -125,7 +142,7 @@ class OutputStreamPublisherTests {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> { Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8); OutputStreamWriter writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
writer.write("foo"); writer.write("foo");
writer.close(); writer.close();
assertThatIOException().isThrownBy(() -> writer.write("bar")) assertThatIOException().isThrownBy(() -> writer.write("bar"))
@ -146,12 +163,12 @@ class OutputStreamPublisherTests {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> { Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
try(Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { try (outputStream) {
writer.write("foo"); outputStream.write(FOO);
writer.flush(); outputStream.flush();
writer.write("foo"); outputStream.write(BAR);
writer.flush(); outputStream.flush();
} }
finally { finally {
latch.countDown(); latch.countDown();
} }