diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteBufferPublisherInputStream.java b/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteBufferPublisherInputStream.java index e5ccea71e41..13f82b1c480 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteBufferPublisherInputStream.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteBufferPublisherInputStream.java @@ -16,28 +16,27 @@ package org.springframework.reactive.io; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.springframework.util.Assert; +import reactor.Publishers; +import reactor.core.error.CancelException; + import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.reactive.util.PublisherSignal; -import org.springframework.util.Assert; /** * {@code InputStream} implementation based on a byte array {@link Publisher}. + * * @author Arjen Poutsma * @author Sebastien Deleuze + * @author Stephane Maldini */ public class ByteBufferPublisherInputStream extends InputStream { - private final BlockingQueue> queue = - new LinkedBlockingQueue<>(); + private final BlockingQueue queue; private ByteBufferInputStream currentStream; @@ -46,6 +45,7 @@ public class ByteBufferPublisherInputStream extends InputStream { /** * Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher. + * * @param publisher the publisher to use */ public ByteBufferPublisherInputStream(Publisher publisher) { @@ -54,14 +54,15 @@ public class ByteBufferPublisherInputStream extends InputStream { /** * Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher. - * @param publisher the publisher to use + * + * @param publisher the publisher to use * @param requestSize the {@linkplain Subscription#request(long) request size} to use - * on the publisher + * on the publisher bound to Integer MAX */ - public ByteBufferPublisherInputStream(Publisher publisher, long requestSize) { + public ByteBufferPublisherInputStream(Publisher publisher, int requestSize) { Assert.notNull(publisher, "'publisher' must not be null"); - publisher.subscribe(new BlockingQueueSubscriber(requestSize)); + this.queue = Publishers.toReadQueue(publisher, requestSize); } @@ -128,85 +129,29 @@ public class ByteBufferPublisherInputStream extends InputStream { try { if (this.currentStream != null && this.currentStream.available() > 0) { return this.currentStream; - } - else { - // take() blocks, but that's OK since this is a *blocking* InputStream - PublisherSignal signal = this.queue.take(); - - if (signal.isData()) { - ByteBuffer data = signal.data(); - this.currentStream = new ByteBufferInputStream(data); - return this.currentStream; - } - else if (signal.isComplete()) { + } else { + // take() blocks until next or complete() then return null, but that's OK since this is a *blocking* InputStream + ByteBuffer signal = this.queue.take(); + if(signal == null){ this.completed = true; return null; } - else if (signal.isError()) { - Throwable error = signal.error(); - this.completed = true; - if (error instanceof IOException) { - throw (IOException) error; - } - else { - throw new IOException(error); - } - } + this.currentStream = new ByteBufferInputStream(signal); + return this.currentStream; } } + catch (CancelException ce) { + this.completed = true; + return null; + } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } + catch (Throwable error ){ + this.completed = true; + throw new IOException(error); + } throw new IOException(); } - private class BlockingQueueSubscriber implements Subscriber { - - private final long requestSize; - - private Subscription subscription; - - public BlockingQueueSubscriber(long requestSize) { - this.requestSize = requestSize; - } - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - - this.subscription.request(this.requestSize); - } - - @Override - public void onNext(ByteBuffer bytes) { - try { - queue.put(PublisherSignal.data(bytes)); - this.subscription.request(requestSize); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - - @Override - public void onError(Throwable t) { - try { - queue.put(PublisherSignal.error(t)); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - - @Override - public void onComplete() { - try { - queue.put(PublisherSignal.complete()); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - } - } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/util/PublisherSignal.java b/spring-web-reactive/src/main/java/org/springframework/reactive/util/PublisherSignal.java deleted file mode 100644 index 46ab3572eb6..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/util/PublisherSignal.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.util; - -import org.reactivestreams.Publisher; - -import org.springframework.util.Assert; - -/** - * Represents a signal value object, useful for wrapping signals as published by a {@link - * Publisher}. Mostly used to store signals in buffers. - * @author Arjen Poutsma - */ -public abstract class PublisherSignal { - - protected PublisherSignal() { - } - - /** - * Indicates whether this signal is an data signal, i.e. if {@link #data()} can be - * called safely. - * @return {@code true} if this signal contains data; {@code false} otherwise - */ - public boolean isData() { - return false; - } - - /** - * Returns the data contained in this signal. Can only be safely called after {@link - * #isData()} returns {@code true}. - * @return the data - * @throws IllegalStateException if this signal does not contain data - */ - public T data() { - throw new IllegalStateException(); - } - - /** - * Indicates whether this signal is an error signal, i.e. if {@link #error()} can be - * called safely. - * @return {@code true} if this signal contains an error; {@code false} otherwise - */ - public boolean isError() { - return false; - } - - /** - * Returns the error contained in this signal. Can only be safely called after {@link - * #isError()} returns {@code true}. - * @return the error - * @throws IllegalStateException if this signal does not contain an error - */ - public Throwable error() { - throw new IllegalStateException(); - } - - /** - * Indicates whether this signal completes the stream. - * @return {@code true} if this signal completes the stream; {@code false} otherwise - */ - public boolean isComplete() { - return false; - } - - /** - * Creates a new data signal with the given {@code t}. - * @param t the data to base the signal on - * @return the newly created signal - */ - public static PublisherSignal data(T t) { - Assert.notNull(t, "'t' must not be null"); - return new DataSignal<>(t); - } - - /** - * Creates a new error signal with the given {@code Throwable}. - * @param t the exception to base the signal on - * @return the newly created signal - */ - public static PublisherSignal error(Throwable t) { - Assert.notNull(t, "'t' must not be null"); - return new ErrorSignal<>(t); - } - - /** - * Returns the complete signal, typically the last signal in a stream. - */ - @SuppressWarnings("unchecked") - public static PublisherSignal complete() { - return (PublisherSignal)ON_COMPLETE; - } - - private static final class DataSignal extends PublisherSignal { - - private final T data; - - public DataSignal(T data) { - this.data = data; - } - - @Override - public boolean isData() { - return true; - } - - @Override - public T data() { - return data; - } - } - - private static final class ErrorSignal extends PublisherSignal { - - private final Throwable error; - - public ErrorSignal(Throwable error) { - this.error = error; - } - - @Override - public boolean isError() { - return true; - } - - @Override - public Throwable error() { - return error; - } - - } - - @SuppressWarnings("rawtypes") - private static final PublisherSignal ON_COMPLETE = new PublisherSignal() { - - @Override - public boolean isComplete() { - return true; - } - - }; - -}