Complete reactive conversion support refactoring

This commit ensures stream semantics (Flux vs Mono) are adhered to also
on the target side.
This commit is contained in:
Rossen Stoyanchev 2016-07-01 17:28:20 -04:00
parent 71f4dff011
commit df64262db6
6 changed files with 156 additions and 17 deletions

View File

@ -35,7 +35,7 @@ public class MonoToCompletableFutureConverter implements GenericConverter {
public Set<ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>();
pairs.add(new GenericConverter.ConvertiblePair(Mono.class, CompletableFuture.class));
pairs.add(new GenericConverter.ConvertiblePair(CompletableFuture.class, Publisher.class));
pairs.add(new GenericConverter.ConvertiblePair(CompletableFuture.class, Mono.class));
return pairs;
}

View File

@ -40,9 +40,9 @@ public final class ReactorToRxJava1Converter implements GenericConverter {
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>();
pairs.add(new GenericConverter.ConvertiblePair(Flux.class, Observable.class));
pairs.add(new GenericConverter.ConvertiblePair(Observable.class, Publisher.class));
pairs.add(new GenericConverter.ConvertiblePair(Observable.class, Flux.class));
pairs.add(new GenericConverter.ConvertiblePair(Mono.class, Single.class));
pairs.add(new GenericConverter.ConvertiblePair(Single.class, Publisher.class));
pairs.add(new GenericConverter.ConvertiblePair(Single.class, Mono.class));
return pairs;
}

View File

@ -19,6 +19,7 @@ package org.springframework.web.reactive.result;
import java.util.Optional;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.Ordered;
@ -53,6 +54,13 @@ public class SimpleResultHandler implements Ordered, HandlerResultHandler {
}
/**
* Return the configured {@link ConversionService}.
*/
public ConversionService getConversionService() {
return this.conversionService;
}
/**
* Set the order for this result handler relative to others.
* <p>By default this is set to {@link Ordered#LOWEST_PRECEDENCE} and is
@ -76,7 +84,8 @@ public class SimpleResultHandler implements Ordered, HandlerResultHandler {
if (Void.TYPE.equals(type.getRawClass())) {
return true;
}
if (this.conversionService.canConvert(type.getRawClass(), Publisher.class)) {
if (getConversionService().canConvert(type.getRawClass(), Mono.class) ||
getConversionService().canConvert(type.getRawClass(), Flux.class)) {
Class<?> clazz = result.getReturnValueType().getGeneric(0).getRawClass();
return Void.class.equals(clazz);
}
@ -90,11 +99,19 @@ public class SimpleResultHandler implements Ordered, HandlerResultHandler {
if (!optional.isPresent()) {
return Mono.empty();
}
Object returnValue = optional.get();
if (returnValue instanceof Mono) {
return (Mono<Void>) returnValue;
}
return Mono.from(this.conversionService.convert(returnValue, Publisher.class));
ResolvableType returnType = result.getReturnValueType();
if (getConversionService().canConvert(returnType.getRawClass(), Mono.class)) {
return this.conversionService.convert(returnValue, Mono.class);
}
else {
return this.conversionService.convert(returnValue, Flux.class).single();
}
}
}

View File

@ -72,21 +72,23 @@ public abstract class AbstractMessageConverterResultHandler extends ContentNegot
@SuppressWarnings("unchecked")
protected Mono<Void> writeBody(ServerWebExchange exchange, Object body, ResolvableType bodyType) {
Publisher<?> publisher;
ResolvableType elementType;
boolean convertToFlux = getConversionService().canConvert(bodyType.getRawClass(), Flux.class);
boolean convertToMono = getConversionService().canConvert(bodyType.getRawClass(), Mono.class);
if (getConversionService().canConvert(bodyType.getRawClass(), Publisher.class)) {
if (body != null) {
publisher = getConversionService().convert(body, Publisher.class);
}
else {
publisher = Mono.empty();
}
elementType = bodyType.getGeneric(0);
ResolvableType elementType = convertToFlux || convertToMono ? bodyType.getGeneric(0) : bodyType;
Publisher<?> publisher;
if (body == null) {
publisher = Mono.empty();
}
else if (convertToMono) {
publisher = getConversionService().convert(body, Mono.class);
}
else if (convertToFlux) {
publisher = getConversionService().convert(body, Flux.class);
}
else {
publisher = Mono.justOrEmpty(body);
elementType = bodyType;
publisher = Mono.just(body);
}
if (Void.class.equals(elementType.getRawClass())) {

View File

@ -0,0 +1,58 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.convert.support;
import java.util.concurrent.CompletableFuture;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link ReactorToRxJava1Converter}.
* @author Rossen Stoyanchev
*/
public class MonoToCompletableFutureConverterTests {
private GenericConversionService conversionService;
@Before
public void setUp() throws Exception {
this.conversionService = new GenericConversionService();
this.conversionService.addConverter(new MonoToCompletableFutureConverter());
}
@Test
public void canConvert() throws Exception {
assertTrue(this.conversionService.canConvert(Mono.class, CompletableFuture.class));
assertTrue(this.conversionService.canConvert(CompletableFuture.class, Mono.class));
assertFalse(this.conversionService.canConvert(Flux.class, CompletableFuture.class));
assertFalse(this.conversionService.canConvert(CompletableFuture.class, Flux.class));
assertFalse(this.conversionService.canConvert(Publisher.class, CompletableFuture.class));
assertFalse(this.conversionService.canConvert(CompletableFuture.class, Publisher.class));
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.convert.support;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link ReactorToRxJava1Converter}.
* @author Rossen Stoyanchev
*/
public class ReactorToRxJava1ConverterTests {
private GenericConversionService conversionService;
@Before
public void setUp() throws Exception {
this.conversionService = new GenericConversionService();
this.conversionService.addConverter(new ReactorToRxJava1Converter());
}
@Test
public void canConvert() throws Exception {
assertTrue(this.conversionService.canConvert(Flux.class, Observable.class));
assertTrue(this.conversionService.canConvert(Observable.class, Flux.class));
assertTrue(this.conversionService.canConvert(Mono.class, Single.class));
assertTrue(this.conversionService.canConvert(Single.class, Mono.class));
assertFalse(this.conversionService.canConvert(Flux.class, Single.class));
assertFalse(this.conversionService.canConvert(Single.class, Flux.class));
assertFalse(this.conversionService.canConvert(Mono.class, Observable.class));
assertFalse(this.conversionService.canConvert(Observable.class, Mono.class));
assertFalse(this.conversionService.canConvert(Publisher.class, Observable.class));
assertFalse(this.conversionService.canConvert(Observable.class, Publisher.class));
}
}