Tweaks to ByteBufPubInputStream to use available blockingQueue facilities

This commit is contained in:
Stephane Maldini 2015-10-16 09:10:07 +02:00 committed by Sebastien Deleuze
parent b11bef7a26
commit 80f9a21b9d
2 changed files with 29 additions and 240 deletions

View File

@ -16,28 +16,27 @@
package org.springframework.reactive.io;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
import reactor.Publishers;
import reactor.core.error.CancelException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.reactive.util.PublisherSignal;
import org.springframework.util.Assert;
/**
* {@code InputStream} implementation based on a byte array {@link Publisher}.
*
* @author Arjen Poutsma
* @author Sebastien Deleuze
* @author Stephane Maldini
*/
public class ByteBufferPublisherInputStream extends InputStream {
private final BlockingQueue<PublisherSignal<ByteBuffer>> queue =
new LinkedBlockingQueue<>();
private final BlockingQueue<ByteBuffer> queue;
private ByteBufferInputStream currentStream;
@ -46,6 +45,7 @@ public class ByteBufferPublisherInputStream extends InputStream {
/**
* Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher.
*
* @param publisher the publisher to use
*/
public ByteBufferPublisherInputStream(Publisher<ByteBuffer> publisher) {
@ -54,14 +54,15 @@ public class ByteBufferPublisherInputStream extends InputStream {
/**
* Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher.
* @param publisher the publisher to use
*
* @param publisher the publisher to use
* @param requestSize the {@linkplain Subscription#request(long) request size} to use
* on the publisher
* on the publisher bound to Integer MAX
*/
public ByteBufferPublisherInputStream(Publisher<ByteBuffer> publisher, long requestSize) {
public ByteBufferPublisherInputStream(Publisher<ByteBuffer> publisher, int requestSize) {
Assert.notNull(publisher, "'publisher' must not be null");
publisher.subscribe(new BlockingQueueSubscriber(requestSize));
this.queue = Publishers.toReadQueue(publisher, requestSize);
}
@ -128,85 +129,29 @@ public class ByteBufferPublisherInputStream extends InputStream {
try {
if (this.currentStream != null && this.currentStream.available() > 0) {
return this.currentStream;
}
else {
// take() blocks, but that's OK since this is a *blocking* InputStream
PublisherSignal<ByteBuffer> signal = this.queue.take();
if (signal.isData()) {
ByteBuffer data = signal.data();
this.currentStream = new ByteBufferInputStream(data);
return this.currentStream;
}
else if (signal.isComplete()) {
} else {
// take() blocks until next or complete() then return null, but that's OK since this is a *blocking* InputStream
ByteBuffer signal = this.queue.take();
if(signal == null){
this.completed = true;
return null;
}
else if (signal.isError()) {
Throwable error = signal.error();
this.completed = true;
if (error instanceof IOException) {
throw (IOException) error;
}
else {
throw new IOException(error);
}
}
this.currentStream = new ByteBufferInputStream(signal);
return this.currentStream;
}
}
catch (CancelException ce) {
this.completed = true;
return null;
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (Throwable error ){
this.completed = true;
throw new IOException(error);
}
throw new IOException();
}
private class BlockingQueueSubscriber implements Subscriber<ByteBuffer> {
private final long requestSize;
private Subscription subscription;
public BlockingQueueSubscriber(long requestSize) {
this.requestSize = requestSize;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(this.requestSize);
}
@Override
public void onNext(ByteBuffer bytes) {
try {
queue.put(PublisherSignal.data(bytes));
this.subscription.request(requestSize);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
@Override
public void onError(Throwable t) {
try {
queue.put(PublisherSignal.error(t));
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
@Override
public void onComplete() {
try {
queue.put(PublisherSignal.complete());
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -1,156 +0,0 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.util;
import org.reactivestreams.Publisher;
import org.springframework.util.Assert;
/**
* Represents a signal value object, useful for wrapping signals as published by a {@link
* Publisher}. Mostly used to store signals in buffers.
* @author Arjen Poutsma
*/
public abstract class PublisherSignal<T> {
protected PublisherSignal() {
}
/**
* Indicates whether this signal is an data signal, i.e. if {@link #data()} can be
* called safely.
* @return {@code true} if this signal contains data; {@code false} otherwise
*/
public boolean isData() {
return false;
}
/**
* Returns the data contained in this signal. Can only be safely called after {@link
* #isData()} returns {@code true}.
* @return the data
* @throws IllegalStateException if this signal does not contain data
*/
public T data() {
throw new IllegalStateException();
}
/**
* Indicates whether this signal is an error signal, i.e. if {@link #error()} can be
* called safely.
* @return {@code true} if this signal contains an error; {@code false} otherwise
*/
public boolean isError() {
return false;
}
/**
* Returns the error contained in this signal. Can only be safely called after {@link
* #isError()} returns {@code true}.
* @return the error
* @throws IllegalStateException if this signal does not contain an error
*/
public Throwable error() {
throw new IllegalStateException();
}
/**
* Indicates whether this signal completes the stream.
* @return {@code true} if this signal completes the stream; {@code false} otherwise
*/
public boolean isComplete() {
return false;
}
/**
* Creates a new data signal with the given {@code t}.
* @param t the data to base the signal on
* @return the newly created signal
*/
public static <T> PublisherSignal<T> data(T t) {
Assert.notNull(t, "'t' must not be null");
return new DataSignal<>(t);
}
/**
* Creates a new error signal with the given {@code Throwable}.
* @param t the exception to base the signal on
* @return the newly created signal
*/
public static <T> PublisherSignal<T> error(Throwable t) {
Assert.notNull(t, "'t' must not be null");
return new ErrorSignal<>(t);
}
/**
* Returns the complete signal, typically the last signal in a stream.
*/
@SuppressWarnings("unchecked")
public static <T> PublisherSignal<T> complete() {
return (PublisherSignal<T>)ON_COMPLETE;
}
private static final class DataSignal<T> extends PublisherSignal<T> {
private final T data;
public DataSignal(T data) {
this.data = data;
}
@Override
public boolean isData() {
return true;
}
@Override
public T data() {
return data;
}
}
private static final class ErrorSignal<T> extends PublisherSignal<T> {
private final Throwable error;
public ErrorSignal(Throwable error) {
this.error = error;
}
@Override
public boolean isError() {
return true;
}
@Override
public Throwable error() {
return error;
}
}
@SuppressWarnings("rawtypes")
private static final PublisherSignal ON_COMPLETE = new PublisherSignal() {
@Override
public boolean isComplete() {
return true;
}
};
}