Merge branch '5.1.x'

This commit is contained in:
Rossen Stoyanchev 2019-04-02 11:05:30 -04:00
commit 89a29598d5
2 changed files with 38 additions and 2 deletions

View File

@ -394,7 +394,12 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override
public void onError(Throwable ex) {
this.completionSubscriber.onError(ex);
try {
this.completionSubscriber.onError(ex);
}
finally {
this.writeBarrier.releaseCachedItem();
}
}
@Override

View File

@ -17,6 +17,7 @@
package org.springframework.http.server.reactive;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -33,6 +34,7 @@ import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.test.StepVerifier;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
@ -156,7 +158,12 @@ public class ChannelSendOperatorTests {
}
@Test // gh-22720
public void errorWhileItemCached() {
public void errorFromWriteSourceWhileItemCached() {
// 1. First item received
// 2. writeFunction applied and writeCompletionBarrier subscribed to it
// 3. Write Publisher fails right after that and before request(n) from server
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber();
@ -186,6 +193,30 @@ public class ChannelSendOperatorTests {
bufferFactory.checkForLeaks();
}
@Test // gh-22720
public void errorFromWriteFunctionWhileItemCached() {
// 1. First item received
// 2. writeFunction applied and writeCompletionBarrier subscribed to it
// 3. writeFunction fails, e.g. to flush status and headers, before request(n) from server
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
Flux.create(sink -> {
DataBuffer dataBuffer = bufferFactory.allocateBuffer();
dataBuffer.write("foo", StandardCharsets.UTF_8);
sink.next(dataBuffer);
}),
publisher -> {
publisher.subscribe(new ZeroDemandSubscriber());
return Mono.error(new IllegalStateException("err"));
});
StepVerifier.create(operator).expectErrorMessage("err").verify(Duration.ofSeconds(5));
bufferFactory.checkForLeaks();
}
private <T> Mono<Void> sendOperator(Publisher<String> source){
return new ChannelSendOperator<>(source, writer::send);