diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index c9c4c7150b2..0807ee5280a 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -27,6 +27,15 @@ dependencies { provided "javax.servlet:javax.servlet-api:3.1.0" testCompile "junit:junit:4.12" + testCompile "org.springframework:spring-web:4.1.2.RELEASE" + testCompile 'org.apache.tomcat:tomcat-util:8.0.23' + testCompile 'org.apache.tomcat.embed:tomcat-embed-core:8.0.23' + + testCompile 'org.eclipse.jetty:jetty-server:9.3.0.v20150612' + testCompile 'org.eclipse.jetty:jetty-servlet:9.3.0.v20150612' + + testCompile("log4j:log4j:1.2.16") + } diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherInputStream.java b/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherInputStream.java index d3251c04514..3f5b882f44b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherInputStream.java +++ b/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherInputStream.java @@ -21,7 +21,6 @@ import java.io.InputStream; import org.reactivestreams.Publisher; import org.springframework.rx.util.BlockingSignalQueue; -import org.springframework.rx.util.BlockingSignalQueueSubscriber; import org.springframework.util.Assert; /** @@ -44,7 +43,7 @@ public class ByteArrayPublisherInputStream extends InputStream { Assert.notNull(publisher, "'publisher' must not be null"); this.queue = new BlockingSignalQueue(); - publisher.subscribe(new BlockingSignalQueueSubscriber(this.queue)); + publisher.subscribe(this.queue.subscriber()); } ByteArrayPublisherInputStream(BlockingSignalQueue queue) { diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherOutputStream.java b/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherOutputStream.java index a89b9daee89..80550a43c1a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherOutputStream.java +++ b/spring-web-reactive/src/main/java/org/springframework/rx/io/ByteArrayPublisherOutputStream.java @@ -7,7 +7,6 @@ import java.util.Arrays; import org.reactivestreams.Publisher; import org.springframework.rx.util.BlockingSignalQueue; -import org.springframework.rx.util.BlockingSignalQueuePublisher; /** * {@code OutputStream} implementation that stores all written bytes, to be retrieved @@ -23,7 +22,7 @@ public class ByteArrayPublisherOutputStream extends OutputStream { * @return a publisher for the written bytes */ public Publisher toByteBufPublisher() { - return new BlockingSignalQueuePublisher(this.queue); + return this.queue.publisher(); } @Override diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueue.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueue.java index 476ab16eb46..890dc89eb6e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueue.java +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueue.java @@ -19,6 +19,10 @@ package org.springframework.rx.util; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + import org.springframework.util.Assert; /** @@ -35,6 +39,8 @@ import org.springframework.util.Assert; */ public class BlockingSignalQueue { + private static final int DEFAULT_REQUEST_SIZE_SUBSCRIBER = 1; + private final BlockingQueue> queue = new LinkedBlockingQueue>(); @@ -119,121 +125,151 @@ public class BlockingSignalQueue { return signal != null ? signal.error() : null; } - private interface Signal { - - boolean isOnNext(); - - T next(); - - boolean isOnError(); - - Throwable error(); - - boolean isComplete(); + /** + * Returns a {@code Publisher} backed by this queue. + */ + public Publisher publisher() { + return new BlockingSignalQueuePublisher(); } - private static class OnNext implements Signal { + /** + * Returns a {@code Subscriber} backed by this queue. + */ + public Subscriber subscriber() { + return subscriber(DEFAULT_REQUEST_SIZE_SUBSCRIBER); + } - private final T next; + /** + * Returns a {@code Subscriber} backed by this queue, with the given request size. + * @see Subscription#request(long) + */ + public Subscriber subscriber(long requestSize) { + return new BlockingSignalQueueSubscriber(requestSize); + } - public OnNext(T next) { - Assert.notNull(next, "'next' must not be null"); - this.next = next; - } + private class BlockingSignalQueuePublisher implements Publisher { + + private Subscriber subscriber; + + private final Object subscriberMutex = new Object(); @Override - public boolean isOnNext() { - return true; + public void subscribe(Subscriber subscriber) { + synchronized (this.subscriberMutex) { + if (this.subscriber != null) { + subscriber.onError( + new IllegalStateException("Only one subscriber allowed")); + } + else { + this.subscriber = subscriber; + final SubscriptionThread thread = new SubscriptionThread(); + this.subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + thread.request(n); + } + + @Override + public void cancel() { + thread.cancel(); + } + }); + thread.start(); + } + } } - @Override - public T next() { - return next; - } + private class SubscriptionThread extends Thread { - @Override - public boolean isOnError() { - return false; - } + private volatile long demand = 0; - @Override - public Throwable error() { - throw new IllegalStateException(); - } + @Override + public void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + if (demand > 0 && isHeadSignal()) { + subscriber.onNext(pollSignal()); + if (demand != Long.MAX_VALUE) { + demand--; + } + } + else if (isHeadError()) { + subscriber.onError(pollError()); + break; + } + else if (isComplete()) { + subscriber.onComplete(); + break; + } + } + } + catch (InterruptedException ex) { + // Allow thread to exit + } + } - @Override - public boolean isComplete() { - return false; + public void request(long n) { + if (n != Long.MAX_VALUE) { + this.demand += n; + } + else { + this.demand = Long.MAX_VALUE; + } + } + + public void cancel() { + interrupt(); + } } } - private static final class OnError implements Signal { + private class BlockingSignalQueueSubscriber implements Subscriber { - private final Throwable error; + private final long requestSize; - public OnError(Throwable error) { - Assert.notNull(error, "'error' must not be null"); - this.error = error; + private Subscription subscription; + + public BlockingSignalQueueSubscriber(long requestSize) { + this.requestSize = requestSize; } @Override - public boolean isOnError() { - return true; + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + + this.subscription.request(this.requestSize); } @Override - public Throwable error() { - return error; + public void onNext(T t) { + try { + putSignal(t); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + this.subscription.request(requestSize); } @Override - public boolean isOnNext() { - return false; + public void onError(Throwable t) { + try { + putError(t); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + this.subscription.request(requestSize); } @Override - public T next() { - throw new IllegalStateException(); - } - - @Override - public boolean isComplete() { - return false; + public void onComplete() { + try { + complete(); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } } } - - private static class OnComplete implements Signal { - - private static final OnComplete INSTANCE = new OnComplete(); - - private OnComplete() { - } - - @Override - public boolean isComplete() { - return true; - } - - @Override - public boolean isOnNext() { - return false; - } - - @Override - public T next() { - throw new IllegalStateException(); - } - - @Override - public boolean isOnError() { - return false; - } - - @Override - public Throwable error() { - throw new IllegalStateException(); - } - - } - } diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueuePublisher.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueuePublisher.java deleted file mode 100644 index a542cd8dbf1..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueuePublisher.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.rx.util; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.util.Assert; - -/** - * @author Arjen Poutsma - */ -public class BlockingSignalQueuePublisher implements Publisher { - - private final BlockingSignalQueue queue; - - private Subscriber subscriber; - - private final Object subscriberMutex = new Object(); - - public BlockingSignalQueuePublisher(BlockingSignalQueue queue) { - Assert.notNull(queue, "'queue' must not be null"); - this.queue = queue; - } - - @Override - public void subscribe(Subscriber subscriber) { - synchronized (this.subscriberMutex) { - if (this.subscriber != null) { - subscriber.onError( - new IllegalStateException("Only one subscriber allowed")); - } - else { - this.subscriber = subscriber; - final SubscriptionThread thread = new SubscriptionThread(); - this.subscriber.onSubscribe(new Subscription() { - @Override - public void request(long n) { - thread.request(n); - } - - @Override - public void cancel() { - thread.cancel(); - } - }); - thread.start(); - } - } - } - - private class SubscriptionThread extends Thread { - - private volatile long requestCount = 0; - - private long l = 0; - - @Override - public void run() { - try { - while (!Thread.currentThread().isInterrupted()) { - if ((l < requestCount || requestCount == Long.MAX_VALUE) && - queue.isHeadSignal()) { - subscriber.onNext(queue.pollSignal()); - l++; - } - else if (queue.isHeadError()) { - subscriber.onError(queue.pollError()); - break; - } - else if (queue.isComplete()) { - subscriber.onComplete(); - break; - } - } - } - catch (InterruptedException ex) { - // Allow thread to exit - } - } - - public void request(long n) { - if (n != Long.MAX_VALUE) { - this.requestCount += n; - } - else { - this.requestCount = Long.MAX_VALUE; - } - } - - public void cancel() { - interrupt(); - } - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueueSubscriber.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueueSubscriber.java deleted file mode 100644 index b17c0d709cb..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/rx/util/BlockingSignalQueueSubscriber.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.rx.util; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.util.Assert; - -/** - * A simple byte array {@link Subscriber} that puts all published bytes on a - * {@link @BlockingSignalQueue}. - * - * @author Arjen Poutsma - */ -public class BlockingSignalQueueSubscriber implements Subscriber { - - /** - * The default request size to use. - */ - public static final int DEFAULT_REQUEST_SIZE = 1; - - private final BlockingSignalQueue queue; - - private Subscription subscription; - - private int initialRequestSize = DEFAULT_REQUEST_SIZE; - - private int requestSize = DEFAULT_REQUEST_SIZE; - - - /** - * Creates a new {@code BlockingSignalQueueSubscriber} using the given queue. - * @param queue the queue to use - */ - public BlockingSignalQueueSubscriber(BlockingSignalQueue queue) { - Assert.notNull(queue, "'queue' must not be null"); - this.queue = queue; - } - - /** - * Sets the request size used when subscribing, in {@link #onSubscribe(Subscription)}. - * Defaults to {@link #DEFAULT_REQUEST_SIZE}. - * @param initialRequestSize the initial request size - * @see Subscription#request(long) - */ - public void setInitialRequestSize(int initialRequestSize) { - this.initialRequestSize = initialRequestSize; - } - - /** - * Sets the request size used after data or an error comes in, in {@link - * #onNext(Object)} and {@link #onError(Throwable)}. Defaults to {@link - * #DEFAULT_REQUEST_SIZE}. - * @see Subscription#request(long) - */ - public void setRequestSize(int requestSize) { - this.requestSize = requestSize; - } - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - - this.subscription.request(this.initialRequestSize); - } - - @Override - public void onNext(T t) { - try { - this.queue.putSignal(t); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - this.subscription.request(requestSize); - } - - @Override - public void onError(Throwable t) { - try { - this.queue.putError(t); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - this.subscription.request(requestSize); - } - - @Override - public void onComplete() { - try { - this.queue.complete(); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/OnComplete.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnComplete.java new file mode 100644 index 00000000000..74e1cccc104 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnComplete.java @@ -0,0 +1,54 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +/** + * @author Arjen Poutsma + */ +class OnComplete implements Signal { + + public static final OnComplete INSTANCE = new OnComplete(); + + private OnComplete() { + } + + @Override + public boolean isComplete() { + return true; + } + + @Override + public boolean isOnNext() { + return false; + } + + @Override + public T next() { + throw new IllegalStateException(); + } + + @Override + public boolean isOnError() { + return false; + } + + @Override + public Throwable error() { + throw new IllegalStateException(); + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/OnError.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnError.java new file mode 100644 index 00000000000..250e6f77271 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnError.java @@ -0,0 +1,57 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import org.springframework.util.Assert; + +/** + * @author Arjen Poutsma + */ +final class OnError implements Signal { + + private final Throwable error; + + public OnError(Throwable error) { + Assert.notNull(error, "'error' must not be null"); + this.error = error; + } + + @Override + public boolean isOnError() { + return true; + } + + @Override + public Throwable error() { + return error; + } + + @Override + public boolean isOnNext() { + return false; + } + + @Override + public T next() { + throw new IllegalStateException(); + } + + @Override + public boolean isComplete() { + return false; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/OnNext.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnNext.java new file mode 100644 index 00000000000..6d1aa6d8ca7 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/OnNext.java @@ -0,0 +1,57 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +import org.springframework.util.Assert; + +/** + * @author Arjen Poutsma + */ +class OnNext implements Signal { + + private final T next; + + public OnNext(T next) { + Assert.notNull(next, "'next' must not be null"); + this.next = next; + } + + @Override + public boolean isOnNext() { + return true; + } + + @Override + public T next() { + return next; + } + + @Override + public boolean isOnError() { + return false; + } + + @Override + public Throwable error() { + throw new IllegalStateException(); + } + + @Override + public boolean isComplete() { + return false; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/util/Signal.java b/spring-web-reactive/src/main/java/org/springframework/rx/util/Signal.java new file mode 100644 index 00000000000..74ea5eacac8 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/util/Signal.java @@ -0,0 +1,33 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.util; + +/** + * @author Arjen Poutsma + */ +interface Signal { + + boolean isOnNext(); + + T next(); + + boolean isOnError(); + + Throwable error(); + + boolean isComplete(); +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/AsyncContextSynchronizer.java b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/AsyncContextSynchronizer.java index 9d2b8d08809..20be9c51b81 100644 --- a/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/AsyncContextSynchronizer.java +++ b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/AsyncContextSynchronizer.java @@ -22,11 +22,16 @@ import javax.servlet.AsyncContext; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** * @author Arjen Poutsma */ class AsyncContextSynchronizer { + private static final Log logger = LogFactory.getLog(AsyncContextSynchronizer.class); + private static final int READ_COMPLETE = 1; private static final int WRITE_COMPLETE = 1 << 1; @@ -50,7 +55,9 @@ class AsyncContextSynchronizer { } public void readComplete() { + logger.debug("Read complete"); if (complete.compareAndSet(WRITE_COMPLETE, COMPLETE)) { + logger.debug("Complete"); this.asyncContext.complete(); } else { @@ -59,7 +66,9 @@ class AsyncContextSynchronizer { } public void writeComplete() { + logger.debug("Write complete"); if (complete.compareAndSet(READ_COMPLETE, COMPLETE)) { + logger.debug("Complete"); this.asyncContext.complete(); } else { diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/HttpHandler.java b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/HttpHandler.java new file mode 100644 index 00000000000..e9a36869592 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/HttpHandler.java @@ -0,0 +1,28 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import org.reactivestreams.Publisher; + +/** + * @author Arjen Poutsma + */ +public interface HttpHandler { + + Publisher handle(Publisher request); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/HttpHandlerServlet.java b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/HttpHandlerServlet.java new file mode 100644 index 00000000000..f19d3803145 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/HttpHandlerServlet.java @@ -0,0 +1,62 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import java.io.IOException; +import javax.servlet.AsyncContext; +import javax.servlet.ServletException; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.reactivestreams.Publisher; + +/** + * @author Arjen Poutsma + */ +@WebServlet(asyncSupported = true ) +public class HttpHandlerServlet extends HttpServlet { + + private static final int BUFFER_SIZE = 4096; + + private HttpHandler handler; + + public void setHandler(HttpHandler handler) { + this.handler = handler; + } + + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + + AsyncContext context = request.startAsync(); + final AsyncContextSynchronizer contextSynchronizer = + new AsyncContextSynchronizer(context); + + RequestBodyPublisher requestPublisher = new RequestBodyPublisher(contextSynchronizer, BUFFER_SIZE); + request.getInputStream().setReadListener(requestPublisher); + + ResponseBodySubscriber responseSubscriber = new ResponseBodySubscriber(contextSynchronizer); + response.getOutputStream().setWriteListener(responseSubscriber); + + Publisher responsePublisher = this.handler.handle(requestPublisher); + + responsePublisher.subscribe(responseSubscriber); + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/RequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/RequestBodyPublisher.java new file mode 100644 index 00000000000..6022922c83e --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/RequestBodyPublisher.java @@ -0,0 +1,142 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Arrays; +import javax.servlet.ReadListener; +import javax.servlet.ServletInputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * @author Arjen Poutsma + */ +public class RequestBodyPublisher implements ReadListener, Publisher { + + private final Charset UTF_8 = Charset.forName("UTF-8"); + + private static final Log logger = LogFactory.getLog(RequestBodyPublisher.class); + + private final AsyncContextSynchronizer synchronizer; + + private final byte[] buffer; + + private long demand; + + private Subscriber subscriber; + + public RequestBodyPublisher(AsyncContextSynchronizer synchronizer, int bufferSize) { + this.synchronizer = synchronizer; + this.buffer = new byte[bufferSize]; + } + + @Override + public void subscribe(Subscriber s) { + this.subscriber = s; + + this.subscriber.onSubscribe(new RequestBodySubscription()); + } + + @Override + public void onDataAvailable() throws IOException { + ServletInputStream input = this.synchronizer.getInputStream(); + + while (true) { + logger.debug("Demand: " + this.demand); + + if (demand <= 0) { + break; + } + + boolean ready = input.isReady(); + logger.debug("Input " + ready + "/" + input.isFinished()); + + if (!ready) { + break; + } + + int read = input.read(buffer); + logger.debug("Input read:" + read); + + if (read == -1) { + break; + } + else if (read > 0) { + if (demand != Long.MAX_VALUE) { + demand--; + } + byte[] copy = Arrays.copyOf(this.buffer, read); + +// logger.debug("Next: " + new String(copy, UTF_8)); + + this.subscriber.onNext(copy); + + } + } + } + + @Override + public void onAllDataRead() throws IOException { + logger.debug("All data read"); + this.synchronizer.readComplete(); + this.subscriber.onComplete(); + } + + @Override + public void onError(Throwable t) { + logger.error("RequestBodyPublisher Error", t); + this.subscriber.onError(t); + } + + private class RequestBodySubscription implements Subscription { + + @Override + public void request(long n) { + logger.debug("Updating demand " + demand + " by " + n); + + boolean stalled = demand <= 0; + + if (n != Long.MAX_VALUE && demand != Long.MAX_VALUE) { + demand += n; + } + else { + demand = Long.MAX_VALUE; + } + + if (stalled) { + try { + onDataAvailable(); + } + catch (IOException ex) { + onError(ex); + } + } + } + + @Override + public void cancel() { + synchronizer.readComplete(); + demand = 0; + } + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/ResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/ResponseBodySubscriber.java new file mode 100644 index 00000000000..f7faa4b16ad --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/rx/web/servlet/ResponseBodySubscriber.java @@ -0,0 +1,111 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import org.springframework.util.Assert; + +/** + * @author Arjen Poutsma + */ +public class ResponseBodySubscriber implements WriteListener, Subscriber { + + private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class); + + private final AsyncContextSynchronizer synchronizer; + + private Subscription subscription; + + private byte[] buffer; + + private AtomicBoolean complete = new AtomicBoolean(false); + + public ResponseBodySubscriber(AsyncContextSynchronizer synchronizer) { + this.synchronizer = synchronizer; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + this.subscription.request(1); + } + + @Override + public void onNext(byte[] bytes) { + logger.debug("Next: " + bytes.length + " bytes"); + + Assert.isNull(buffer); + + this.buffer = bytes; + try { + onWritePossible(); + } + catch (IOException e) { + onError(e); + } + } + + @Override + public void onComplete() { + logger.debug("Complete buffer: " + (buffer == null)); + + if (complete.compareAndSet(false, true) && buffer == null) { + this.synchronizer.writeComplete(); + } + } + + @Override + public void onWritePossible() throws IOException { + ServletOutputStream output = this.synchronizer.getOutputStream(); + + boolean ready = output.isReady(); + logger.debug("Output: " + ready + " buffer: " + (buffer == null)); + + if (this.buffer != null && ready) { + output.write(this.buffer); + this.buffer = null; + + if (!complete.get()) { + this.subscription.request(1); + } + else { + this.synchronizer.writeComplete(); + } + } + else if (this.buffer == null && ready) { + this.subscription.request(1); + } + } + + @Override + public void onError(Throwable t) { + logger.error("ResponseBodySubscriber error", t); + } + + + private void complete() { + } +} diff --git a/spring-web-reactive/src/main/resources/log4j.properties b/spring-web-reactive/src/main/resources/log4j.properties new file mode 100644 index 00000000000..a8eb1d47748 --- /dev/null +++ b/spring-web-reactive/src/main/resources/log4j.properties @@ -0,0 +1,6 @@ +log4j.rootCategory=INFO, stdout +log4j.logger.org.springframework.rx=DEBUG + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d %p [%25.25c{1}] <%t> - %m%n \ No newline at end of file diff --git a/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueuePublisherTests.java b/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueuePublisherTests.java index ea06be0397f..b09781b1072 100644 --- a/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueuePublisherTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/rx/util/BlockingByteBufQueuePublisherTests.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Test; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -34,12 +35,12 @@ public class BlockingByteBufQueuePublisherTests { private BlockingSignalQueue queue; - private BlockingSignalQueuePublisher publisher; + private Publisher publisher; @Before public void setUp() throws Exception { queue = new BlockingSignalQueue(); - publisher = new BlockingSignalQueuePublisher(queue); + publisher = queue.publisher(); } @Test diff --git a/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/AbstractHttpHandlerServletIntegrationTestCase.java b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/AbstractHttpHandlerServletIntegrationTestCase.java new file mode 100644 index 00000000000..50e009b2977 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/AbstractHttpHandlerServletIntegrationTestCase.java @@ -0,0 +1,96 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import java.net.URI; +import java.util.Random; + +import org.junit.Test; + +import org.springframework.http.HttpMethod; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.util.SocketUtils; +import org.springframework.web.client.RestTemplate; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public abstract class AbstractHttpHandlerServletIntegrationTestCase { + + private static final int REQUEST_SIZE = 4096 * 3; + + protected static int port = SocketUtils.findAvailableTcpPort(); + + private Random rnd = new Random(); + + + @Test + public void bytes() throws Exception { + RestTemplate restTemplate = new RestTemplate(); + + byte[] body = randomBytes(); + RequestEntity + request = new RequestEntity(body, HttpMethod.POST, new URI(url())); + ResponseEntity response = restTemplate.exchange(request, byte[].class); + + assertArrayEquals(body, response.getBody()); + } + + @Test + public void string() throws Exception { + RestTemplate restTemplate = new RestTemplate(); + + String body = randomString(); + RequestEntity request = new RequestEntity(body, HttpMethod.POST, new URI(url())); + ResponseEntity response = restTemplate.exchange(request, String.class); + + assertEquals(body, response.getBody()); + } + + private static String url() { + return "http://localhost:" + port + "/rx"; + } + + private String randomString() { + StringBuilder builder = new StringBuilder(); + int i = 1; + while (builder.length() < REQUEST_SIZE) { + builder.append(randomChar()); + if (i % 5 == 0) { + builder.append(' '); + } + if (i % 80 == 0) { + builder.append('\n'); + } + i++; + } + return builder.toString(); + } + + private char randomChar() { + return (char) (rnd.nextInt(26) + 'a'); + } + + private byte[] randomBytes() { + byte[] buffer = new byte[REQUEST_SIZE]; + rnd.nextBytes(buffer); + return buffer; + } + + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/CountingHttpHandler.java b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/CountingHttpHandler.java new file mode 100644 index 00000000000..962fd7be681 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/CountingHttpHandler.java @@ -0,0 +1,64 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * @author Arjen Poutsma + */ +public class CountingHttpHandler implements HttpHandler { + + private static final Log logger = LogFactory.getLog(CountingHttpHandler.class); + + @Override + public Publisher handle(Publisher request) { + request.subscribe(new Subscriber() { + private Subscription subscription; + + private int byteCount = 0; + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + this.subscription.request(1); + } + + @Override + public void onNext(byte[] bytes) { + byteCount += bytes.length; + this.subscription.request(1); + } + + @Override + public void onError(Throwable t) { + logger.error("CountingHttpHandler Error", t); + t.printStackTrace(); + } + + @Override + public void onComplete() { + logger.info("Processed " + byteCount + " bytes"); + } + }); + return null; + } +} diff --git a/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/EchoHandler.java b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/EchoHandler.java new file mode 100644 index 00000000000..faf728b0f19 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/EchoHandler.java @@ -0,0 +1,30 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import org.reactivestreams.Publisher; + +/** + * @author Arjen Poutsma + */ +public class EchoHandler implements HttpHandler { + + @Override + public Publisher handle(Publisher request) { + return request; + } +} diff --git a/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/HttpHandlerServletJettyIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/HttpHandlerServletJettyIntegrationTests.java new file mode 100644 index 00000000000..be3ef6f52f1 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/HttpHandlerServletJettyIntegrationTests.java @@ -0,0 +1,56 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.springframework.util.SocketUtils; + +/** + * @author Arjen Poutsma + */ +public class HttpHandlerServletJettyIntegrationTests + extends AbstractHttpHandlerServletIntegrationTestCase { + + private static Server jettyServer; + + @BeforeClass + public static void startServer() throws Exception { + jettyServer = new Server(); + ServerConnector connector = new ServerConnector(jettyServer); + port = SocketUtils.findAvailableTcpPort(); + connector.setPort(port); + ServletContextHandler handler = new ServletContextHandler(jettyServer, "", false, false); + HttpHandlerServlet servlet = new HttpHandlerServlet(); + servlet.setHandler(new EchoHandler()); + ServletHolder servletHolder = new ServletHolder(servlet); + handler.addServlet(servletHolder, "/rx"); + jettyServer.addConnector(connector); + jettyServer.start(); + } + + @AfterClass + public static void stopServer() throws Exception { + jettyServer.stop(); + } + +} \ No newline at end of file diff --git a/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/HttpHandlerServletTomcatIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/HttpHandlerServletTomcatIntegrationTests.java new file mode 100644 index 00000000000..a9d07171b66 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/rx/web/servlet/HttpHandlerServletTomcatIntegrationTests.java @@ -0,0 +1,55 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rx.web.servlet; + +import java.io.File; + +import org.apache.catalina.Context; +import org.apache.catalina.LifecycleException; +import org.apache.catalina.startup.Tomcat; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * @author Arjen Poutsma + */ +public class HttpHandlerServletTomcatIntegrationTests extends AbstractHttpHandlerServletIntegrationTestCase { + + private static Tomcat tomcatServer; + + @BeforeClass + public static void startServer() throws LifecycleException, InterruptedException { + tomcatServer = new Tomcat(); + tomcatServer.setPort(port); + File base = new File(System.getProperty("java.io.tmpdir")); + Context rootCtx = tomcatServer.addContext("", base.getAbsolutePath()); + + HttpHandlerServlet servlet = new HttpHandlerServlet(); + servlet.setHandler(new EchoHandler()); + + tomcatServer.addServlet(rootCtx, "handlerServlet", servlet); + rootCtx.addServletMapping("/rx", "handlerServlet"); + + tomcatServer.start(); + } + + @AfterClass + public static void stopServer() throws LifecycleException { + tomcatServer.stop(); + } + +} \ No newline at end of file