remove reactor-stream and related artefacts, update tests
This commit is contained in:
parent
379ce6354e
commit
ce4a687cf2
|
|
@ -19,10 +19,10 @@ jar {
|
|||
group = 'org.springframework.reactive'
|
||||
|
||||
repositories {
|
||||
mavenCentral()
|
||||
maven { url 'https://oss.jfrog.org/libs-snapshot' } // RxNetty 0.5.x snapshots
|
||||
maven { url 'http://repo.spring.io/snapshot' } // Reactor snapshot
|
||||
mavenLocal()
|
||||
mavenLocal()
|
||||
mavenCentral()
|
||||
maven { url 'https://oss.jfrog.org/libs-snapshot' } // RxNetty 0.5.x snapshots
|
||||
maven { url 'http://repo.spring.io/snapshot' } // Reactor snapshot
|
||||
}
|
||||
|
||||
configurations.all {
|
||||
|
|
@ -88,7 +88,6 @@ dependencies {
|
|||
optional 'io.reactivex:rxjava:1.1.0'
|
||||
optional "io.reactivex:rxnetty-http:0.5.0-SNAPSHOT"
|
||||
optional "com.fasterxml.jackson.core:jackson-databind:2.6.2"
|
||||
optional "io.projectreactor:reactor-stream:${reactorVersion}"
|
||||
optional "io.projectreactor:reactor-net:${reactorVersion}"
|
||||
optional "io.projectreactor:reactor-io:${reactorVersion}"
|
||||
optional "org.apache.tomcat:tomcat-util:${tomcatVersion}"
|
||||
|
|
|
|||
|
|
@ -1,65 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.core.convert.support;
|
||||
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.rx.Fluxion;
|
||||
import reactor.rx.Promise;
|
||||
|
||||
import org.springframework.core.convert.TypeDescriptor;
|
||||
import org.springframework.core.convert.converter.GenericConverter;
|
||||
|
||||
/**
|
||||
* @author Stephane Maldini
|
||||
* @author Sebastien Deleuze
|
||||
*/
|
||||
public final class ReactiveStreamsToReactorFluxionConverter implements GenericConverter {
|
||||
|
||||
@Override
|
||||
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() {
|
||||
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>();
|
||||
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Fluxion.class));
|
||||
pairs.add(new GenericConverter.ConvertiblePair(Fluxion.class, Publisher.class));
|
||||
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Promise.class));
|
||||
pairs.add(new GenericConverter.ConvertiblePair(Promise.class, Publisher.class));
|
||||
return pairs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
|
||||
if (source == null) {
|
||||
return null;
|
||||
}
|
||||
if (Fluxion.class.isAssignableFrom(source.getClass())) {
|
||||
return source;
|
||||
}
|
||||
else if (Fluxion.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
|
||||
return Fluxion.from((Publisher)source);
|
||||
}
|
||||
else if (Promise.class.isAssignableFrom(source.getClass())) {
|
||||
return source;
|
||||
}
|
||||
else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
|
||||
return Fluxion.from((Publisher)source).promise();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -21,10 +21,10 @@ import java.time.Duration;
|
|||
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.SchedulerGroup;
|
||||
import reactor.core.timer.Timer;
|
||||
import reactor.rx.Fluxion;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
||||
|
|
@ -64,11 +64,11 @@ public class AsyncIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
|
||||
@Override
|
||||
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
|
||||
return response.setBody(Fluxion.just("h", "e", "l", "l", "o")
|
||||
.useTimer(Timer.global())
|
||||
.delay(Duration.ofMillis(100))
|
||||
.dispatchOn(asyncGroup)
|
||||
.collect(allocator::allocateBuffer,
|
||||
return response.setBody(Flux.just("h", "e", "l", "l", "o")
|
||||
.useTimer(Timer.global())
|
||||
.delay(Duration.ofMillis(100))
|
||||
.dispatchOn(asyncGroup)
|
||||
.collect(allocator::allocateBuffer,
|
||||
(buffer, str) -> buffer.write(str.getBytes())));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,9 +29,8 @@ import org.reactivestreams.Subscriber;
|
|||
import org.reactivestreams.Subscription;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Signal;
|
||||
import reactor.core.subscriber.SubscriberBarrier;
|
||||
import reactor.rx.Fluxion;
|
||||
import reactor.rx.Signal;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
|
|
@ -58,19 +57,19 @@ public class WriteWithOperatorTests {
|
|||
public void errorBeforeFirstItem() throws Exception {
|
||||
IllegalStateException error = new IllegalStateException("boo");
|
||||
Mono<Void> completion = Mono.<String>error(error).as(this::writeWithOperator);
|
||||
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
|
||||
Signal<Void> signal = completion.materialize().get();
|
||||
|
||||
assertEquals(1, signals.size());
|
||||
assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());
|
||||
assertNotNull(signal);
|
||||
assertSame("Unexpected signal: " + signal, error, signal.getThrowable());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void completionBeforeFirstItem() throws Exception {
|
||||
Mono<Void> completion = Flux.<String>empty().as(this::writeWithOperator);
|
||||
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
|
||||
Signal<Void> signal = completion.materialize().get();
|
||||
|
||||
assertEquals(1, signals.size());
|
||||
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
|
||||
assertNotNull(signal);
|
||||
assertTrue("Unexpected signal: " + signal, signal.isOnComplete());
|
||||
|
||||
assertEquals(0, this.writer.items.size());
|
||||
assertTrue(this.writer.completed);
|
||||
|
|
@ -79,10 +78,10 @@ public class WriteWithOperatorTests {
|
|||
@Test
|
||||
public void writeOneItem() throws Exception {
|
||||
Mono<Void> completion = Flux.just("one").as(this::writeWithOperator);
|
||||
List<Signal<Void>> signals =completion.as(Fluxion::from).materialize().toList().get();
|
||||
Signal<Void> signal = completion.materialize().get();
|
||||
|
||||
assertEquals(1, signals.size());
|
||||
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
|
||||
assertNotNull(signal);
|
||||
assertTrue("Unexpected signal: " + signal, signal.isOnComplete());
|
||||
|
||||
assertEquals(1, this.writer.items.size());
|
||||
assertEquals("one", this.writer.items.get(0));
|
||||
|
|
@ -94,10 +93,10 @@ public class WriteWithOperatorTests {
|
|||
public void writeMultipleItems() throws Exception {
|
||||
List<String> items = Arrays.asList("one", "two", "three");
|
||||
Mono<Void> completion = Flux.fromIterable(items).as(this::writeWithOperator);
|
||||
List<Signal<Void>> signals = completion.as(Fluxion::from).materialize().toList().get();
|
||||
Signal<Void> signal = completion.materialize().get();
|
||||
|
||||
assertEquals(1, signals.size());
|
||||
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
|
||||
assertNotNull(signal);
|
||||
assertTrue("Unexpected signal: " + signal, signal.isOnComplete());
|
||||
|
||||
assertEquals(3, this.writer.items.size());
|
||||
assertEquals("one", this.writer.items.get(0));
|
||||
|
|
@ -117,10 +116,10 @@ public class WriteWithOperatorTests {
|
|||
}
|
||||
}, subscriber -> new AtomicInteger());
|
||||
Mono<Void> completion = publisher.as(this::writeWithOperator);
|
||||
List<Signal<Void>> signals = completion.as(Fluxion::from).materialize().toList().get();
|
||||
Signal<Void> signal = completion.materialize().get();
|
||||
|
||||
assertEquals(1, signals.size());
|
||||
assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());
|
||||
assertNotNull(signal);
|
||||
assertSame("Unexpected signal: " + signal, error, signal.getThrowable());
|
||||
|
||||
assertEquals(3, this.writer.items.size());
|
||||
assertEquals("1", this.writer.items.get(0));
|
||||
|
|
|
|||
|
|
@ -24,8 +24,8 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.rx.Fluxion;
|
||||
import reactor.rx.Signal;
|
||||
import reactor.core.publisher.Signal;
|
||||
import reactor.core.util.SignalKind;
|
||||
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
|
@ -105,7 +105,7 @@ public class DispatcherHandlerErrorTests {
|
|||
public void noHandler() throws Exception {
|
||||
this.request.setUri(new URI("/does-not-exist"));
|
||||
|
||||
Publisher<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Mono<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Throwable ex = awaitErrorSignal(publisher);
|
||||
|
||||
assertEquals(ResponseStatusException.class, ex.getClass());
|
||||
|
|
@ -117,7 +117,7 @@ public class DispatcherHandlerErrorTests {
|
|||
public void noResolverForArgument() throws Exception {
|
||||
this.request.setUri(new URI("/unknown-argument-type"));
|
||||
|
||||
Publisher<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Mono<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Throwable ex = awaitErrorSignal(publisher);
|
||||
|
||||
assertEquals(IllegalStateException.class, ex.getClass());
|
||||
|
|
@ -128,7 +128,7 @@ public class DispatcherHandlerErrorTests {
|
|||
public void controllerMethodError() throws Exception {
|
||||
this.request.setUri(new URI("/error-signal"));
|
||||
|
||||
Publisher<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Mono<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Throwable ex = awaitErrorSignal(publisher);
|
||||
|
||||
assertSame(EXCEPTION, ex);
|
||||
|
|
@ -138,7 +138,7 @@ public class DispatcherHandlerErrorTests {
|
|||
public void controllerMethodWithThrownException() throws Exception {
|
||||
this.request.setUri(new URI("/raise-exception"));
|
||||
|
||||
Publisher<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Mono<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Throwable ex = awaitErrorSignal(publisher);
|
||||
|
||||
assertSame(EXCEPTION, ex);
|
||||
|
|
@ -148,7 +148,7 @@ public class DispatcherHandlerErrorTests {
|
|||
public void noHandlerResultHandler() throws Exception {
|
||||
this.request.setUri(new URI("/unknown-return-type"));
|
||||
|
||||
Publisher<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Mono<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Throwable ex = awaitErrorSignal(publisher);
|
||||
|
||||
assertEquals(IllegalStateException.class, ex.getClass());
|
||||
|
|
@ -163,7 +163,7 @@ public class DispatcherHandlerErrorTests {
|
|||
.write("body".getBytes("UTF-8"));
|
||||
this.request.setBody(Mono.just(buffer));
|
||||
|
||||
Publisher<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Mono<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Throwable ex = awaitErrorSignal(publisher);
|
||||
|
||||
assertEquals(ResponseStatusException.class, ex.getClass());
|
||||
|
|
@ -176,7 +176,7 @@ public class DispatcherHandlerErrorTests {
|
|||
this.request.setUri(new URI("/request-body"));
|
||||
this.request.setBody(Mono.error(EXCEPTION));
|
||||
|
||||
Publisher<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Mono<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Throwable ex = awaitErrorSignal(publisher);
|
||||
|
||||
ex.printStackTrace();
|
||||
|
|
@ -190,9 +190,9 @@ public class DispatcherHandlerErrorTests {
|
|||
|
||||
WebExceptionHandler exceptionHandler = new ServerError500ExceptionHandler();
|
||||
WebHandler webHandler = new ExceptionHandlingWebHandler(this.dispatcherHandler, exceptionHandler);
|
||||
Publisher<Void> publisher = webHandler.handle(this.exchange);
|
||||
Mono<Void> publisher = webHandler.handle(this.exchange);
|
||||
|
||||
Fluxion.from(publisher).toList().get();
|
||||
publisher.get();
|
||||
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus());
|
||||
}
|
||||
|
||||
|
|
@ -202,16 +202,16 @@ public class DispatcherHandlerErrorTests {
|
|||
|
||||
WebHandler webHandler = new FilteringWebHandler(this.dispatcherHandler, new TestWebFilter());
|
||||
webHandler = new ExceptionHandlingWebHandler(webHandler, new ServerError500ExceptionHandler());
|
||||
Publisher<Void> publisher = webHandler.handle(this.exchange);
|
||||
Mono<Void> publisher = webHandler.handle(this.exchange);
|
||||
|
||||
Fluxion.from(publisher).toList().get();
|
||||
publisher.get();
|
||||
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus());
|
||||
}
|
||||
|
||||
|
||||
private Throwable awaitErrorSignal(Publisher<?> publisher) throws Exception {
|
||||
Signal<?> signal = Fluxion.from(publisher).materialize().toList().get().get(0);
|
||||
assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType());
|
||||
private Throwable awaitErrorSignal(Mono<?> mono) throws Exception {
|
||||
Signal<?> signal = mono.materialize().get();
|
||||
assertEquals("Unexpected signal: " + signal, SignalKind.onError, signal.getType());
|
||||
return signal.getThrowable();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,13 +16,11 @@
|
|||
package org.springframework.web.reactive;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.rx.Fluxion;
|
||||
import reactor.rx.Signal;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Signal;
|
||||
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
|
@ -61,21 +59,21 @@ public class ResponseStatusExceptionHandlerTests {
|
|||
@Test
|
||||
public void handleException() throws Exception {
|
||||
Throwable ex = new ResponseStatusException(HttpStatus.BAD_REQUEST);
|
||||
Publisher<Void> publisher = this.handler.handle(this.exchange, ex);
|
||||
Mono<Void> publisher = this.handler.handle(this.exchange, ex);
|
||||
|
||||
Fluxion.from(publisher).toList().get();
|
||||
publisher.get();
|
||||
assertEquals(HttpStatus.BAD_REQUEST, this.response.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void unresolvedException() throws Exception {
|
||||
Throwable ex = new IllegalStateException();
|
||||
Publisher<Void> publisher = this.handler.handle(this.exchange, ex);
|
||||
Mono<Void> publisher = this.handler.handle(this.exchange, ex);
|
||||
|
||||
List<Signal<Void>> signals = Fluxion.from(publisher).materialize().toList().get();
|
||||
assertEquals(1, signals.size());
|
||||
assertTrue(signals.get(0).hasError());
|
||||
assertSame(ex, signals.get(0).getThrowable());
|
||||
Signal<Void> signal = publisher.materialize().get();
|
||||
assertNotNull(signal);
|
||||
assertTrue(signal.hasError());
|
||||
assertSame(ex, signal.getThrowable());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,13 +20,12 @@ import java.util.concurrent.CompletableFuture;
|
|||
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.rx.Fluxion;
|
||||
import reactor.core.publisher.Flux;
|
||||
import rx.Observable;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.convert.support.GenericConversionService;
|
||||
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
|
||||
import org.springframework.core.convert.support.ReactiveStreamsToReactorFluxionConverter;
|
||||
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
|
||||
import org.springframework.ui.ExtendedModelMap;
|
||||
import org.springframework.web.method.HandlerMethod;
|
||||
|
|
@ -77,7 +76,6 @@ public class SimpleHandlerResultHandlerTests {
|
|||
|
||||
GenericConversionService conversionService = new GenericConversionService();
|
||||
conversionService.addConverter(new ReactiveStreamsToCompletableFutureConverter());
|
||||
conversionService.addConverter(new ReactiveStreamsToReactorFluxionConverter());
|
||||
conversionService.addConverter(new ReactiveStreamsToRxJava1Converter());
|
||||
SimpleHandlerResultHandler resultHandler = new SimpleHandlerResultHandler(conversionService);
|
||||
TestController controller = new TestController();
|
||||
|
|
@ -126,7 +124,7 @@ public class SimpleHandlerResultHandlerTests {
|
|||
return null;
|
||||
}
|
||||
|
||||
public Fluxion<Void> streamVoid() {
|
||||
public Flux<Void> streamVoid() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,15 +19,14 @@ import java.lang.reflect.Method;
|
|||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.rx.Fluxion;
|
||||
import reactor.rx.Signal;
|
||||
import reactor.core.publisher.Signal;
|
||||
import reactor.core.util.SignalKind;
|
||||
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
|
|
@ -42,6 +41,7 @@ import org.springframework.web.server.adapter.DefaultServerWebExchange;
|
|||
import org.springframework.web.server.session.WebSessionManager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
|
@ -72,11 +72,11 @@ public class InvocableHandlerMethodTests {
|
|||
public void noArgsMethod() throws Exception {
|
||||
InvocableHandlerMethod hm = createHandlerMethod("noArgs");
|
||||
|
||||
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.exchange, this.model);
|
||||
List<HandlerResult> values = Fluxion.from(publisher).toList().get();
|
||||
Mono<HandlerResult> mono = hm.invokeForRequest(this.exchange, this.model);
|
||||
HandlerResult value = mono.get();
|
||||
|
||||
assertEquals(1, values.size());
|
||||
assertEquals("success", values.get(0).getReturnValue().get());
|
||||
assertNotNull(value);
|
||||
assertEquals("success", value.getReturnValue().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -85,11 +85,11 @@ public class InvocableHandlerMethodTests {
|
|||
InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class);
|
||||
hm.setHandlerMethodArgumentResolvers(Collections.singletonList(new RequestParamArgumentResolver()));
|
||||
|
||||
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.exchange, this.model);
|
||||
List<HandlerResult> values = Fluxion.from(publisher).toList().get();
|
||||
Mono<HandlerResult> mono = hm.invokeForRequest(this.exchange, this.model);
|
||||
HandlerResult value = mono.get();
|
||||
|
||||
assertEquals(1, values.size());
|
||||
assertEquals("success:null", values.get(0).getReturnValue().get());
|
||||
assertNotNull(value);
|
||||
assertEquals("success:null", value.getReturnValue().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -97,11 +97,11 @@ public class InvocableHandlerMethodTests {
|
|||
InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class);
|
||||
addResolver(hm, Mono.just("value1"));
|
||||
|
||||
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.exchange, this.model);
|
||||
List<HandlerResult> values = Fluxion.from(publisher).toList().get();
|
||||
Mono<HandlerResult> mono = hm.invokeForRequest(this.exchange, this.model);
|
||||
HandlerResult value = mono.get();
|
||||
|
||||
assertEquals(1, values.size());
|
||||
assertEquals("success:value1", values.get(0).getReturnValue().get());
|
||||
assertNotNull(value);
|
||||
assertEquals("success:value1", value.getReturnValue().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -109,11 +109,11 @@ public class InvocableHandlerMethodTests {
|
|||
InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class);
|
||||
addResolver(hm, Flux.fromIterable(Arrays.asList("value1", "value2", "value3")));
|
||||
|
||||
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.exchange, this.model);
|
||||
List<HandlerResult> values = Fluxion.from(publisher).toList().get();
|
||||
Mono<HandlerResult> mono = hm.invokeForRequest(this.exchange, this.model);
|
||||
HandlerResult value = mono.get();
|
||||
|
||||
assertEquals(1, values.size());
|
||||
assertEquals("success:value1", values.get(0).getReturnValue().get());
|
||||
assertNotNull(value);
|
||||
assertEquals("success:value1", value.getReturnValue().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -200,8 +200,8 @@ public class InvocableHandlerMethodTests {
|
|||
}
|
||||
|
||||
private Throwable awaitErrorSignal(Publisher<?> publisher) throws Exception {
|
||||
Signal<?> signal = Fluxion.from(publisher).materialize().toList().get().get(0);
|
||||
assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType());
|
||||
Signal<?> signal = Flux.from(publisher).materialize().toList().get().get(0);
|
||||
assertEquals("Unexpected signal: " + signal, SignalKind.onError, signal.getType());
|
||||
return signal.getThrowable();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -30,8 +30,6 @@ import org.junit.Test;
|
|||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.rx.Fluxion;
|
||||
import reactor.rx.Promise;
|
||||
import rx.Observable;
|
||||
import rx.Single;
|
||||
|
||||
|
|
@ -48,7 +46,6 @@ import org.springframework.core.codec.support.StringEncoder;
|
|||
import org.springframework.core.convert.ConversionService;
|
||||
import org.springframework.core.convert.support.GenericConversionService;
|
||||
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
|
||||
import org.springframework.core.convert.support.ReactiveStreamsToReactorFluxionConverter;
|
||||
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
||||
|
|
@ -205,11 +202,6 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
serializeAsPojo("http://localhost:" + port + "/single");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void serializeAsPromise() throws Exception {
|
||||
serializeAsPojo("http://localhost:" + port + "/promise");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void serializeAsList() throws Exception {
|
||||
serializeAsCollection("http://localhost:" + port + "/list");
|
||||
|
|
@ -250,11 +242,6 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
capitalizeCollection("http://localhost:" + port + "/observable-capitalize");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void streamCapitalize() throws Exception {
|
||||
capitalizeCollection("http://localhost:" + port + "/stream-capitalize");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personCapitalize() throws Exception {
|
||||
capitalizePojo("http://localhost:" + port + "/person-capitalize");
|
||||
|
|
@ -275,11 +262,6 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
capitalizePojo("http://localhost:" + port + "/single-capitalize");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void promiseCapitalize() throws Exception {
|
||||
capitalizePojo("http://localhost:" + this.port + "/promise-capitalize");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void publisherCreate() throws Exception {
|
||||
create("http://localhost:" + this.port + "/publisher-create");
|
||||
|
|
@ -290,11 +272,6 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
create("http://localhost:" + this.port + "/flux-create");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void streamCreate() throws Exception {
|
||||
create("http://localhost:" + this.port + "/stream-create");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void observableCreate() throws Exception {
|
||||
create("http://localhost:" + this.port + "/observable-create");
|
||||
|
|
@ -399,7 +376,6 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
// TODO: test failures with DefaultConversionService
|
||||
GenericConversionService service = new GenericConversionService();
|
||||
service.addConverter(new ReactiveStreamsToCompletableFutureConverter());
|
||||
service.addConverter(new ReactiveStreamsToReactorFluxionConverter());
|
||||
service.addConverter(new ReactiveStreamsToRxJava1Converter());
|
||||
return service;
|
||||
}
|
||||
|
|
@ -492,7 +468,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
|
||||
@RequestMapping("/stream-result")
|
||||
public Publisher<Long> stringStreamResponseBody() {
|
||||
return Flux.interval(Duration.ofSeconds(1)).as(Fluxion::from).take(5);
|
||||
return Flux.interval(Duration.ofSeconds(1)).take(5);
|
||||
}
|
||||
|
||||
@RequestMapping("/raw-flux")
|
||||
|
|
@ -515,11 +491,6 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
return Single.just(new Person("Robert"));
|
||||
}
|
||||
|
||||
@RequestMapping("/promise")
|
||||
public Promise<Person> promiseResponseBody() {
|
||||
return Promise.success(new Person("Robert"));
|
||||
}
|
||||
|
||||
@RequestMapping("/list")
|
||||
public List<Person> listResponseBody() {
|
||||
return Arrays.asList(new Person("Robert"), new Person("Marie"));
|
||||
|
|
@ -541,8 +512,8 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
}
|
||||
|
||||
@RequestMapping("/stream")
|
||||
public Fluxion<Person> reactorStreamResponseBody() {
|
||||
return Fluxion.just(new Person("Robert"), new Person("Marie"));
|
||||
public Flux<Person> reactorStreamResponseBody() {
|
||||
return Flux.just(new Person("Robert"), new Person("Marie"));
|
||||
}
|
||||
|
||||
@RequestMapping("/publisher-capitalize")
|
||||
|
|
@ -562,9 +533,9 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
return persons.map(person -> new Person(person.getName().toUpperCase()));
|
||||
}
|
||||
|
||||
@RequestMapping("/stream-capitalize")
|
||||
public Fluxion<Person> streamCapitalize(@RequestBody Fluxion<Person> persons) {
|
||||
return persons.map(person -> new Person(person.getName().toUpperCase()));
|
||||
@RequestMapping("/stream-create")
|
||||
public Publisher<Void> streamCreate(@RequestBody Flux<Person> personStream) {
|
||||
return personStream.toList().doOnSuccess(persons::addAll).after();
|
||||
}
|
||||
|
||||
@RequestMapping("/person-capitalize")
|
||||
|
|
@ -588,11 +559,6 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
|
||||
}
|
||||
|
||||
@RequestMapping("/promise-capitalize")
|
||||
public Promise<Person> promiseCapitalize(@RequestBody Promise<Person> personFuture) {
|
||||
return Fluxion.from(personFuture.map(person -> new Person(person.getName().toUpperCase()))).promise();
|
||||
}
|
||||
|
||||
@RequestMapping("/publisher-create")
|
||||
public Publisher<Void> publisherCreate(@RequestBody Publisher<Person> personStream) {
|
||||
return Flux.from(personStream).doOnNext(persons::add).after();
|
||||
|
|
@ -603,11 +569,6 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
|||
return personStream.doOnNext(persons::add).after();
|
||||
}
|
||||
|
||||
@RequestMapping("/stream-create")
|
||||
public Publisher<Void> streamCreate(@RequestBody Fluxion<Person> personStream) {
|
||||
return personStream.toList().doOnSuccess(persons::addAll).after();
|
||||
}
|
||||
|
||||
@RequestMapping("/observable-create")
|
||||
public Observable<Void> observableCreate(@RequestBody Observable<Person> personStream) {
|
||||
return personStream.toList().doOnNext(persons::addAll).flatMap(document -> Observable.empty());
|
||||
|
|
|
|||
Loading…
Reference in New Issue