diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java index 2b0293f8eb2..3c9aedd5d95 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java @@ -30,7 +30,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSource; -import reactor.core.publisher.GenerateOutput; +import reactor.core.subscriber.SignalEmitter; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferAllocator; @@ -175,8 +175,8 @@ public abstract class DataBufferUtils { } private static class ReadableByteChannelGenerator - implements BiFunction, - ReadableByteChannel> { + implements BiFunction, + ReadableByteChannel> { private final DataBufferAllocator allocator; @@ -189,7 +189,7 @@ public abstract class DataBufferUtils { @Override public ReadableByteChannel apply(ReadableByteChannel - channel, GenerateOutput sub) { + channel, SignalEmitter sub) { try { ByteBuffer byteBuffer = ByteBuffer.allocate(chunkSize); int read; @@ -200,7 +200,7 @@ public abstract class DataBufferUtils { try { dataBuffer.write(byteBuffer); release = false; - sub.onNext(dataBuffer); + sub.emit(dataBuffer); } finally { if (release) { @@ -209,11 +209,11 @@ public abstract class DataBufferUtils { } } else { - sub.onComplete(); + sub.complete(); } } catch (IOException ex) { - sub.onError(ex); + sub.fail(ex); } return channel; } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java index d5ce4dc018f..c65fe4facbf 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java @@ -110,9 +110,9 @@ public class ChannelSendOperatorTests { IllegalStateException error = new IllegalStateException("boo"); Flux publisher = Flux.generate(() -> 0, (idx , subscriber) -> { int i = ++idx; - subscriber.onNext(String.valueOf(i)); + subscriber.tryEmit(String.valueOf(i)); if (i == 3) { - subscriber.onError(error); + subscriber.fail(error); } return i; });