Sync Reactor SignalEmitter

This commit is contained in:
Stephane Maldini 2016-05-04 01:37:11 +01:00
parent 04f47da15e
commit 4e8c21e85a
2 changed files with 9 additions and 9 deletions

View File

@ -30,7 +30,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSource; import reactor.core.publisher.FluxSource;
import reactor.core.publisher.GenerateOutput; import reactor.core.subscriber.SignalEmitter;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator; import org.springframework.core.io.buffer.DataBufferAllocator;
@ -175,8 +175,8 @@ public abstract class DataBufferUtils {
} }
private static class ReadableByteChannelGenerator private static class ReadableByteChannelGenerator
implements BiFunction<ReadableByteChannel, GenerateOutput<DataBuffer>, implements BiFunction<ReadableByteChannel, SignalEmitter<DataBuffer>,
ReadableByteChannel> { ReadableByteChannel> {
private final DataBufferAllocator allocator; private final DataBufferAllocator allocator;
@ -189,7 +189,7 @@ public abstract class DataBufferUtils {
@Override @Override
public ReadableByteChannel apply(ReadableByteChannel public ReadableByteChannel apply(ReadableByteChannel
channel, GenerateOutput<DataBuffer> sub) { channel, SignalEmitter<DataBuffer> sub) {
try { try {
ByteBuffer byteBuffer = ByteBuffer.allocate(chunkSize); ByteBuffer byteBuffer = ByteBuffer.allocate(chunkSize);
int read; int read;
@ -200,7 +200,7 @@ public abstract class DataBufferUtils {
try { try {
dataBuffer.write(byteBuffer); dataBuffer.write(byteBuffer);
release = false; release = false;
sub.onNext(dataBuffer); sub.emit(dataBuffer);
} }
finally { finally {
if (release) { if (release) {
@ -209,11 +209,11 @@ public abstract class DataBufferUtils {
} }
} }
else { else {
sub.onComplete(); sub.complete();
} }
} }
catch (IOException ex) { catch (IOException ex) {
sub.onError(ex); sub.fail(ex);
} }
return channel; return channel;
} }

View File

@ -110,9 +110,9 @@ public class ChannelSendOperatorTests {
IllegalStateException error = new IllegalStateException("boo"); IllegalStateException error = new IllegalStateException("boo");
Flux<String> publisher = Flux.generate(() -> 0, (idx , subscriber) -> { Flux<String> publisher = Flux.generate(() -> 0, (idx , subscriber) -> {
int i = ++idx; int i = ++idx;
subscriber.onNext(String.valueOf(i)); subscriber.tryEmit(String.valueOf(i));
if (i == 3) { if (i == 3) {
subscriber.onError(error); subscriber.fail(error);
} }
return i; return i;
}); });