Removed BlockingSignalQueue in favor of PublisherSignal.
This commit is contained in:
parent
0ec29d1c67
commit
5bbeb9c204
|
@ -1,4 +1,4 @@
|
||||||
package org.springframework.reactive.io;/*
|
/*
|
||||||
* Copyright 2002-2015 the original author or authors.
|
* Copyright 2002-2015 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
@ -14,57 +14,77 @@ package org.springframework.reactive.io;/*
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
package org.springframework.reactive.io;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
|
import org.reactivestreams.Subscriber;
|
||||||
|
import org.reactivestreams.Subscription;
|
||||||
|
|
||||||
import org.springframework.reactive.util.BlockingSignalQueue;
|
import org.springframework.reactive.util.PublisherSignal;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@code InputStream} implementation based on a byte array {@link Publisher}.
|
* {@code InputStream} implementation based on a byte array {@link Publisher}.
|
||||||
*
|
|
||||||
* @author Arjen Poutsma
|
* @author Arjen Poutsma
|
||||||
*/
|
*/
|
||||||
public class ByteArrayPublisherInputStream extends InputStream {
|
public class ByteArrayPublisherInputStream extends InputStream {
|
||||||
|
|
||||||
private final BlockingSignalQueue<byte[]> queue;
|
private final BlockingQueue<PublisherSignal<byte[]>> queue =
|
||||||
|
new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
private ByteArrayInputStream currentStream;
|
private ByteArrayInputStream currentStream;
|
||||||
|
|
||||||
|
private boolean completed;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher.
|
* Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher.
|
||||||
* @param publisher the publisher to use
|
* @param publisher the publisher to use
|
||||||
*/
|
*/
|
||||||
public ByteArrayPublisherInputStream(Publisher<byte[]> publisher) {
|
public ByteArrayPublisherInputStream(Publisher<byte[]> publisher) {
|
||||||
|
this(publisher, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher.
|
||||||
|
* @param publisher the publisher to use
|
||||||
|
* @param requestSize the {@linkplain Subscription#request(long) request size} to use
|
||||||
|
* on the publisher
|
||||||
|
*/
|
||||||
|
public ByteArrayPublisherInputStream(Publisher<byte[]> publisher, long requestSize) {
|
||||||
Assert.notNull(publisher, "'publisher' must not be null");
|
Assert.notNull(publisher, "'publisher' must not be null");
|
||||||
|
|
||||||
this.queue = new BlockingSignalQueue<byte[]>();
|
publisher.subscribe(new BlockingQueueSubscriber(requestSize));
|
||||||
publisher.subscribe(this.queue.subscriber());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteArrayPublisherInputStream(BlockingSignalQueue<byte[]> queue) {
|
|
||||||
Assert.notNull(queue, "'queue' must not be null");
|
|
||||||
this.queue = queue;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int available() throws IOException {
|
public int available() throws IOException {
|
||||||
|
if (completed) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
InputStream is = currentStream();
|
InputStream is = currentStream();
|
||||||
return is != null ? is.available() : 0;
|
return is != null ? is.available() : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int read() throws IOException {
|
public int read() throws IOException {
|
||||||
|
if (completed) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
InputStream is = currentStream();
|
InputStream is = currentStream();
|
||||||
while (is != null) {
|
while (is != null) {
|
||||||
int ch = is.read();
|
int ch = is.read();
|
||||||
if (ch != -1) {
|
if (ch != -1) {
|
||||||
return ch;
|
return ch;
|
||||||
} else {
|
}
|
||||||
|
else {
|
||||||
is = currentStream();
|
is = currentStream();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -73,6 +93,9 @@ public class ByteArrayPublisherInputStream extends InputStream {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int read(byte[] b, int off, int len) throws IOException {
|
public int read(byte[] b, int off, int len) throws IOException {
|
||||||
|
if (completed) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
InputStream is = currentStream();
|
InputStream is = currentStream();
|
||||||
if (is == null) {
|
if (is == null) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -105,23 +128,84 @@ public class ByteArrayPublisherInputStream extends InputStream {
|
||||||
if (this.currentStream != null && this.currentStream.available() > 0) {
|
if (this.currentStream != null && this.currentStream.available() > 0) {
|
||||||
return this.currentStream;
|
return this.currentStream;
|
||||||
}
|
}
|
||||||
else if (this.queue.isComplete()) {
|
else {
|
||||||
return null;
|
// take() blocks, but that's OK since this is a *blocking* InputStream
|
||||||
}
|
PublisherSignal<byte[]> signal = this.queue.take();
|
||||||
else if (this.queue.isHeadSignal()) {
|
|
||||||
byte[] current = this.queue.pollSignal();
|
if (signal.isData()) {
|
||||||
this.currentStream = new ByteArrayInputStream(current);
|
byte[] data = signal.data();
|
||||||
return this.currentStream;
|
this.currentStream = new ByteArrayInputStream(data);
|
||||||
}
|
return this.currentStream;
|
||||||
else if (this.queue.isHeadError()) {
|
}
|
||||||
Throwable t = this.queue.pollError();
|
else if (signal.isComplete()) {
|
||||||
throw t instanceof IOException ? (IOException) t : new IOException(t);
|
this.completed = true;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
else if (signal.isError()) {
|
||||||
|
Throwable error = signal.error();
|
||||||
|
this.completed = true;
|
||||||
|
if (error instanceof IOException) {
|
||||||
|
throw (IOException) error;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IOException(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (InterruptedException ex) {
|
catch (InterruptedException ex) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
return null;
|
throw new IOException();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class BlockingQueueSubscriber implements Subscriber<byte[]> {
|
||||||
|
|
||||||
|
private final long requestSize;
|
||||||
|
|
||||||
|
private Subscription subscription;
|
||||||
|
|
||||||
|
public BlockingQueueSubscriber(long requestSize) {
|
||||||
|
this.requestSize = requestSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSubscribe(Subscription subscription) {
|
||||||
|
this.subscription = subscription;
|
||||||
|
|
||||||
|
this.subscription.request(this.requestSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(byte[] bytes) {
|
||||||
|
try {
|
||||||
|
queue.put(PublisherSignal.data(bytes));
|
||||||
|
this.subscription.request(requestSize);
|
||||||
|
}
|
||||||
|
catch (InterruptedException ex) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
try {
|
||||||
|
queue.put(PublisherSignal.error(t));
|
||||||
|
}
|
||||||
|
catch (InterruptedException ex) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
try {
|
||||||
|
queue.put(PublisherSignal.complete());
|
||||||
|
}
|
||||||
|
catch (InterruptedException ex) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,28 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2015 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
package org.springframework.reactive.io;
|
package org.springframework.reactive.io;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.Arrays;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
|
import reactor.rx.Streams;
|
||||||
import org.springframework.reactive.util.BlockingSignalQueue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@code OutputStream} implementation that stores all written bytes, to be retrieved
|
* {@code OutputStream} implementation that stores all written bytes, to be retrieved
|
||||||
|
@ -15,14 +31,15 @@ import org.springframework.reactive.util.BlockingSignalQueue;
|
||||||
*/
|
*/
|
||||||
public class ByteArrayPublisherOutputStream extends OutputStream {
|
public class ByteArrayPublisherOutputStream extends OutputStream {
|
||||||
|
|
||||||
private final BlockingSignalQueue<byte[]> queue = new BlockingSignalQueue<>();
|
private final List<byte[]> buffers = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the written data as a {@code Publisher}.
|
* Returns the written data as a {@code Publisher}.
|
||||||
* @return a publisher for the written bytes
|
* @return a publisher for the written bytes
|
||||||
*/
|
*/
|
||||||
public Publisher<byte[]> toByteArrayPublisher() {
|
public Publisher<byte[]> toByteArrayPublisher() {
|
||||||
return this.queue.publisher();
|
return Streams.from(buffers);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -32,22 +49,9 @@ public class ByteArrayPublisherOutputStream extends OutputStream {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(byte[] b, int off, int len) throws IOException {
|
public void write(byte[] b, int off, int len) throws IOException {
|
||||||
byte[] copy = Arrays.copyOf(b, len);
|
byte[] copy = new byte[len - off];
|
||||||
try {
|
System.arraycopy(b, off, copy, 0, len);
|
||||||
this.queue.putSignal(copy);
|
buffers.add(copy);
|
||||||
}
|
|
||||||
catch (InterruptedException ex) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
try {
|
|
||||||
this.queue.complete();
|
|
||||||
}
|
|
||||||
catch (InterruptedException ex) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,268 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2002-2015 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.reactive.util;
|
|
||||||
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
|
|
||||||
import org.reactivestreams.Publisher;
|
|
||||||
import org.reactivestreams.Subscriber;
|
|
||||||
import org.reactivestreams.Subscription;
|
|
||||||
|
|
||||||
import org.springframework.util.Assert;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A {@link BlockingQueue} aimed at working with {@code Publisher<byte[]>} instances.
|
|
||||||
* Mainly meant to bridge between reactive and non-reactive APIs, such as blocking
|
|
||||||
* streams.
|
|
||||||
*
|
|
||||||
* <p>Typically, this class will be used by two threads: one thread to put new elements on
|
|
||||||
* the stack by calling {@link #putSignal(Object)}, possibly {@link #putError(Throwable)}
|
|
||||||
* and finally {@link #complete()}. The other thread will read elements by calling {@link
|
|
||||||
* #isHeadSignal()}/{@link #pollSignal()} and {@link #isHeadError()}/{@link #pollError()},
|
|
||||||
* while keeping an eye on {@link #isComplete()}.
|
|
||||||
* @author Arjen Poutsma
|
|
||||||
*/
|
|
||||||
public class BlockingSignalQueue<T> {
|
|
||||||
|
|
||||||
private static final int DEFAULT_REQUEST_SIZE_SUBSCRIBER = 1;
|
|
||||||
|
|
||||||
private final BlockingQueue<Signal<T>> queue = new LinkedBlockingQueue<Signal<T>>();
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Inserts the specified signal into this queue, waiting if necessary for space to
|
|
||||||
* become available.
|
|
||||||
* @param t the signal to add
|
|
||||||
*/
|
|
||||||
public void putSignal(T t) throws InterruptedException {
|
|
||||||
Assert.notNull(t, "'t' must not be null");
|
|
||||||
Assert.state(!isComplete(), "Cannot put signal in queue after complete()");
|
|
||||||
this.queue.put(new OnNext<T>(t));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Inserts the specified error into this queue, waiting if necessary for space to
|
|
||||||
* become available.
|
|
||||||
* @param error the error to add
|
|
||||||
*/
|
|
||||||
public void putError(Throwable error) throws InterruptedException {
|
|
||||||
Assert.notNull(error, "'error' must not be null");
|
|
||||||
Assert.state(!isComplete(), "Cannot putSignal errors in queue after complete()");
|
|
||||||
this.queue.put(new OnError<T>(error));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Marks the queue as complete.
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public void complete() throws InterruptedException {
|
|
||||||
this.queue.put(OnComplete.INSTANCE);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether the current head of this queue is a signal.
|
|
||||||
* @return {@code true} if the current head is a signal; {@code false} otherwise
|
|
||||||
*/
|
|
||||||
public boolean isHeadSignal() {
|
|
||||||
Signal signal = this.queue.peek();
|
|
||||||
return signal != null && signal.isOnNext();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether the current head of this queue is a {@link Throwable}.
|
|
||||||
* @return {@code true} if the current head is an error; {@code false} otherwise
|
|
||||||
*/
|
|
||||||
public boolean isHeadError() {
|
|
||||||
Signal signal = this.queue.peek();
|
|
||||||
return signal != null && signal.isOnError();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether there are more buffers or errors in this queue.
|
|
||||||
* @return {@code true} if there more elements in this queue; {@code false} otherwise
|
|
||||||
*/
|
|
||||||
public boolean isComplete() {
|
|
||||||
Signal signal = this.queue.peek();
|
|
||||||
return signal != null && signal.isComplete();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieves and removes the signal head of this queue. Should only be called after
|
|
||||||
* {@link #isHeadSignal()} returns {@code true}.
|
|
||||||
* @return the head of the queue
|
|
||||||
* @throws IllegalStateException if the current head of this queue is not a buffer
|
|
||||||
* @see #isHeadSignal()
|
|
||||||
*/
|
|
||||||
public T pollSignal() throws InterruptedException {
|
|
||||||
Signal<T> signal = this.queue.take();
|
|
||||||
return signal != null ? signal.next() : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieves and removes the buffer error of this queue. Should only be called after
|
|
||||||
* {@link #isHeadError()} returns {@code true}.
|
|
||||||
* @return the head of the queue, as error
|
|
||||||
* @throws IllegalStateException if the current head of this queue is not a error
|
|
||||||
* @see #isHeadError()
|
|
||||||
*/
|
|
||||||
public Throwable pollError() throws InterruptedException {
|
|
||||||
Signal signal = this.queue.take();
|
|
||||||
return signal != null ? signal.error() : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a {@code Publisher} backed by this queue.
|
|
||||||
*/
|
|
||||||
public Publisher<T> publisher() {
|
|
||||||
return new BlockingSignalQueuePublisher();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a {@code Subscriber} backed by this queue.
|
|
||||||
*/
|
|
||||||
public Subscriber<T> subscriber() {
|
|
||||||
return subscriber(DEFAULT_REQUEST_SIZE_SUBSCRIBER);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a {@code Subscriber} backed by this queue, with the given request size.
|
|
||||||
* @see Subscription#request(long)
|
|
||||||
*/
|
|
||||||
public Subscriber<T> subscriber(long requestSize) {
|
|
||||||
return new BlockingSignalQueueSubscriber(requestSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
private class BlockingSignalQueuePublisher implements Publisher<T> {
|
|
||||||
|
|
||||||
private Subscriber<? super T> subscriber;
|
|
||||||
|
|
||||||
private final Object subscriberMutex = new Object();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void subscribe(Subscriber<? super T> subscriber) {
|
|
||||||
synchronized (this.subscriberMutex) {
|
|
||||||
if (this.subscriber != null) {
|
|
||||||
subscriber.onError(
|
|
||||||
new IllegalStateException("Only one subscriber allowed"));
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
this.subscriber = subscriber;
|
|
||||||
final SubscriptionThread thread = new SubscriptionThread();
|
|
||||||
this.subscriber.onSubscribe(new Subscription() {
|
|
||||||
@Override
|
|
||||||
public void request(long n) {
|
|
||||||
thread.request(n);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cancel() {
|
|
||||||
thread.cancel();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
thread.start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class SubscriptionThread extends Thread {
|
|
||||||
|
|
||||||
private final DemandCounter demand = new DemandCounter();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
while (!Thread.currentThread().isInterrupted()) {
|
|
||||||
if (this.demand.hasDemand() && isHeadSignal()) {
|
|
||||||
subscriber.onNext(pollSignal());
|
|
||||||
this.demand.decrement();
|
|
||||||
}
|
|
||||||
else if (isHeadError()) {
|
|
||||||
subscriber.onError(pollError());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else if (isComplete()) {
|
|
||||||
subscriber.onComplete();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (InterruptedException ex) {
|
|
||||||
// Allow thread to exit
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void request(long n) {
|
|
||||||
this.demand.increase(n);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void cancel() {
|
|
||||||
interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class BlockingSignalQueueSubscriber implements Subscriber<T> {
|
|
||||||
|
|
||||||
private final long requestSize;
|
|
||||||
|
|
||||||
private Subscription subscription;
|
|
||||||
|
|
||||||
public BlockingSignalQueueSubscriber(long requestSize) {
|
|
||||||
this.requestSize = requestSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription subscription) {
|
|
||||||
this.subscription = subscription;
|
|
||||||
|
|
||||||
this.subscription.request(this.requestSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(T t) {
|
|
||||||
try {
|
|
||||||
putSignal(t);
|
|
||||||
}
|
|
||||||
catch (InterruptedException ex) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
this.subscription.request(requestSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
try {
|
|
||||||
putError(t);
|
|
||||||
}
|
|
||||||
catch (InterruptedException ex) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
this.subscription.request(requestSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
try {
|
|
||||||
complete();
|
|
||||||
}
|
|
||||||
catch (InterruptedException ex) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,54 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2002-2015 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.reactive.util;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Arjen Poutsma
|
|
||||||
*/
|
|
||||||
class OnComplete<T> implements Signal<T> {
|
|
||||||
|
|
||||||
public static final OnComplete INSTANCE = new OnComplete();
|
|
||||||
|
|
||||||
private OnComplete() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isComplete() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isOnNext() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T next() {
|
|
||||||
throw new IllegalStateException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isOnError() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Throwable error() {
|
|
||||||
throw new IllegalStateException();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,57 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2002-2015 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.reactive.util;
|
|
||||||
|
|
||||||
import org.springframework.util.Assert;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Arjen Poutsma
|
|
||||||
*/
|
|
||||||
final class OnError<T> implements Signal<T> {
|
|
||||||
|
|
||||||
private final Throwable error;
|
|
||||||
|
|
||||||
public OnError(Throwable error) {
|
|
||||||
Assert.notNull(error, "'error' must not be null");
|
|
||||||
this.error = error;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isOnError() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Throwable error() {
|
|
||||||
return error;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isOnNext() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T next() {
|
|
||||||
throw new IllegalStateException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isComplete() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,57 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2002-2015 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.reactive.util;
|
|
||||||
|
|
||||||
import org.springframework.util.Assert;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Arjen Poutsma
|
|
||||||
*/
|
|
||||||
class OnNext<T> implements Signal<T> {
|
|
||||||
|
|
||||||
private final T next;
|
|
||||||
|
|
||||||
public OnNext(T next) {
|
|
||||||
Assert.notNull(next, "'next' must not be null");
|
|
||||||
this.next = next;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isOnNext() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T next() {
|
|
||||||
return next;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isOnError() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Throwable error() {
|
|
||||||
throw new IllegalStateException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isComplete() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,154 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2015 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.reactive.util;
|
||||||
|
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a signal value object, useful for wrapping signals as published by a {@link
|
||||||
|
* #Publisher()}. Mostly used to store signals in buffers.
|
||||||
|
* @author Arjen Poutsma
|
||||||
|
*/
|
||||||
|
public abstract class PublisherSignal<T> {
|
||||||
|
|
||||||
|
protected PublisherSignal() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates whether this signal is an data signal, i.e. if {@link #data()} can be
|
||||||
|
* called safely.
|
||||||
|
* @return {@code true} if this signal contains data; {@code false} otherwise
|
||||||
|
*/
|
||||||
|
public boolean isData() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the data contained in this signal. Can only be safely called after {@link
|
||||||
|
* #isData()} returns {@code true}.
|
||||||
|
* @return the data
|
||||||
|
* @throws IllegalStateException if this signal does not contain data
|
||||||
|
*/
|
||||||
|
public T data() {
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates whether this signal is an error signal, i.e. if {@link #error()} can be
|
||||||
|
* called safely.
|
||||||
|
* @return {@code true} if this signal contains an error; {@code false} otherwise
|
||||||
|
*/
|
||||||
|
public boolean isError() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the error contained in this signal. Can only be safely called after {@link
|
||||||
|
* #isError()} returns {@code true}.
|
||||||
|
* @return the error
|
||||||
|
* @throws IllegalStateException if this signal does not contain an error
|
||||||
|
*/
|
||||||
|
public Throwable error() {
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates whether this signal completes the stream.
|
||||||
|
* @return {@code true} if this signal completes the stream; {@code false} otherwise
|
||||||
|
*/
|
||||||
|
public boolean isComplete() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new data signal with the given {@code t}.
|
||||||
|
* @param t the data to base the signal on
|
||||||
|
* @return the newly created signal
|
||||||
|
*/
|
||||||
|
public static <T> PublisherSignal<T> data(T t) {
|
||||||
|
Assert.notNull(t, "'t' must not be null");
|
||||||
|
return new DataSignal<>(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new error signal with the given {@code Throwable}.
|
||||||
|
* @param t the exception to base the signal on
|
||||||
|
* @return the newly created signal
|
||||||
|
*/
|
||||||
|
public static <T> PublisherSignal<T> error(Throwable t) {
|
||||||
|
Assert.notNull(t, "'t' must not be null");
|
||||||
|
return new ErrorSignal<>(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the complete signal, typically the last signal in a stream.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static <T> PublisherSignal<T> complete() {
|
||||||
|
return (PublisherSignal<T>)ON_COMPLETE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class DataSignal<T> extends PublisherSignal<T> {
|
||||||
|
|
||||||
|
private final T data;
|
||||||
|
|
||||||
|
public DataSignal(T data) {
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isData() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T data() {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class ErrorSignal<T> extends PublisherSignal<T> {
|
||||||
|
|
||||||
|
private final Throwable error;
|
||||||
|
|
||||||
|
public ErrorSignal(Throwable error) {
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isError() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Throwable error() {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
private static final PublisherSignal ON_COMPLETE = new PublisherSignal() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isComplete() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,108 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2015 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.reactive.io;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.reactivestreams.Publisher;
|
||||||
|
import org.reactivestreams.Subscriber;
|
||||||
|
import org.reactivestreams.Subscription;
|
||||||
|
import reactor.rx.Stream;
|
||||||
|
import reactor.rx.Streams;
|
||||||
|
|
||||||
|
import org.springframework.util.FileCopyUtils;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Arjen Poutsma
|
||||||
|
*/
|
||||||
|
public class ByteArrayPublisherInputStreamTests {
|
||||||
|
|
||||||
|
|
||||||
|
private ByteArrayPublisherInputStream is;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void createStream() {
|
||||||
|
Stream<byte[]> stream =
|
||||||
|
Streams.just(new byte[]{'a', 'b', 'c'}, new byte[]{'d', 'e'});
|
||||||
|
|
||||||
|
is = new ByteArrayPublisherInputStream(stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void reactor() throws Exception {
|
||||||
|
assertEquals(3, is.available());
|
||||||
|
|
||||||
|
int ch = is.read();
|
||||||
|
assertEquals('a', ch);
|
||||||
|
ch = is.read();
|
||||||
|
assertEquals('b', ch);
|
||||||
|
ch = is.read();
|
||||||
|
assertEquals('c', ch);
|
||||||
|
|
||||||
|
assertEquals(2, is.available());
|
||||||
|
ch = is.read();
|
||||||
|
assertEquals('d', ch);
|
||||||
|
ch = is.read();
|
||||||
|
assertEquals('e', ch);
|
||||||
|
|
||||||
|
ch = is.read();
|
||||||
|
assertEquals(-1, ch);
|
||||||
|
|
||||||
|
assertEquals(0, is.available());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void copy() throws Exception {
|
||||||
|
ByteArrayPublisherOutputStream os = new ByteArrayPublisherOutputStream();
|
||||||
|
|
||||||
|
FileCopyUtils.copy(is, os);
|
||||||
|
|
||||||
|
Publisher<byte[]> publisher = os.toByteArrayPublisher();
|
||||||
|
|
||||||
|
publisher.subscribe(new Subscriber<byte[]>() {
|
||||||
|
List<byte[]> result = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSubscribe(Subscription s) {
|
||||||
|
s.request(Long.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(byte[] bytes) {
|
||||||
|
result.add(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
fail(t.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
assertArrayEquals(result.get(0), new byte[]{'a', 'b', 'c'});
|
||||||
|
assertArrayEquals(result.get(0), new byte[]{'d', 'e'});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,96 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2002-2015 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.reactive.io;
|
|
||||||
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import org.springframework.reactive.util.BlockingSignalQueue;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Arjen Poutsma
|
|
||||||
*/
|
|
||||||
public class ByteBufPublisherInputStreamTests {
|
|
||||||
|
|
||||||
private BlockingSignalQueue<byte[]> queue;
|
|
||||||
|
|
||||||
private ByteArrayPublisherInputStream is;
|
|
||||||
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
queue = new BlockingSignalQueue<byte[]>();
|
|
||||||
is = new ByteArrayPublisherInputStream(queue);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void readSingleByte() throws Exception {
|
|
||||||
queue.putSignal(new byte[]{'a', 'b', 'c'});
|
|
||||||
queue.putSignal(new byte[]{'d', 'e', 'f'});
|
|
||||||
queue.complete();
|
|
||||||
|
|
||||||
|
|
||||||
int ch = is.read();
|
|
||||||
assertEquals('a', ch);
|
|
||||||
ch = is.read();
|
|
||||||
assertEquals('b', ch);
|
|
||||||
ch = is.read();
|
|
||||||
assertEquals('c', ch);
|
|
||||||
|
|
||||||
ch = is.read();
|
|
||||||
assertEquals('d', ch);
|
|
||||||
ch = is.read();
|
|
||||||
assertEquals('e', ch);
|
|
||||||
ch = is.read();
|
|
||||||
assertEquals('f', ch);
|
|
||||||
|
|
||||||
ch = is.read();
|
|
||||||
assertEquals(-1, ch);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void readBytes() throws Exception {
|
|
||||||
queue.putSignal(new byte[]{'a', 'b', 'c'});
|
|
||||||
queue.putSignal(new byte[]{'d', 'e', 'f'});
|
|
||||||
queue.complete();
|
|
||||||
|
|
||||||
byte[] buf = new byte[2];
|
|
||||||
int read = this.is.read(buf);
|
|
||||||
assertEquals(2, read);
|
|
||||||
assertArrayEquals(new byte[] { 'a', 'b'}, buf);
|
|
||||||
|
|
||||||
read = this.is.read(buf);
|
|
||||||
assertEquals(1, read);
|
|
||||||
assertEquals('c', buf[0]);
|
|
||||||
|
|
||||||
read = this.is.read(buf);
|
|
||||||
assertEquals(2, read);
|
|
||||||
assertArrayEquals(new byte[] { 'd', 'e'}, buf);
|
|
||||||
|
|
||||||
read = this.is.read(buf);
|
|
||||||
assertEquals(1, read);
|
|
||||||
assertEquals('f', buf[0]);
|
|
||||||
|
|
||||||
read = this.is.read(buf);
|
|
||||||
assertEquals(-1, read);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,231 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2002-2015 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.reactive.util;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.reactivestreams.Publisher;
|
|
||||||
import org.reactivestreams.Subscriber;
|
|
||||||
import org.reactivestreams.Subscription;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertSame;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Arjen Poutsma
|
|
||||||
*/
|
|
||||||
public class BlockingByteBufQueuePublisherTests {
|
|
||||||
|
|
||||||
private BlockingSignalQueue<byte[]> queue;
|
|
||||||
|
|
||||||
private Publisher<byte[]> publisher;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
queue = new BlockingSignalQueue<byte[]>();
|
|
||||||
publisher = queue.publisher();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void normal() throws Exception {
|
|
||||||
byte[] abc = new byte[]{'a', 'b', 'c'};
|
|
||||||
byte[] def = new byte[]{'d', 'e', 'f'};
|
|
||||||
|
|
||||||
queue.putSignal(abc);
|
|
||||||
queue.putSignal(def);
|
|
||||||
queue.complete();
|
|
||||||
|
|
||||||
final AtomicBoolean complete = new AtomicBoolean(false);
|
|
||||||
final List<byte[]> received = new ArrayList<byte[]>(2);
|
|
||||||
|
|
||||||
publisher.subscribe(new Subscriber<byte[]>() {
|
|
||||||
private Subscription subscription;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription s) {
|
|
||||||
s.request(1);
|
|
||||||
this.subscription = s;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(byte[] bytes) {
|
|
||||||
received.add(bytes);
|
|
||||||
this.subscription.request(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
fail("onError not expected");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
complete.set(true);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
while (!complete.get()) {
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(2, received.size());
|
|
||||||
assertSame(abc, received.get(0));
|
|
||||||
assertSame(def, received.get(1));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void unbounded() throws Exception {
|
|
||||||
byte[] abc = new byte[]{'a', 'b', 'c'};
|
|
||||||
byte[] def = new byte[]{'d', 'e', 'f'};
|
|
||||||
|
|
||||||
queue.putSignal(abc);
|
|
||||||
queue.putSignal(def);
|
|
||||||
queue.complete();
|
|
||||||
|
|
||||||
final AtomicBoolean complete = new AtomicBoolean(false);
|
|
||||||
final List<byte[]> received = new ArrayList<byte[]>(2);
|
|
||||||
|
|
||||||
publisher.subscribe(new Subscriber<byte[]>() {
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription s) {
|
|
||||||
s.request(Long.MAX_VALUE);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(byte[] bytes) {
|
|
||||||
received.add(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
fail("onError not expected");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
complete.set(true);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
while (!complete.get()) {
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(2, received.size());
|
|
||||||
assertSame(abc, received.get(0));
|
|
||||||
assertSame(def, received.get(1));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void multipleSubscribe() throws Exception {
|
|
||||||
publisher.subscribe(new Subscriber<byte[]>() {
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription s) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(byte[] bytes) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
|
|
||||||
}
|
|
||||||
});
|
|
||||||
publisher.subscribe(new Subscriber<byte[]>() {
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription s) {
|
|
||||||
fail("onSubscribe not expected");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(byte[] bytes) {
|
|
||||||
fail("onNext not expected");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
assertTrue(t instanceof IllegalStateException);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
fail("onComplete not expected");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void cancel() throws Exception {
|
|
||||||
byte[] abc = new byte[]{'a', 'b', 'c'};
|
|
||||||
byte[] def = new byte[]{'d', 'e', 'f'};
|
|
||||||
|
|
||||||
queue.putSignal(abc);
|
|
||||||
queue.putSignal(def);
|
|
||||||
queue.complete();
|
|
||||||
|
|
||||||
final AtomicBoolean complete = new AtomicBoolean(false);
|
|
||||||
final List<byte[]> received = new ArrayList<byte[]>(1);
|
|
||||||
|
|
||||||
publisher.subscribe(new Subscriber<byte[]>() {
|
|
||||||
|
|
||||||
private Subscription subscription;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription s) {
|
|
||||||
s.request(1);
|
|
||||||
this.subscription = s;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(byte[] bytes) {
|
|
||||||
received.add(bytes);
|
|
||||||
this.subscription.cancel();
|
|
||||||
complete.set(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
fail("onError not expected");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
while (!complete.get()) {
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(1, received.size());
|
|
||||||
assertSame(abc, received.get(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,78 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2002-2015 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.reactive.util;
|
|
||||||
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertSame;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Arjen Poutsma
|
|
||||||
*/
|
|
||||||
public class BlockingByteBufQueueTests {
|
|
||||||
|
|
||||||
private BlockingSignalQueue<byte[]> queue;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
queue = new BlockingSignalQueue<byte[]>();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void normal() throws Exception {
|
|
||||||
byte[] abc = new byte[]{'a', 'b', 'c'};
|
|
||||||
byte[] def = new byte[]{'d', 'e', 'f'};
|
|
||||||
|
|
||||||
queue.putSignal(abc);
|
|
||||||
queue.putSignal(def);
|
|
||||||
queue.complete();
|
|
||||||
|
|
||||||
assertTrue(queue.isHeadSignal());
|
|
||||||
assertFalse(queue.isHeadError());
|
|
||||||
assertSame(abc, queue.pollSignal());
|
|
||||||
|
|
||||||
assertTrue(queue.isHeadSignal());
|
|
||||||
assertFalse(queue.isHeadError());
|
|
||||||
assertSame(def, queue.pollSignal());
|
|
||||||
|
|
||||||
assertTrue(queue.isComplete());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void error() throws Exception {
|
|
||||||
byte[] abc = new byte[]{'a', 'b', 'c'};
|
|
||||||
Throwable error = new IllegalStateException();
|
|
||||||
|
|
||||||
queue.putSignal(abc);
|
|
||||||
queue.putError(error);
|
|
||||||
queue.complete();
|
|
||||||
|
|
||||||
assertTrue(queue.isHeadSignal());
|
|
||||||
assertFalse(queue.isHeadError());
|
|
||||||
assertSame(abc, queue.pollSignal());
|
|
||||||
|
|
||||||
assertTrue(queue.isHeadError());
|
|
||||||
assertFalse(queue.isHeadSignal());
|
|
||||||
assertSame(error, queue.pollError());
|
|
||||||
|
|
||||||
assertTrue(queue.isComplete());
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue