Ensure onComplete/onError events will be delivered

Issue: SPR-16207
This commit is contained in:
Violeta Georgieva 2017-11-21 00:25:38 +02:00 committed by Rossen Stoyanchev
parent 604017894e
commit 41b13a4e8a
4 changed files with 57 additions and 9 deletions

View File

@ -312,7 +312,9 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
} }
<T> void cancel(AbstractListenerReadPublisher<T> publisher) { <T> void cancel(AbstractListenerReadPublisher<T> publisher) {
publisher.changeState(this, COMPLETED); if (!publisher.changeState(this, COMPLETED)) {
publisher.state.get().cancel(publisher);
}
} }
<T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) { <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
@ -325,6 +327,9 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
publisher.subscriber.onComplete(); publisher.subscriber.onComplete();
} }
} }
else {
publisher.state.get().onAllDataRead(publisher);
}
} }
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) { <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
@ -333,6 +338,9 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
publisher.subscriber.onError(t); publisher.subscriber.onError(t);
} }
} }
else {
publisher.state.get().onError(publisher, t);
}
} }
} }

View File

@ -190,6 +190,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete(); processor.resultPublisher.publishComplete();
} }
else {
processor.state.get().onComplete(processor);
}
} }
}, },
@ -212,6 +215,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
else if (processor.changeState(this, COMPLETED)) { else if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete(); processor.resultPublisher.publishComplete();
} }
else {
processor.state.get().onComplete(processor);
}
} }
else { else {
if (processor.changeState(this, REQUESTED)) { if (processor.changeState(this, REQUESTED)) {
@ -238,6 +244,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete(); processor.resultPublisher.publishComplete();
} }
else {
processor.state.get().onComplete(processor);
}
} }
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) { public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
// ignore // ignore
@ -275,6 +284,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishError(ex); processor.resultPublisher.publishError(ex);
} }
else {
processor.state.get().onError(processor, ex);
}
} }
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) { public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {

View File

@ -251,6 +251,9 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
processor.writingComplete(); processor.writingComplete();
processor.resultPublisher.publishComplete(); processor.resultPublisher.publishComplete();
} }
else {
processor.state.get().onComplete(processor);
}
} }
}, },
@ -274,15 +277,29 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
if (writeCompleted) { if (writeCompleted) {
processor.releaseData(); processor.releaseData();
if (!processor.subscriberCompleted) { if (!processor.subscriberCompleted) {
processor.changeState(WRITING, REQUESTED); if (processor.changeState(WRITING, REQUESTED)) {
processor.suspendWriting(); if (processor.subscriberCompleted) {
Assert.state(processor.subscription != null, "No subscription"); if (processor.changeState(REQUESTED, COMPLETED)) {
processor.subscription.request(1); processor.writingComplete();
processor.resultPublisher.publishComplete();
} else {
processor.state.get().onComplete(processor);
}
}
else {
processor.suspendWriting();
Assert.state(processor.subscription != null, "No subscription");
processor.subscription.request(1);
}
}
} }
else { else {
processor.changeState(WRITING, COMPLETED); if (processor.changeState(WRITING, COMPLETED)) {
processor.writingComplete(); processor.writingComplete();
processor.resultPublisher.publishComplete(); processor.resultPublisher.publishComplete();
} else {
processor.state.get().onComplete(processor);
}
} }
} }
else { else {
@ -343,6 +360,9 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
processor.writingComplete(); processor.writingComplete();
processor.resultPublisher.publishError(ex); processor.resultPublisher.publishError(ex);
} }
else {
processor.state.get().onError(processor, ex);
}
} }
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) { public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {

View File

@ -158,6 +158,9 @@ class WriteResultPublisher implements Publisher<Void> {
Assert.state(publisher.subscriber != null, "No subscriber"); Assert.state(publisher.subscriber != null, "No subscriber");
publisher.subscriber.onComplete(); publisher.subscriber.onComplete();
} }
else {
publisher.state.get().publishComplete(publisher);
}
} }
@Override @Override
void publishError(WriteResultPublisher publisher, Throwable t) { void publishError(WriteResultPublisher publisher, Throwable t) {
@ -165,6 +168,9 @@ class WriteResultPublisher implements Publisher<Void> {
Assert.state(publisher.subscriber != null, "No subscriber"); Assert.state(publisher.subscriber != null, "No subscriber");
publisher.subscriber.onError(t); publisher.subscriber.onError(t);
} }
else {
publisher.state.get().publishError(publisher, t);
}
} }
}, },
@ -196,7 +202,9 @@ class WriteResultPublisher implements Publisher<Void> {
} }
void cancel(WriteResultPublisher publisher) { void cancel(WriteResultPublisher publisher) {
publisher.changeState(this, COMPLETED); if (!publisher.changeState(this, COMPLETED)) {
publisher.state.get().cancel(publisher);
}
} }
void publishComplete(WriteResultPublisher publisher) { void publishComplete(WriteResultPublisher publisher) {