Remove last Stream reference

This commit is contained in:
Stephane Maldini 2016-01-07 23:41:48 +00:00
parent b4c3a67d2c
commit 94be412327
2 changed files with 9 additions and 12 deletions

View File

@ -22,7 +22,6 @@ import java.util.Set;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.rx.Promise; import reactor.rx.Promise;
import reactor.rx.Stream; import reactor.rx.Stream;
import reactor.rx.Streams;
import org.springframework.core.convert.TypeDescriptor; import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.GenericConverter; import org.springframework.core.convert.converter.GenericConverter;
@ -52,13 +51,13 @@ public final class ReactiveStreamsToReactorStreamConverter implements GenericCon
return source; return source;
} }
else if (Stream.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { else if (Stream.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Streams.from((Publisher)source); return Stream.from((Publisher)source);
} }
else if (Promise.class.isAssignableFrom(source.getClass())) { else if (Promise.class.isAssignableFrom(source.getClass())) {
return source; return source;
} }
else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Streams.from((Publisher)source).promise(); return Stream.from((Publisher)source).promise();
} }
return null; return null;
} }

View File

@ -29,12 +29,10 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import reactor.Flux; import reactor.Flux;
import reactor.core.subscriber.SubscriberBarrier; import reactor.core.subscriber.SubscriberBarrier;
import reactor.rx.Streams; import reactor.rx.Stream;
import reactor.rx.stream.Signal; import reactor.rx.stream.Signal;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
/** /**
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
@ -57,7 +55,7 @@ public class WriteWithOperatorTests {
public void errorBeforeFirstItem() throws Exception { public void errorBeforeFirstItem() throws Exception {
IllegalStateException error = new IllegalStateException("boo"); IllegalStateException error = new IllegalStateException("boo");
Publisher<Void> completion = Flux.<String>error(error).lift(this.operator); Publisher<Void> completion = Flux.<String>error(error).lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get(); List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
assertEquals(1, signals.size()); assertEquals(1, signals.size());
assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable()); assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());
@ -66,7 +64,7 @@ public class WriteWithOperatorTests {
@Test @Test
public void completionBeforeFirstItem() throws Exception { public void completionBeforeFirstItem() throws Exception {
Publisher<Void> completion = Flux.<String>empty().lift(this.operator); Publisher<Void> completion = Flux.<String>empty().lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get(); List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
assertEquals(1, signals.size()); assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete()); assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -78,7 +76,7 @@ public class WriteWithOperatorTests {
@Test @Test
public void writeOneItem() throws Exception { public void writeOneItem() throws Exception {
Publisher<Void> completion = Flux.just("one").lift(this.operator); Publisher<Void> completion = Flux.just("one").lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get(); List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
assertEquals(1, signals.size()); assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete()); assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -93,7 +91,7 @@ public class WriteWithOperatorTests {
public void writeMultipleItems() throws Exception { public void writeMultipleItems() throws Exception {
List<String> items = Arrays.asList("one", "two", "three"); List<String> items = Arrays.asList("one", "two", "three");
Publisher<Void> completion = Flux.fromIterable(items).lift(this.operator); Publisher<Void> completion = Flux.fromIterable(items).lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get(); List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
assertEquals(1, signals.size()); assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete()); assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -116,7 +114,7 @@ public class WriteWithOperatorTests {
} }
}, subscriber -> new AtomicInteger()); }, subscriber -> new AtomicInteger());
Publisher<Void> completion = publisher.lift(this.operator); Publisher<Void> completion = publisher.lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get(); List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
assertEquals(1, signals.size()); assertEquals(1, signals.size());
assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable()); assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());