Merge branch '5.1.x'

This commit is contained in:
Rossen Stoyanchev 2019-06-21 14:15:26 +01:00
commit 030caea9cf
2 changed files with 39 additions and 4 deletions

View File

@ -181,7 +181,15 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
else if (this.state == State.NEW) { else if (this.state == State.NEW) {
this.item = item; this.item = item;
this.state = State.FIRST_SIGNAL_RECEIVED; this.state = State.FIRST_SIGNAL_RECEIVED;
writeFunction.apply(this).subscribe(this.writeCompletionBarrier); Publisher<Void> result;
try {
result = writeFunction.apply(this);
}
catch (Throwable ex) {
this.writeCompletionBarrier.onError(ex);
return;
}
result.subscribe(this.writeCompletionBarrier);
} }
else { else {
if (this.subscription != null) { if (this.subscription != null) {
@ -230,7 +238,15 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
else if (this.state == State.NEW) { else if (this.state == State.NEW) {
this.completed = true; this.completed = true;
this.state = State.FIRST_SIGNAL_RECEIVED; this.state = State.FIRST_SIGNAL_RECEIVED;
writeFunction.apply(this).subscribe(this.writeCompletionBarrier); Publisher<Void> result;
try {
result = writeFunction.apply(this);
}
catch (Throwable ex) {
this.writeCompletionBarrier.onError(ex);
return;
}
result.subscribe(this.writeCompletionBarrier);
} }
else { else {
this.completed = true; this.completed = true;

View File

@ -92,7 +92,7 @@ public class ChannelSendOperatorTests {
@Test @Test
public void writeMultipleItems() throws Exception { public void writeMultipleItems() {
List<String> items = Arrays.asList("one", "two", "three"); List<String> items = Arrays.asList("one", "two", "three");
Mono<Void> completion = Flux.fromIterable(items).as(this::sendOperator); Mono<Void> completion = Flux.fromIterable(items).as(this::sendOperator);
Signal<Void> signal = completion.materialize().block(); Signal<Void> signal = completion.materialize().block();
@ -108,7 +108,7 @@ public class ChannelSendOperatorTests {
} }
@Test @Test
public void errorAfterMultipleItems() throws Exception { public void errorAfterMultipleItems() {
IllegalStateException error = new IllegalStateException("boo"); IllegalStateException error = new IllegalStateException("boo");
Flux<String> publisher = Flux.generate(() -> 0, (idx , subscriber) -> { Flux<String> publisher = Flux.generate(() -> 0, (idx , subscriber) -> {
int i = ++idx; int i = ++idx;
@ -213,6 +213,25 @@ public class ChannelSendOperatorTests {
bufferFactory.checkForLeaks(); bufferFactory.checkForLeaks();
} }
@Test // gh-23175
public void errorInWriteFunction() {
StepVerifier
.create(new ChannelSendOperator<>(Mono.just("one"), p -> {
throw new IllegalStateException("boo");
}))
.expectErrorMessage("boo")
.verify(Duration.ofMillis(5000));
StepVerifier
.create(new ChannelSendOperator<>(Mono.empty(), p -> {
throw new IllegalStateException("boo");
}))
.expectErrorMessage("boo")
.verify(Duration.ofMillis(5000));
}
private <T> Mono<Void> sendOperator(Publisher<String> source){ private <T> Mono<Void> sendOperator(Publisher<String> source){
return new ChannelSendOperator<>(source, writer::send); return new ChannelSendOperator<>(source, writer::send);
} }