Replace use of deprecated Reactor Operators#getAndAdd

This commit is contained in:
Rossen Stoyanchev 2017-01-30 09:14:05 -05:00
parent 69c16f3821
commit 74a3013174
1 changed files with 11 additions and 28 deletions

View File

@ -18,7 +18,7 @@ package org.springframework.http.server.reactive;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
@ -48,7 +48,11 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
private final AtomicLong demand = new AtomicLong();
private volatile long demand;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<AbstractListenerReadPublisher> DEMAND_FIELD_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractListenerReadPublisher.class, "demand");
private Subscriber<? super T> subscriber;
@ -115,7 +119,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
while (hasDemand()) {
T data = read();
if (data != null) {
getAndSub(this.demand, 1L);
Operators.addAndGet(DEMAND_FIELD_UPDATER, this, -1L);
this.subscriber.onNext(data);
}
else {
@ -125,29 +129,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
return false;
}
/**
* Concurrent subscription bound to 0 and Long.MAX_VALUE.
* Any concurrent write will "happen" before this operation.
* @param sequence current atomic to update
* @param toSub delta to sub
* @return value before subscription, 0 or Long.MAX_VALUE
*/
private static long getAndSub(AtomicLong sequence, long toSub) {
long r;
long u;
do {
r = sequence.get();
if (r == 0 || r == Long.MAX_VALUE) {
return r;
}
u = Operators.subOrZero(r, toSub);
} while (!sequence.compareAndSet(r, u));
return r;
}
private boolean hasDemand() {
return (this.demand.get() > 0);
return (this.demand > 0);
}
private boolean changeState(State oldState, State newState) {
@ -236,7 +219,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
Operators.addAndGet(DEMAND_FIELD_UPDATER, publisher, n);
if (publisher.changeState(this, DEMAND)) {
publisher.checkOnDataAvailable();
}
@ -254,7 +237,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
Operators.addAndGet(DEMAND_FIELD_UPDATER, publisher, n);
}
}
@ -282,7 +265,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
Operators.addAndGet(DEMAND_FIELD_UPDATER, publisher, n);
}
}
},