From 3595c51d40a2e735964f22fd29be607b12f600b4 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 29 Jul 2015 14:21:10 +0200 Subject: [PATCH] Introduced DemandCounter --- spring-web-reactive/build.gradle | 7 ++ .../reactive/util/BlockingSignalQueue.java | 15 ++-- .../reactive/util/DemandCounter.java | 70 +++++++++++++++++++ .../web/servlet/AsyncContextSynchronizer.java | 46 ++++++++---- .../web/servlet/RequestBodyPublisher.java | 21 +++--- .../AsyncContextSynchronizerTests.java | 57 +++++++++++++++ 6 files changed, 180 insertions(+), 36 deletions(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/util/DemandCounter.java create mode 100644 spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizerTests.java diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index 0807ee5280..91ce186840 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -35,6 +35,13 @@ dependencies { testCompile 'org.eclipse.jetty:jetty-servlet:9.3.0.v20150612' testCompile("log4j:log4j:1.2.16") + testCompile("org.mockito:mockito-core:1.10.19") { + exclude group:'org.hamcrest', module:'hamcrest-core' + } + testCompile("org.hamcrest:hamcrest-all:1.3") + testCompile "org.springframework:spring-test:4.1.2.RELEASE" + + } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/util/BlockingSignalQueue.java b/spring-web-reactive/src/main/java/org/springframework/reactive/util/BlockingSignalQueue.java index 1b854ff674..67e81601d2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/util/BlockingSignalQueue.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/util/BlockingSignalQueue.java @@ -181,17 +181,15 @@ public class BlockingSignalQueue { private class SubscriptionThread extends Thread { - private volatile long demand = 0; + private final DemandCounter demand = new DemandCounter(); @Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { - if (demand > 0 && isHeadSignal()) { + if (this.demand.hasDemand() && isHeadSignal()) { subscriber.onNext(pollSignal()); - if (demand != Long.MAX_VALUE) { - demand--; - } + this.demand.decrement(); } else if (isHeadError()) { subscriber.onError(pollError()); @@ -209,12 +207,7 @@ public class BlockingSignalQueue { } public void request(long n) { - if (n != Long.MAX_VALUE) { - this.demand += n; - } - else { - this.demand = Long.MAX_VALUE; - } + this.demand.increase(n); } public void cancel() { diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/util/DemandCounter.java b/spring-web-reactive/src/main/java/org/springframework/reactive/util/DemandCounter.java new file mode 100644 index 0000000000..cf8cf42c45 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/util/DemandCounter.java @@ -0,0 +1,70 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.util; + +import java.util.concurrent.atomic.AtomicLong; + +import org.springframework.util.Assert; + +/** + * Small utility class for keeping track of Reactive Streams demand. + * @author Arjen Poutsma + */ +public final class DemandCounter { + + private final AtomicLong demand = new AtomicLong(); + + /** + * Increases the demand by the given number + * @param n the positive number to increase demand by + * @return the increased demand + * @see org.reactivestreams.Subscription#request(long) + */ + public long increase(long n) { + Assert.isTrue(n > 0, "'n' must be higher than 0"); + return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d + n : Long.MAX_VALUE); + } + + /** + * Decreases the demand by one. + * @return the decremented demand + */ + public long decrement() { + return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d - 1 : Long.MAX_VALUE); + } + + /** + * Indicates whether this counter has demand, i.e. whether it is higher than 0. + * @return {@code true} if this counter has demand; {@code false} otherwise + */ + public boolean hasDemand() { + return this.demand.get() > 0; + } + + /** + * Resets this counter to 0. + * @see org.reactivestreams.Subscription#cancel() + */ + public void reset() { + this.demand.set(0); + } + + @Override + public String toString() { + return demand.toString(); + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizer.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizer.java index 80882923fa..f158923136 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizer.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizer.java @@ -22,15 +22,17 @@ import javax.servlet.AsyncContext; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** + * Utility class for synchronizing between the reading and writing side of an + * {@link AsyncContext}. This class will simply call {@link AsyncContext#complete()} when + * both {@link #readComplete()} and {@link #writeComplete()} have been called. + * * @author Arjen Poutsma + * @see AsyncContext */ -class AsyncContextSynchronizer { +final class AsyncContextSynchronizer { - private static final Log logger = LogFactory.getLog(AsyncContextSynchronizer.class); + private static final int NONE_COMPLETE = 0; private static final int READ_COMPLETE = 1; @@ -40,39 +42,59 @@ class AsyncContextSynchronizer { private final AsyncContext asyncContext; - private final AtomicInteger complete = new AtomicInteger(0); + private final AtomicInteger complete = new AtomicInteger(NONE_COMPLETE); + /** + * Creates a new {@code AsyncContextSynchronizer} based on the given context. + * @param asyncContext the context to base this synchronizer on + */ public AsyncContextSynchronizer(AsyncContext asyncContext) { this.asyncContext = asyncContext; } + /** + * Returns the input stream of this synchronizer. + * @return the input stream + * @throws IOException if an input or output exception occurred + */ public ServletInputStream getInputStream() throws IOException { return this.asyncContext.getRequest().getInputStream(); } + /** + * Returns the output stream of this synchronizer. + * @return the output stream + * @throws IOException if an input or output exception occurred + */ public ServletOutputStream getOutputStream() throws IOException { return this.asyncContext.getResponse().getOutputStream(); } + /** + * Completes the reading side of the asynchronous operation. When both this method and + * {@link #writeComplete()} have been called, the {@code AsyncContext} will be + * {@linkplain AsyncContext#complete() fully completed}. + */ public void readComplete() { - logger.debug("Read complete"); if (complete.compareAndSet(WRITE_COMPLETE, COMPLETE)) { - logger.debug("Complete"); this.asyncContext.complete(); } else { - this.complete.compareAndSet(0, READ_COMPLETE); + this.complete.compareAndSet(NONE_COMPLETE, READ_COMPLETE); } } + /** + * Completes the writing side of the asynchronous operation. When both this method and + * {@link #readComplete()} have been called, the {@code AsyncContext} will be + * {@linkplain AsyncContext#complete() fully completed}. + */ public void writeComplete() { - logger.debug("Write complete"); if (complete.compareAndSet(READ_COMPLETE, COMPLETE)) { - logger.debug("Complete"); this.asyncContext.complete(); } else { - this.complete.compareAndSet(0, WRITE_COMPLETE); + this.complete.compareAndSet(NONE_COMPLETE, WRITE_COMPLETE); } } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/RequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/RequestBodyPublisher.java index 24f28b7bff..05bf19828c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/RequestBodyPublisher.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/RequestBodyPublisher.java @@ -28,6 +28,8 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.springframework.reactive.util.DemandCounter; + /** * @author Arjen Poutsma */ @@ -41,7 +43,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher { private final byte[] buffer; - private long demand; + private final DemandCounter demand = new DemandCounter(); private Subscriber subscriber; @@ -64,7 +66,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher { while (true) { logger.debug("Demand: " + this.demand); - if (demand <= 0) { + if (!demand.hasDemand()) { break; } @@ -82,9 +84,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher { break; } else if (read > 0) { - if (demand != Long.MAX_VALUE) { - demand--; - } + this.demand.decrement(); byte[] copy = Arrays.copyOf(this.buffer, read); // logger.debug("Next: " + new String(copy, UTF_8)); @@ -114,14 +114,9 @@ public class RequestBodyPublisher implements ReadListener, Publisher { public void request(long n) { logger.debug("Updating demand " + demand + " by " + n); - boolean stalled = demand <= 0; + boolean stalled = !demand.hasDemand(); - if (n != Long.MAX_VALUE && demand != Long.MAX_VALUE) { - demand += n; - } - else { - demand = Long.MAX_VALUE; - } + demand.increase(n); if (stalled) { try { @@ -136,7 +131,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher { @Override public void cancel() { synchronizer.readComplete(); - demand = 0; + demand.reset(); } } } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizerTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizerTests.java new file mode 100644 index 0000000000..0003f99608 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizerTests.java @@ -0,0 +1,57 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.web.servlet; + +import javax.servlet.AsyncContext; + +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.BDDMockito.mock; +import static org.mockito.BDDMockito.verify; + +/** + * @author Arjen Poutsma + */ +public class AsyncContextSynchronizerTests { + + private AsyncContext asyncContext; + + private AsyncContextSynchronizer synchronizer; + + @Before + public void setUp() throws Exception { + asyncContext = mock(AsyncContext.class); + synchronizer = new AsyncContextSynchronizer(asyncContext); + } + + @Test + public void readThenWrite() { + synchronizer.readComplete(); + synchronizer.writeComplete(); + + verify(asyncContext).complete(); + } + + @Test + public void writeThenRead() { + synchronizer.writeComplete(); + synchronizer.readComplete(); + + verify(asyncContext).complete(); + } +} \ No newline at end of file