From 5bbeb9c204055eb3024ea31d77d86beda609dbf3 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Thu, 10 Sep 2015 11:05:30 +0200 Subject: [PATCH] Removed BlockingSignalQueue in favor of PublisherSignal. --- .../io/ByteArrayPublisherInputStream.java | 130 +++++++-- .../io/ByteArrayPublisherOutputStream.java | 46 +-- .../reactive/util/BlockingSignalQueue.java | 268 ------------------ .../reactive/util/OnComplete.java | 54 ---- .../reactive/util/OnError.java | 57 ---- .../springframework/reactive/util/OnNext.java | 57 ---- .../reactive/util/PublisherSignal.java | 154 ++++++++++ .../ByteArrayPublisherInputStreamTests.java | 108 +++++++ .../io/ByteBufPublisherInputStreamTests.java | 96 ------- .../BlockingByteBufQueuePublisherTests.java | 231 --------------- .../util/BlockingByteBufQueueTests.java | 78 ----- 11 files changed, 394 insertions(+), 885 deletions(-) delete mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/util/BlockingSignalQueue.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/util/OnComplete.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/util/OnError.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/util/OnNext.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/util/PublisherSignal.java create mode 100644 spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteArrayPublisherInputStreamTests.java delete mode 100644 spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteBufPublisherInputStreamTests.java delete mode 100644 spring-web-reactive/src/test/java/org/springframework/reactive/util/BlockingByteBufQueuePublisherTests.java delete mode 100644 spring-web-reactive/src/test/java/org/springframework/reactive/util/BlockingByteBufQueueTests.java diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherInputStream.java b/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherInputStream.java index 03b81aafa3..2156f654bf 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherInputStream.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherInputStream.java @@ -1,4 +1,4 @@ -package org.springframework.reactive.io;/* +/* * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,57 +14,77 @@ package org.springframework.reactive.io;/* * limitations under the License. */ +package org.springframework.reactive.io; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; -import org.springframework.reactive.util.BlockingSignalQueue; +import org.springframework.reactive.util.PublisherSignal; import org.springframework.util.Assert; /** * {@code InputStream} implementation based on a byte array {@link Publisher}. - * * @author Arjen Poutsma */ public class ByteArrayPublisherInputStream extends InputStream { - private final BlockingSignalQueue queue; + private final BlockingQueue> queue = + new LinkedBlockingQueue<>(); private ByteArrayInputStream currentStream; + private boolean completed; + /** * Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher. * @param publisher the publisher to use */ public ByteArrayPublisherInputStream(Publisher publisher) { + this(publisher, 1); + } + + /** + * Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher. + * @param publisher the publisher to use + * @param requestSize the {@linkplain Subscription#request(long) request size} to use + * on the publisher + */ + public ByteArrayPublisherInputStream(Publisher publisher, long requestSize) { Assert.notNull(publisher, "'publisher' must not be null"); - this.queue = new BlockingSignalQueue(); - publisher.subscribe(this.queue.subscriber()); + publisher.subscribe(new BlockingQueueSubscriber(requestSize)); } - ByteArrayPublisherInputStream(BlockingSignalQueue queue) { - Assert.notNull(queue, "'queue' must not be null"); - this.queue = queue; - } @Override public int available() throws IOException { + if (completed) { + return 0; + } InputStream is = currentStream(); return is != null ? is.available() : 0; } @Override public int read() throws IOException { + if (completed) { + return -1; + } InputStream is = currentStream(); while (is != null) { int ch = is.read(); if (ch != -1) { return ch; - } else { + } + else { is = currentStream(); } } @@ -73,6 +93,9 @@ public class ByteArrayPublisherInputStream extends InputStream { @Override public int read(byte[] b, int off, int len) throws IOException { + if (completed) { + return -1; + } InputStream is = currentStream(); if (is == null) { return -1; @@ -105,23 +128,84 @@ public class ByteArrayPublisherInputStream extends InputStream { if (this.currentStream != null && this.currentStream.available() > 0) { return this.currentStream; } - else if (this.queue.isComplete()) { - return null; - } - else if (this.queue.isHeadSignal()) { - byte[] current = this.queue.pollSignal(); - this.currentStream = new ByteArrayInputStream(current); - return this.currentStream; - } - else if (this.queue.isHeadError()) { - Throwable t = this.queue.pollError(); - throw t instanceof IOException ? (IOException) t : new IOException(t); + else { + // take() blocks, but that's OK since this is a *blocking* InputStream + PublisherSignal signal = this.queue.take(); + + if (signal.isData()) { + byte[] data = signal.data(); + this.currentStream = new ByteArrayInputStream(data); + return this.currentStream; + } + else if (signal.isComplete()) { + this.completed = true; + return null; + } + else if (signal.isError()) { + Throwable error = signal.error(); + this.completed = true; + if (error instanceof IOException) { + throw (IOException) error; + } + else { + throw new IOException(error); + } + } } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } - return null; + throw new IOException(); + } + + private class BlockingQueueSubscriber implements Subscriber { + + private final long requestSize; + + private Subscription subscription; + + public BlockingQueueSubscriber(long requestSize) { + this.requestSize = requestSize; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + + this.subscription.request(this.requestSize); + } + + @Override + public void onNext(byte[] bytes) { + try { + queue.put(PublisherSignal.data(bytes)); + this.subscription.request(requestSize); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void onError(Throwable t) { + try { + queue.put(PublisherSignal.error(t)); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void onComplete() { + try { + queue.put(PublisherSignal.complete()); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherOutputStream.java b/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherOutputStream.java index 7cd84c6085..806a13c238 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherOutputStream.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherOutputStream.java @@ -1,12 +1,28 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.reactive.io; import java.io.IOException; import java.io.OutputStream; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; import org.reactivestreams.Publisher; - -import org.springframework.reactive.util.BlockingSignalQueue; +import reactor.rx.Streams; /** * {@code OutputStream} implementation that stores all written bytes, to be retrieved @@ -15,14 +31,15 @@ import org.springframework.reactive.util.BlockingSignalQueue; */ public class ByteArrayPublisherOutputStream extends OutputStream { - private final BlockingSignalQueue queue = new BlockingSignalQueue<>(); + private final List buffers = new ArrayList<>(); + /** * Returns the written data as a {@code Publisher}. * @return a publisher for the written bytes */ public Publisher toByteArrayPublisher() { - return this.queue.publisher(); + return Streams.from(buffers); } @Override @@ -32,22 +49,9 @@ public class ByteArrayPublisherOutputStream extends OutputStream { @Override public void write(byte[] b, int off, int len) throws IOException { - byte[] copy = Arrays.copyOf(b, len); - try { - this.queue.putSignal(copy); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } + byte[] copy = new byte[len - off]; + System.arraycopy(b, off, copy, 0, len); + buffers.add(copy); } - @Override - public void close() throws IOException { - try { - this.queue.complete(); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/util/BlockingSignalQueue.java b/spring-web-reactive/src/main/java/org/springframework/reactive/util/BlockingSignalQueue.java deleted file mode 100644 index e8406f4dd7..0000000000 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/util/BlockingSignalQueue.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.util.Assert; - -/** - * A {@link BlockingQueue} aimed at working with {@code Publisher} instances. - * Mainly meant to bridge between reactive and non-reactive APIs, such as blocking - * streams. - * - *

Typically, this class will be used by two threads: one thread to put new elements on - * the stack by calling {@link #putSignal(Object)}, possibly {@link #putError(Throwable)} - * and finally {@link #complete()}. The other thread will read elements by calling {@link - * #isHeadSignal()}/{@link #pollSignal()} and {@link #isHeadError()}/{@link #pollError()}, - * while keeping an eye on {@link #isComplete()}. - * @author Arjen Poutsma - */ -public class BlockingSignalQueue { - - private static final int DEFAULT_REQUEST_SIZE_SUBSCRIBER = 1; - - private final BlockingQueue> queue = new LinkedBlockingQueue>(); - - - /** - * Inserts the specified signal into this queue, waiting if necessary for space to - * become available. - * @param t the signal to add - */ - public void putSignal(T t) throws InterruptedException { - Assert.notNull(t, "'t' must not be null"); - Assert.state(!isComplete(), "Cannot put signal in queue after complete()"); - this.queue.put(new OnNext(t)); - } - - /** - * Inserts the specified error into this queue, waiting if necessary for space to - * become available. - * @param error the error to add - */ - public void putError(Throwable error) throws InterruptedException { - Assert.notNull(error, "'error' must not be null"); - Assert.state(!isComplete(), "Cannot putSignal errors in queue after complete()"); - this.queue.put(new OnError(error)); - } - - /** - * Marks the queue as complete. - */ - @SuppressWarnings("unchecked") - public void complete() throws InterruptedException { - this.queue.put(OnComplete.INSTANCE); - } - - /** - * Indicates whether the current head of this queue is a signal. - * @return {@code true} if the current head is a signal; {@code false} otherwise - */ - public boolean isHeadSignal() { - Signal signal = this.queue.peek(); - return signal != null && signal.isOnNext(); - } - - /** - * Indicates whether the current head of this queue is a {@link Throwable}. - * @return {@code true} if the current head is an error; {@code false} otherwise - */ - public boolean isHeadError() { - Signal signal = this.queue.peek(); - return signal != null && signal.isOnError(); - } - - /** - * Indicates whether there are more buffers or errors in this queue. - * @return {@code true} if there more elements in this queue; {@code false} otherwise - */ - public boolean isComplete() { - Signal signal = this.queue.peek(); - return signal != null && signal.isComplete(); - } - - /** - * Retrieves and removes the signal head of this queue. Should only be called after - * {@link #isHeadSignal()} returns {@code true}. - * @return the head of the queue - * @throws IllegalStateException if the current head of this queue is not a buffer - * @see #isHeadSignal() - */ - public T pollSignal() throws InterruptedException { - Signal signal = this.queue.take(); - return signal != null ? signal.next() : null; - } - - /** - * Retrieves and removes the buffer error of this queue. Should only be called after - * {@link #isHeadError()} returns {@code true}. - * @return the head of the queue, as error - * @throws IllegalStateException if the current head of this queue is not a error - * @see #isHeadError() - */ - public Throwable pollError() throws InterruptedException { - Signal signal = this.queue.take(); - return signal != null ? signal.error() : null; - } - - /** - * Returns a {@code Publisher} backed by this queue. - */ - public Publisher publisher() { - return new BlockingSignalQueuePublisher(); - } - - /** - * Returns a {@code Subscriber} backed by this queue. - */ - public Subscriber subscriber() { - return subscriber(DEFAULT_REQUEST_SIZE_SUBSCRIBER); - } - - /** - * Returns a {@code Subscriber} backed by this queue, with the given request size. - * @see Subscription#request(long) - */ - public Subscriber subscriber(long requestSize) { - return new BlockingSignalQueueSubscriber(requestSize); - } - - private class BlockingSignalQueuePublisher implements Publisher { - - private Subscriber subscriber; - - private final Object subscriberMutex = new Object(); - - @Override - public void subscribe(Subscriber subscriber) { - synchronized (this.subscriberMutex) { - if (this.subscriber != null) { - subscriber.onError( - new IllegalStateException("Only one subscriber allowed")); - } - else { - this.subscriber = subscriber; - final SubscriptionThread thread = new SubscriptionThread(); - this.subscriber.onSubscribe(new Subscription() { - @Override - public void request(long n) { - thread.request(n); - } - - @Override - public void cancel() { - thread.cancel(); - } - }); - thread.start(); - } - } - } - - private class SubscriptionThread extends Thread { - - private final DemandCounter demand = new DemandCounter(); - - @Override - public void run() { - try { - while (!Thread.currentThread().isInterrupted()) { - if (this.demand.hasDemand() && isHeadSignal()) { - subscriber.onNext(pollSignal()); - this.demand.decrement(); - } - else if (isHeadError()) { - subscriber.onError(pollError()); - break; - } - else if (isComplete()) { - subscriber.onComplete(); - break; - } - } - } - catch (InterruptedException ex) { - // Allow thread to exit - } - } - - public void request(long n) { - this.demand.increase(n); - } - - public void cancel() { - interrupt(); - } - } - } - - private class BlockingSignalQueueSubscriber implements Subscriber { - - private final long requestSize; - - private Subscription subscription; - - public BlockingSignalQueueSubscriber(long requestSize) { - this.requestSize = requestSize; - } - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - - this.subscription.request(this.requestSize); - } - - @Override - public void onNext(T t) { - try { - putSignal(t); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - this.subscription.request(requestSize); - } - - @Override - public void onError(Throwable t) { - try { - putError(t); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - this.subscription.request(requestSize); - } - - @Override - public void onComplete() { - try { - complete(); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/util/OnComplete.java b/spring-web-reactive/src/main/java/org/springframework/reactive/util/OnComplete.java deleted file mode 100644 index 47aeb326a8..0000000000 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/util/OnComplete.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.util; - -/** - * @author Arjen Poutsma - */ -class OnComplete implements Signal { - - public static final OnComplete INSTANCE = new OnComplete(); - - private OnComplete() { - } - - @Override - public boolean isComplete() { - return true; - } - - @Override - public boolean isOnNext() { - return false; - } - - @Override - public T next() { - throw new IllegalStateException(); - } - - @Override - public boolean isOnError() { - return false; - } - - @Override - public Throwable error() { - throw new IllegalStateException(); - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/util/OnError.java b/spring-web-reactive/src/main/java/org/springframework/reactive/util/OnError.java deleted file mode 100644 index 0286ac06a1..0000000000 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/util/OnError.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.util; - -import org.springframework.util.Assert; - -/** - * @author Arjen Poutsma - */ -final class OnError implements Signal { - - private final Throwable error; - - public OnError(Throwable error) { - Assert.notNull(error, "'error' must not be null"); - this.error = error; - } - - @Override - public boolean isOnError() { - return true; - } - - @Override - public Throwable error() { - return error; - } - - @Override - public boolean isOnNext() { - return false; - } - - @Override - public T next() { - throw new IllegalStateException(); - } - - @Override - public boolean isComplete() { - return false; - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/util/OnNext.java b/spring-web-reactive/src/main/java/org/springframework/reactive/util/OnNext.java deleted file mode 100644 index f8e48e4b65..0000000000 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/util/OnNext.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.util; - -import org.springframework.util.Assert; - -/** - * @author Arjen Poutsma - */ -class OnNext implements Signal { - - private final T next; - - public OnNext(T next) { - Assert.notNull(next, "'next' must not be null"); - this.next = next; - } - - @Override - public boolean isOnNext() { - return true; - } - - @Override - public T next() { - return next; - } - - @Override - public boolean isOnError() { - return false; - } - - @Override - public Throwable error() { - throw new IllegalStateException(); - } - - @Override - public boolean isComplete() { - return false; - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/util/PublisherSignal.java b/spring-web-reactive/src/main/java/org/springframework/reactive/util/PublisherSignal.java new file mode 100644 index 0000000000..9fdd51c963 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/util/PublisherSignal.java @@ -0,0 +1,154 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.util; + +import org.springframework.util.Assert; + +/** + * Represents a signal value object, useful for wrapping signals as published by a {@link + * #Publisher()}. Mostly used to store signals in buffers. + * @author Arjen Poutsma + */ +public abstract class PublisherSignal { + + protected PublisherSignal() { + } + + /** + * Indicates whether this signal is an data signal, i.e. if {@link #data()} can be + * called safely. + * @return {@code true} if this signal contains data; {@code false} otherwise + */ + public boolean isData() { + return false; + } + + /** + * Returns the data contained in this signal. Can only be safely called after {@link + * #isData()} returns {@code true}. + * @return the data + * @throws IllegalStateException if this signal does not contain data + */ + public T data() { + throw new IllegalStateException(); + } + + /** + * Indicates whether this signal is an error signal, i.e. if {@link #error()} can be + * called safely. + * @return {@code true} if this signal contains an error; {@code false} otherwise + */ + public boolean isError() { + return false; + } + + /** + * Returns the error contained in this signal. Can only be safely called after {@link + * #isError()} returns {@code true}. + * @return the error + * @throws IllegalStateException if this signal does not contain an error + */ + public Throwable error() { + throw new IllegalStateException(); + } + + /** + * Indicates whether this signal completes the stream. + * @return {@code true} if this signal completes the stream; {@code false} otherwise + */ + public boolean isComplete() { + return false; + } + + /** + * Creates a new data signal with the given {@code t}. + * @param t the data to base the signal on + * @return the newly created signal + */ + public static PublisherSignal data(T t) { + Assert.notNull(t, "'t' must not be null"); + return new DataSignal<>(t); + } + + /** + * Creates a new error signal with the given {@code Throwable}. + * @param t the exception to base the signal on + * @return the newly created signal + */ + public static PublisherSignal error(Throwable t) { + Assert.notNull(t, "'t' must not be null"); + return new ErrorSignal<>(t); + } + + /** + * Returns the complete signal, typically the last signal in a stream. + */ + @SuppressWarnings("unchecked") + public static PublisherSignal complete() { + return (PublisherSignal)ON_COMPLETE; + } + + private static final class DataSignal extends PublisherSignal { + + private final T data; + + public DataSignal(T data) { + this.data = data; + } + + @Override + public boolean isData() { + return true; + } + + @Override + public T data() { + return data; + } + } + + private static final class ErrorSignal extends PublisherSignal { + + private final Throwable error; + + public ErrorSignal(Throwable error) { + this.error = error; + } + + @Override + public boolean isError() { + return true; + } + + @Override + public Throwable error() { + return error; + } + + } + + @SuppressWarnings("rawtypes") + private static final PublisherSignal ON_COMPLETE = new PublisherSignal() { + + @Override + public boolean isComplete() { + return true; + } + + }; + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteArrayPublisherInputStreamTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteArrayPublisherInputStreamTests.java new file mode 100644 index 0000000000..134b07b501 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteArrayPublisherInputStreamTests.java @@ -0,0 +1,108 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.io; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.rx.Stream; +import reactor.rx.Streams; + +import org.springframework.util.FileCopyUtils; + +import static org.junit.Assert.*; + +/** + * @author Arjen Poutsma + */ +public class ByteArrayPublisherInputStreamTests { + + + private ByteArrayPublisherInputStream is; + + @Before + public void createStream() { + Stream stream = + Streams.just(new byte[]{'a', 'b', 'c'}, new byte[]{'d', 'e'}); + + is = new ByteArrayPublisherInputStream(stream); + } + + @Test + public void reactor() throws Exception { + assertEquals(3, is.available()); + + int ch = is.read(); + assertEquals('a', ch); + ch = is.read(); + assertEquals('b', ch); + ch = is.read(); + assertEquals('c', ch); + + assertEquals(2, is.available()); + ch = is.read(); + assertEquals('d', ch); + ch = is.read(); + assertEquals('e', ch); + + ch = is.read(); + assertEquals(-1, ch); + + assertEquals(0, is.available()); + } + + @Test + public void copy() throws Exception { + ByteArrayPublisherOutputStream os = new ByteArrayPublisherOutputStream(); + + FileCopyUtils.copy(is, os); + + Publisher publisher = os.toByteArrayPublisher(); + + publisher.subscribe(new Subscriber() { + List result = new ArrayList<>(); + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(byte[] bytes) { + result.add(bytes); + } + + @Override + public void onError(Throwable t) { + fail(t.getMessage()); + } + + @Override + public void onComplete() { + assertArrayEquals(result.get(0), new byte[]{'a', 'b', 'c'}); + assertArrayEquals(result.get(0), new byte[]{'d', 'e'}); + } + }); + + } + +} \ No newline at end of file diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteBufPublisherInputStreamTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteBufPublisherInputStreamTests.java deleted file mode 100644 index 45ae841646..0000000000 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteBufPublisherInputStreamTests.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.io; - -import org.junit.Before; -import org.junit.Test; - -import org.springframework.reactive.util.BlockingSignalQueue; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -/** - * @author Arjen Poutsma - */ -public class ByteBufPublisherInputStreamTests { - - private BlockingSignalQueue queue; - - private ByteArrayPublisherInputStream is; - - - @Before - public void setUp() throws Exception { - queue = new BlockingSignalQueue(); - is = new ByteArrayPublisherInputStream(queue); - - } - - @Test - public void readSingleByte() throws Exception { - queue.putSignal(new byte[]{'a', 'b', 'c'}); - queue.putSignal(new byte[]{'d', 'e', 'f'}); - queue.complete(); - - - int ch = is.read(); - assertEquals('a', ch); - ch = is.read(); - assertEquals('b', ch); - ch = is.read(); - assertEquals('c', ch); - - ch = is.read(); - assertEquals('d', ch); - ch = is.read(); - assertEquals('e', ch); - ch = is.read(); - assertEquals('f', ch); - - ch = is.read(); - assertEquals(-1, ch); - } - - @Test - public void readBytes() throws Exception { - queue.putSignal(new byte[]{'a', 'b', 'c'}); - queue.putSignal(new byte[]{'d', 'e', 'f'}); - queue.complete(); - - byte[] buf = new byte[2]; - int read = this.is.read(buf); - assertEquals(2, read); - assertArrayEquals(new byte[] { 'a', 'b'}, buf); - - read = this.is.read(buf); - assertEquals(1, read); - assertEquals('c', buf[0]); - - read = this.is.read(buf); - assertEquals(2, read); - assertArrayEquals(new byte[] { 'd', 'e'}, buf); - - read = this.is.read(buf); - assertEquals(1, read); - assertEquals('f', buf[0]); - - read = this.is.read(buf); - assertEquals(-1, read); - } - -} \ No newline at end of file diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/util/BlockingByteBufQueuePublisherTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/util/BlockingByteBufQueuePublisherTests.java deleted file mode 100644 index 6f332c62d8..0000000000 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/util/BlockingByteBufQueuePublisherTests.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.util; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.Before; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * @author Arjen Poutsma - */ -public class BlockingByteBufQueuePublisherTests { - - private BlockingSignalQueue queue; - - private Publisher publisher; - - @Before - public void setUp() throws Exception { - queue = new BlockingSignalQueue(); - publisher = queue.publisher(); - } - - @Test - public void normal() throws Exception { - byte[] abc = new byte[]{'a', 'b', 'c'}; - byte[] def = new byte[]{'d', 'e', 'f'}; - - queue.putSignal(abc); - queue.putSignal(def); - queue.complete(); - - final AtomicBoolean complete = new AtomicBoolean(false); - final List received = new ArrayList(2); - - publisher.subscribe(new Subscriber() { - private Subscription subscription; - - @Override - public void onSubscribe(Subscription s) { - s.request(1); - this.subscription = s; - } - - @Override - public void onNext(byte[] bytes) { - received.add(bytes); - this.subscription.request(1); - } - - @Override - public void onError(Throwable t) { - fail("onError not expected"); - } - - @Override - public void onComplete() { - complete.set(true); - } - }); - - while (!complete.get()) { - } - - assertEquals(2, received.size()); - assertSame(abc, received.get(0)); - assertSame(def, received.get(1)); - } - - @Test - public void unbounded() throws Exception { - byte[] abc = new byte[]{'a', 'b', 'c'}; - byte[] def = new byte[]{'d', 'e', 'f'}; - - queue.putSignal(abc); - queue.putSignal(def); - queue.complete(); - - final AtomicBoolean complete = new AtomicBoolean(false); - final List received = new ArrayList(2); - - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(byte[] bytes) { - received.add(bytes); - } - - @Override - public void onError(Throwable t) { - fail("onError not expected"); - } - - @Override - public void onComplete() { - complete.set(true); - } - }); - - while (!complete.get()) { - } - - assertEquals(2, received.size()); - assertSame(abc, received.get(0)); - assertSame(def, received.get(1)); - } - - @Test - public void multipleSubscribe() throws Exception { - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - - } - - @Override - public void onNext(byte[] bytes) { - - } - - @Override - public void onError(Throwable t) { - - } - - @Override - public void onComplete() { - - } - }); - publisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - fail("onSubscribe not expected"); - } - - @Override - public void onNext(byte[] bytes) { - fail("onNext not expected"); - } - - @Override - public void onError(Throwable t) { - assertTrue(t instanceof IllegalStateException); - } - - @Override - public void onComplete() { - fail("onComplete not expected"); - } - }); - } - - @Test - public void cancel() throws Exception { - byte[] abc = new byte[]{'a', 'b', 'c'}; - byte[] def = new byte[]{'d', 'e', 'f'}; - - queue.putSignal(abc); - queue.putSignal(def); - queue.complete(); - - final AtomicBoolean complete = new AtomicBoolean(false); - final List received = new ArrayList(1); - - publisher.subscribe(new Subscriber() { - - private Subscription subscription; - - @Override - public void onSubscribe(Subscription s) { - s.request(1); - this.subscription = s; - } - - @Override - public void onNext(byte[] bytes) { - received.add(bytes); - this.subscription.cancel(); - complete.set(true); - } - - @Override - public void onError(Throwable t) { - fail("onError not expected"); - } - - @Override - public void onComplete() { - } - }); - - while (!complete.get()) { - } - - assertEquals(1, received.size()); - assertSame(abc, received.get(0)); - } - - - -} \ No newline at end of file diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/util/BlockingByteBufQueueTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/util/BlockingByteBufQueueTests.java deleted file mode 100644 index d39772d851..0000000000 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/util/BlockingByteBufQueueTests.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.util; - -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -/** - * @author Arjen Poutsma - */ -public class BlockingByteBufQueueTests { - - private BlockingSignalQueue queue; - - @Before - public void setUp() throws Exception { - queue = new BlockingSignalQueue(); - } - - @Test - public void normal() throws Exception { - byte[] abc = new byte[]{'a', 'b', 'c'}; - byte[] def = new byte[]{'d', 'e', 'f'}; - - queue.putSignal(abc); - queue.putSignal(def); - queue.complete(); - - assertTrue(queue.isHeadSignal()); - assertFalse(queue.isHeadError()); - assertSame(abc, queue.pollSignal()); - - assertTrue(queue.isHeadSignal()); - assertFalse(queue.isHeadError()); - assertSame(def, queue.pollSignal()); - - assertTrue(queue.isComplete()); - } - - - @Test - public void error() throws Exception { - byte[] abc = new byte[]{'a', 'b', 'c'}; - Throwable error = new IllegalStateException(); - - queue.putSignal(abc); - queue.putError(error); - queue.complete(); - - assertTrue(queue.isHeadSignal()); - assertFalse(queue.isHeadError()); - assertSame(abc, queue.pollSignal()); - - assertTrue(queue.isHeadError()); - assertFalse(queue.isHeadSignal()); - assertSame(error, queue.pollError()); - - assertTrue(queue.isComplete()); - } -} \ No newline at end of file