Fix race conditions in AbstractListenerReadPublisher
Transition from DEMAND->NO_DEMAND: Two concurrent threads enter DEMAND.request and DEMAND.onDataAvailable. And DEMAND.onDataAvailable finishes before DEMAND.request to be able to update the demand field then a request for reading will be lost. Transition from READING->NO_DEMAND: readAndPublish() returns false because there is no demand but before switching the states READING.request is invoked again a request for reading will be lost. Changing READING->DEMAND/NO_DEMAND is made conditional so that the operations will be executed only if changing states succeeds. When in READING state detect completion before each next item in order to exit sooner, if completed. Issue: SPR-16207
This commit is contained in:
parent
b814875211
commit
2a481c5411
|
@ -132,7 +132,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||||
*/
|
*/
|
||||||
private boolean readAndPublish() throws IOException {
|
private boolean readAndPublish() throws IOException {
|
||||||
long r;
|
long r;
|
||||||
while ((r = demand) > 0) {
|
while ((r = demand) > 0 && !publisherCompleted) {
|
||||||
T data = read();
|
T data = read();
|
||||||
if (data != null) {
|
if (data != null) {
|
||||||
if (r != Long.MAX_VALUE) {
|
if (r != Long.MAX_VALUE) {
|
||||||
|
@ -292,27 +292,45 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||||
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
|
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
|
||||||
if (Operators.validate(n)) {
|
if (Operators.validate(n)) {
|
||||||
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
|
||||||
|
if (publisher.changeState(NO_DEMAND, DEMAND)) {
|
||||||
|
publisher.checkOnDataAvailable();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
<T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
|
<T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
|
||||||
|
for (;;) {
|
||||||
|
if (!read(publisher)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
long r = publisher.demand;
|
||||||
|
if (r == 0 || publisher.changeState(NO_DEMAND, this)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
<T> boolean read(AbstractListenerReadPublisher<T> publisher) {
|
||||||
if (publisher.changeState(this, READING)) {
|
if (publisher.changeState(this, READING)) {
|
||||||
try {
|
try {
|
||||||
boolean demandAvailable = publisher.readAndPublish();
|
boolean demandAvailable = publisher.readAndPublish();
|
||||||
if (demandAvailable) {
|
if (demandAvailable) {
|
||||||
publisher.changeState(READING, DEMAND);
|
if (publisher.changeState(READING, DEMAND)) {
|
||||||
publisher.checkOnDataAvailable();
|
publisher.checkOnDataAvailable();
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
else {
|
}
|
||||||
publisher.changeState(READING, NO_DEMAND);
|
else if (publisher.changeState(READING, NO_DEMAND)) {
|
||||||
publisher.suspendReading();
|
publisher.suspendReading();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (IOException ex) {
|
catch (IOException ex) {
|
||||||
publisher.onError(ex);
|
publisher.onError(ex);
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -321,6 +339,9 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||||
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
|
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
|
||||||
if (Operators.validate(n)) {
|
if (Operators.validate(n)) {
|
||||||
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
|
||||||
|
if (publisher.changeState(NO_DEMAND, DEMAND)) {
|
||||||
|
publisher.checkOnDataAvailable();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -356,7 +377,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
|
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
|
||||||
if (!publisher.changeState(this, COMPLETED)) {
|
if (publisher.changeState(this, COMPLETED)) {
|
||||||
|
publisher.publisherCompleted = true;
|
||||||
|
}
|
||||||
|
else {
|
||||||
publisher.state.get().cancel(publisher);
|
publisher.state.get().cancel(publisher);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -367,6 +391,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||||
|
|
||||||
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
|
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
|
||||||
if (publisher.changeState(this, COMPLETED)) {
|
if (publisher.changeState(this, COMPLETED)) {
|
||||||
|
publisher.publisherCompleted = true;
|
||||||
if (publisher.subscriber != null) {
|
if (publisher.subscriber != null) {
|
||||||
publisher.subscriber.onComplete();
|
publisher.subscriber.onComplete();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue