Make OutputStreamPublisher more generic

This commit improves the OutputStreamPublisher so that it is capable
of publishing other types that ByteBuffers.
This commit is contained in:
Arjen Poutsma 2023-07-05 15:20:34 +02:00
parent d0a2820af4
commit a6c5692586
3 changed files with 113 additions and 44 deletions

View File

@ -47,6 +47,8 @@ import org.springframework.util.StringUtils;
*/
class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
private static final OutputStreamPublisher.ByteMapper<ByteBuffer> BYTE_MAPPER = new ByteBufferMapper();
private static final Set<String> DISALLOWED_HEADERS = disallowedHeaders();
/**
@ -142,6 +144,7 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
if (body != null) {
Flow.Publisher<ByteBuffer> outputStreamPublisher = OutputStreamPublisher.create(
outputStream -> body.writeTo(StreamUtils.nonClosing(outputStream)),
BYTE_MAPPER,
this.executor);
long contentLength = headers.getContentLength();
@ -157,4 +160,25 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
}
}
private static final class ByteBufferMapper implements OutputStreamPublisher.ByteMapper<ByteBuffer> {
@Override
public ByteBuffer map(int b) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1);
byteBuffer.put((byte) b);
byteBuffer.flip();
return byteBuffer;
}
@Override
public ByteBuffer map(byte[] b, int off, int len) {
ByteBuffer byteBuffer = ByteBuffer.allocate(len);
byteBuffer.put(b, off, len);
byteBuffer.flip();
return byteBuffer;
}
}
}

View File

@ -19,7 +19,6 @@ package org.springframework.http.client;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
@ -32,37 +31,42 @@ import org.springframework.util.Assert;
/**
* Bridges between {@link OutputStream} and
* {@link Flow.Publisher Flow.Publisher&lt;ByteBuffer&gt;}.
*
* {@link Flow.Publisher Flow.Publisher&lt;T&gt;}.
* @author Oleh Dokuka
* @author Arjen Poutsma
* @since 6.1
* @see #create(OutputStreamHandler, Executor)
* @param <T> the published item type
* @see #create(OutputStreamHandler, ByteMapper, Executor)
*/
final class OutputStreamPublisher implements Flow.Publisher<ByteBuffer> {
final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
private final OutputStreamHandler outputStreamHandler;
private final ByteMapper<T> byteMapper;
private final Executor executor;
private OutputStreamPublisher(OutputStreamHandler outputStreamHandler, Executor executor) {
private OutputStreamPublisher(OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper, Executor executor) {
this.outputStreamHandler = outputStreamHandler;
this.byteMapper = byteMapper;
this.executor = executor;
}
/**
* Creates a new {@code Publisher<ByteBuffer>} based on bytes written to a
* {@code OutputStream}.
* Creates a new {@code Publisher<T>} based on bytes written to a
* {@code OutputStream}. The parameter {@code byteMapper} is used to map
* from written bytes to the published type.
* <ul>
* <li>The parameter {@code outputStreamHandler} is invoked once per
* subscription of the returned {@code Publisher}, when the first
* {@code ByteBuffer} is
* item is
* {@linkplain Flow.Subscription#request(long) requested}.</li>
* <li>Each {@link OutputStream#write(byte[], int, int) OutputStream.write()}
* invocation that {@code outputStreamHandler} makes will result in a
* {@linkplain Flow.Subscriber#onNext(Object) published} {@code ByteBuffer}
* {@linkplain Flow.Subscriber#onNext(Object) published} item
* if there is {@linkplain Flow.Subscription#request(long) demand}.</li>
* <li>If there is <em>no demand</em>, {@code OutputStream.write()} will block
* until there is.</li>
@ -74,23 +78,28 @@ final class OutputStreamPublisher implements Flow.Publisher<ByteBuffer> {
* be dispatched to the {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}.
* </ul>
* @param outputStreamHandler invoked when the first buffer is requested
* @param byteMapper maps written bytes to {@code T}
* @param executor used to invoke the {@code outputStreamHandler}
* @return a {@code Publisher<ByteBuffer>} based on bytes written by
* {@code outputStreamHandler}
* @param <T> the publisher type
* @return a {@code Publisher<T>} based on bytes written by
* {@code outputStreamHandler} mapped by {@code byteMapper}
*/
public static Flow.Publisher<ByteBuffer> create(OutputStreamHandler outputStreamHandler, Executor executor) {
public static <T> Flow.Publisher<T> create(OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper,
Executor executor) {
Assert.notNull(outputStreamHandler, "OutputStreamHandler must not be null");
Assert.notNull(byteMapper, "ByteMapper must not be null");
Assert.notNull(executor, "Executor must not be null");
return new OutputStreamPublisher(outputStreamHandler, executor);
return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor);
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
public void subscribe(Flow.Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "Subscriber must not be null");
OutputStreamSubscription subscription = new OutputStreamSubscription(subscriber, this.outputStreamHandler);
OutputStreamSubscription<T> subscription = new OutputStreamSubscription<>(subscriber, this.outputStreamHandler,
this.byteMapper);
subscriber.onSubscribe(subscription);
this.executor.execute(subscription::invokeHandler);
}
@ -109,7 +118,8 @@ final class OutputStreamPublisher implements Flow.Publisher<ByteBuffer> {
* <li>If the linked subscription has
* {@linkplain Flow.Subscription#request(long) demand}, any
* {@linkplain OutputStream#write(byte[], int, int) written} bytes
* will be {@linkplain Flow.Subscriber#onNext(Object) published} to the
* will be {@linkplain ByteMapper#map(byte[], int, int) mapped}
* and {@linkplain Flow.Subscriber#onNext(Object) published} to the
* {@link Flow.Subscriber Subscriber}.</li>
* <li>If there is no demand, any
* {@link OutputStream#write(byte[], int, int) write()} invocations will
@ -128,14 +138,37 @@ final class OutputStreamPublisher implements Flow.Publisher<ByteBuffer> {
}
private static final class OutputStreamSubscription extends OutputStream implements Flow.Subscription {
/**
* Maps bytes written to in {@link OutputStreamHandler#handle(OutputStream)}
* to published items.
* @param <T> the type to map to
*/
public interface ByteMapper<T> {
/**
* Maps a single byte to {@code T}.
*/
T map(int b);
/**
* Maps a byte array to {@code T}.
*/
T map(byte[] b, int off, int len);
}
private static final class OutputStreamSubscription<T> extends OutputStream implements Flow.Subscription {
static final Object READY = new Object();
private final Flow.Subscriber<? super ByteBuffer> actual;
private final Flow.Subscriber<? super T> actual;
private final OutputStreamHandler outputStreamHandler;
private final ByteMapper<T> byteMapper;
private final AtomicLong requested = new AtomicLong();
private final AtomicReference<Object> parkedThreadAtomic = new AtomicReference<>();
@ -146,9 +179,10 @@ final class OutputStreamPublisher implements Flow.Publisher<ByteBuffer> {
private long produced;
public OutputStreamSubscription(Flow.Subscriber<? super ByteBuffer> actual,
OutputStreamHandler outputStreamHandler) {
public OutputStreamSubscription(Flow.Subscriber<? super T> actual, OutputStreamHandler outputStreamHandler,
ByteMapper<T> byteMapper) {
this.actual = actual;
this.byteMapper = byteMapper;
this.outputStreamHandler = outputStreamHandler;
}
@ -156,11 +190,9 @@ final class OutputStreamPublisher implements Flow.Publisher<ByteBuffer> {
public void write(int b) throws IOException {
checkDemandAndAwaitIfNeeded();
ByteBuffer byteBuffer = ByteBuffer.allocate(1);
byteBuffer.put((byte) b);
byteBuffer.flip();
T next = this.byteMapper.map(b);
this.actual.onNext(byteBuffer);
this.actual.onNext(next);
this.produced++;
}
@ -174,11 +206,9 @@ final class OutputStreamPublisher implements Flow.Publisher<ByteBuffer> {
public void write(byte[] b, int off, int len) throws IOException {
checkDemandAndAwaitIfNeeded();
ByteBuffer byteBuffer = ByteBuffer.allocate(len);
byteBuffer.put(b, off, len);
byteBuffer.flip();
T next = this.byteMapper.map(b, off, len);
this.actual.onNext(byteBuffer);
this.actual.onNext(next);
this.produced++;
}

View File

@ -18,7 +18,6 @@ package org.springframework.http.client;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@ -41,15 +40,31 @@ class OutputStreamPublisherTests {
private final Executor executor = Executors.newSingleThreadExecutor();
private final OutputStreamPublisher.ByteMapper<byte[]> byteMapper =
new OutputStreamPublisher.ByteMapper<>() {
@Override
public byte[] map(int b) {
return new byte[]{(byte) b};
}
@Override
public byte[] map(byte[] b, int off, int len) {
byte[] result = new byte[len];
System.arraycopy(b, off, result, 0, len);
return result;
}
};
@Test
void basic() {
Flow.Publisher<ByteBuffer> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
writer.write("foo");
writer.write("bar");
writer.write("baz");
}
}, this.executor);
}, this.byteMapper, this.executor);
Flux<String> flux = toString(flowPublisher);
StepVerifier.create(flux)
@ -59,7 +74,7 @@ class OutputStreamPublisherTests {
@Test
void flush() {
Flow.Publisher<ByteBuffer> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
writer.write("foo");
writer.flush();
@ -68,7 +83,7 @@ class OutputStreamPublisherTests {
writer.write("baz");
writer.flush();
}
}, this.executor);
}, this.byteMapper, this.executor);
Flux<String> flux = toString(flowPublisher);
StepVerifier.create(flux)
@ -82,7 +97,7 @@ class OutputStreamPublisherTests {
void cancel() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flow.Publisher<ByteBuffer> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
assertThatIOException()
.isThrownBy(() -> {
@ -94,7 +109,7 @@ class OutputStreamPublisherTests {
.withMessage("Subscription has been terminated");
latch.countDown();
}
}, this.executor);
}, this.byteMapper, this.executor);
Flux<String> flux = toString(flowPublisher);
StepVerifier.create(flux, 1)
@ -109,14 +124,14 @@ class OutputStreamPublisherTests {
void closed() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flow.Publisher<ByteBuffer> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
writer.write("foo");
writer.close();
assertThatIOException().isThrownBy(() -> writer.write("bar"))
.withMessage("Stream closed");
latch.countDown();
}, this.executor);
}, this.byteMapper, this.executor);
Flux<String> flux = toString(flowPublisher);
StepVerifier.create(flux)
@ -130,7 +145,7 @@ class OutputStreamPublisherTests {
void negativeRequestN() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flow.Publisher<ByteBuffer> flowPublisher = OutputStreamPublisher.create(outputStream -> {
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
try(Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
writer.write("foo");
writer.flush();
@ -140,7 +155,7 @@ class OutputStreamPublisherTests {
finally {
latch.countDown();
}
}, this.executor);
}, this.byteMapper, this.executor);
Flow.Subscription[] subscriptions = new Flow.Subscription[1];
Flux<String> flux = toString(a-> flowPublisher.subscribe(new Flow.Subscriber<>() {
@Override
@ -150,7 +165,7 @@ class OutputStreamPublisherTests {
}
@Override
public void onNext(ByteBuffer item) {
public void onNext(byte[] item) {
a.onNext(item);
}
@ -174,9 +189,9 @@ class OutputStreamPublisherTests {
latch.await();
}
private static Flux<String> toString(Flow.Publisher<ByteBuffer> flowPublisher) {
private static Flux<String> toString(Flow.Publisher<byte[]> flowPublisher) {
return Flux.from(FlowAdapters.toPublisher(flowPublisher))
.map(bb -> StandardCharsets.UTF_8.decode(bb).toString());
.map(bytes -> new String(bytes, StandardCharsets.UTF_8));
}
}