Introduced DemandCounter
This commit is contained in:
parent
74a29ac146
commit
3595c51d40
|
@ -35,6 +35,13 @@ dependencies {
|
|||
testCompile 'org.eclipse.jetty:jetty-servlet:9.3.0.v20150612'
|
||||
|
||||
testCompile("log4j:log4j:1.2.16")
|
||||
testCompile("org.mockito:mockito-core:1.10.19") {
|
||||
exclude group:'org.hamcrest', module:'hamcrest-core'
|
||||
}
|
||||
testCompile("org.hamcrest:hamcrest-all:1.3")
|
||||
testCompile "org.springframework:spring-test:4.1.2.RELEASE"
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -181,17 +181,15 @@ public class BlockingSignalQueue<T> {
|
|||
|
||||
private class SubscriptionThread extends Thread {
|
||||
|
||||
private volatile long demand = 0;
|
||||
private final DemandCounter demand = new DemandCounter();
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
if (demand > 0 && isHeadSignal()) {
|
||||
if (this.demand.hasDemand() && isHeadSignal()) {
|
||||
subscriber.onNext(pollSignal());
|
||||
if (demand != Long.MAX_VALUE) {
|
||||
demand--;
|
||||
}
|
||||
this.demand.decrement();
|
||||
}
|
||||
else if (isHeadError()) {
|
||||
subscriber.onError(pollError());
|
||||
|
@ -209,12 +207,7 @@ public class BlockingSignalQueue<T> {
|
|||
}
|
||||
|
||||
public void request(long n) {
|
||||
if (n != Long.MAX_VALUE) {
|
||||
this.demand += n;
|
||||
}
|
||||
else {
|
||||
this.demand = Long.MAX_VALUE;
|
||||
}
|
||||
this.demand.increase(n);
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.reactive.util;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Small utility class for keeping track of Reactive Streams demand.
|
||||
* @author Arjen Poutsma
|
||||
*/
|
||||
public final class DemandCounter {
|
||||
|
||||
private final AtomicLong demand = new AtomicLong();
|
||||
|
||||
/**
|
||||
* Increases the demand by the given number
|
||||
* @param n the positive number to increase demand by
|
||||
* @return the increased demand
|
||||
* @see org.reactivestreams.Subscription#request(long)
|
||||
*/
|
||||
public long increase(long n) {
|
||||
Assert.isTrue(n > 0, "'n' must be higher than 0");
|
||||
return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d + n : Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Decreases the demand by one.
|
||||
* @return the decremented demand
|
||||
*/
|
||||
public long decrement() {
|
||||
return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d - 1 : Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether this counter has demand, i.e. whether it is higher than 0.
|
||||
* @return {@code true} if this counter has demand; {@code false} otherwise
|
||||
*/
|
||||
public boolean hasDemand() {
|
||||
return this.demand.get() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets this counter to 0.
|
||||
* @see org.reactivestreams.Subscription#cancel()
|
||||
*/
|
||||
public void reset() {
|
||||
this.demand.set(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return demand.toString();
|
||||
}
|
||||
}
|
|
@ -22,15 +22,17 @@ import javax.servlet.AsyncContext;
|
|||
import javax.servlet.ServletInputStream;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* Utility class for synchronizing between the reading and writing side of an
|
||||
* {@link AsyncContext}. This class will simply call {@link AsyncContext#complete()} when
|
||||
* both {@link #readComplete()} and {@link #writeComplete()} have been called.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @see AsyncContext
|
||||
*/
|
||||
class AsyncContextSynchronizer {
|
||||
final class AsyncContextSynchronizer {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(AsyncContextSynchronizer.class);
|
||||
private static final int NONE_COMPLETE = 0;
|
||||
|
||||
private static final int READ_COMPLETE = 1;
|
||||
|
||||
|
@ -40,39 +42,59 @@ class AsyncContextSynchronizer {
|
|||
|
||||
private final AsyncContext asyncContext;
|
||||
|
||||
private final AtomicInteger complete = new AtomicInteger(0);
|
||||
private final AtomicInteger complete = new AtomicInteger(NONE_COMPLETE);
|
||||
|
||||
/**
|
||||
* Creates a new {@code AsyncContextSynchronizer} based on the given context.
|
||||
* @param asyncContext the context to base this synchronizer on
|
||||
*/
|
||||
public AsyncContextSynchronizer(AsyncContext asyncContext) {
|
||||
this.asyncContext = asyncContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the input stream of this synchronizer.
|
||||
* @return the input stream
|
||||
* @throws IOException if an input or output exception occurred
|
||||
*/
|
||||
public ServletInputStream getInputStream() throws IOException {
|
||||
return this.asyncContext.getRequest().getInputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the output stream of this synchronizer.
|
||||
* @return the output stream
|
||||
* @throws IOException if an input or output exception occurred
|
||||
*/
|
||||
public ServletOutputStream getOutputStream() throws IOException {
|
||||
return this.asyncContext.getResponse().getOutputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Completes the reading side of the asynchronous operation. When both this method and
|
||||
* {@link #writeComplete()} have been called, the {@code AsyncContext} will be
|
||||
* {@linkplain AsyncContext#complete() fully completed}.
|
||||
*/
|
||||
public void readComplete() {
|
||||
logger.debug("Read complete");
|
||||
if (complete.compareAndSet(WRITE_COMPLETE, COMPLETE)) {
|
||||
logger.debug("Complete");
|
||||
this.asyncContext.complete();
|
||||
}
|
||||
else {
|
||||
this.complete.compareAndSet(0, READ_COMPLETE);
|
||||
this.complete.compareAndSet(NONE_COMPLETE, READ_COMPLETE);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Completes the writing side of the asynchronous operation. When both this method and
|
||||
* {@link #readComplete()} have been called, the {@code AsyncContext} will be
|
||||
* {@linkplain AsyncContext#complete() fully completed}.
|
||||
*/
|
||||
public void writeComplete() {
|
||||
logger.debug("Write complete");
|
||||
if (complete.compareAndSet(READ_COMPLETE, COMPLETE)) {
|
||||
logger.debug("Complete");
|
||||
this.asyncContext.complete();
|
||||
}
|
||||
else {
|
||||
this.complete.compareAndSet(0, WRITE_COMPLETE);
|
||||
this.complete.compareAndSet(NONE_COMPLETE, WRITE_COMPLETE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.reactivestreams.Publisher;
|
|||
import org.reactivestreams.Subscriber;
|
||||
import org.reactivestreams.Subscription;
|
||||
|
||||
import org.springframework.reactive.util.DemandCounter;
|
||||
|
||||
/**
|
||||
* @author Arjen Poutsma
|
||||
*/
|
||||
|
@ -41,7 +43,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
|
|||
|
||||
private final byte[] buffer;
|
||||
|
||||
private long demand;
|
||||
private final DemandCounter demand = new DemandCounter();
|
||||
|
||||
private Subscriber<? super byte[]> subscriber;
|
||||
|
||||
|
@ -64,7 +66,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
|
|||
while (true) {
|
||||
logger.debug("Demand: " + this.demand);
|
||||
|
||||
if (demand <= 0) {
|
||||
if (!demand.hasDemand()) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -82,9 +84,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
|
|||
break;
|
||||
}
|
||||
else if (read > 0) {
|
||||
if (demand != Long.MAX_VALUE) {
|
||||
demand--;
|
||||
}
|
||||
this.demand.decrement();
|
||||
byte[] copy = Arrays.copyOf(this.buffer, read);
|
||||
|
||||
// logger.debug("Next: " + new String(copy, UTF_8));
|
||||
|
@ -114,14 +114,9 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
|
|||
public void request(long n) {
|
||||
logger.debug("Updating demand " + demand + " by " + n);
|
||||
|
||||
boolean stalled = demand <= 0;
|
||||
boolean stalled = !demand.hasDemand();
|
||||
|
||||
if (n != Long.MAX_VALUE && demand != Long.MAX_VALUE) {
|
||||
demand += n;
|
||||
}
|
||||
else {
|
||||
demand = Long.MAX_VALUE;
|
||||
}
|
||||
demand.increase(n);
|
||||
|
||||
if (stalled) {
|
||||
try {
|
||||
|
@ -136,7 +131,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
|
|||
@Override
|
||||
public void cancel() {
|
||||
synchronizer.readComplete();
|
||||
demand = 0;
|
||||
demand.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.reactive.web.servlet;
|
||||
|
||||
import javax.servlet.AsyncContext;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.BDDMockito.mock;
|
||||
import static org.mockito.BDDMockito.verify;
|
||||
|
||||
/**
|
||||
* @author Arjen Poutsma
|
||||
*/
|
||||
public class AsyncContextSynchronizerTests {
|
||||
|
||||
private AsyncContext asyncContext;
|
||||
|
||||
private AsyncContextSynchronizer synchronizer;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
asyncContext = mock(AsyncContext.class);
|
||||
synchronizer = new AsyncContextSynchronizer(asyncContext);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void readThenWrite() {
|
||||
synchronizer.readComplete();
|
||||
synchronizer.writeComplete();
|
||||
|
||||
verify(asyncContext).complete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void writeThenRead() {
|
||||
synchronizer.writeComplete();
|
||||
synchronizer.readComplete();
|
||||
|
||||
verify(asyncContext).complete();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue