Polish ReactiveAdapterRegisry

This commit is contained in:
Rossen Stoyanchev 2018-05-18 09:29:39 -04:00
parent 285eb94a03
commit d58888777f
4 changed files with 79 additions and 86 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,11 +24,10 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Adapt a Reactive Streams {@link Publisher} to and from an async/reactive type
* such as {@code CompletableFuture}, an RxJava {@code Observable}, etc.
* Adapter for a Reactive Streams {@link Publisher} to and from an async/reactive
* type such as {@code CompletableFuture}, RxJava {@code Observable}, and others.
*
* <p>Use the {@link ReactiveAdapterRegistry} to register reactive types and
* obtain adapters from.
* <p>An adapter is typically obtained via {@link ReactiveAdapterRegistry}.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -71,28 +70,28 @@ public class ReactiveAdapter {
}
/**
* A shortcut for {@code getDescriptor().getReactiveType()}.
* Shortcut for {@code getDescriptor().getReactiveType()}.
*/
public Class<?> getReactiveType() {
return getDescriptor().getReactiveType();
}
/**
* A shortcut for {@code getDescriptor().isMultiValue()}.
* Shortcut for {@code getDescriptor().isMultiValue()}.
*/
public boolean isMultiValue() {
return getDescriptor().isMultiValue();
}
/**
* A shortcut for {@code getDescriptor().supportsEmpty()}.
* Shortcut for {@code getDescriptor().supportsEmpty()}.
*/
public boolean supportsEmpty() {
return getDescriptor().supportsEmpty();
}
/**
* A shortcut for {@code getDescriptor().isNoValue()}.
* Shortcut for {@code getDescriptor().isNoValue()}.
*/
public boolean isNoValue() {
return getDescriptor().isNoValue();
@ -100,8 +99,9 @@ public class ReactiveAdapter {
/**
* Adapt the given instance to a Reactive Streams Publisher.
* @param source the source object to adapt from
* Adapt the given instance to a Reactive Streams {@code Publisher}.
* @param source the source object to adapt from; if the given object is
* {@code null}, {@link ReactiveTypeDescriptor#getEmptyValue()} is used.
* @return the Publisher representing the adaptation
*/
@SuppressWarnings("unchecked")

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -34,13 +34,8 @@ import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import static org.springframework.core.ReactiveTypeDescriptor.multiValue;
import static org.springframework.core.ReactiveTypeDescriptor.noValue;
import static org.springframework.core.ReactiveTypeDescriptor.singleOptionalValue;
import static org.springframework.core.ReactiveTypeDescriptor.singleRequiredValue;
/**
* A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from
* A registry of adapters to adapt Reactive Streams {@link Publisher} to/from
* various async/reactive types such as {@code CompletableFuture}, RxJava
* {@code Observable}, and others.
*
@ -64,6 +59,7 @@ public class ReactiveAdapterRegistry {
/**
* Create a registry and auto-register default adapters.
* @see #getSharedInstance()
*/
public ReactiveAdapterRegistry() {
@ -168,49 +164,54 @@ public class ReactiveAdapterRegistry {
/**
* Return a shared default {@code ReactiveAdapterRegistry} instance, lazily
* building it once needed.
*
* <p><b>NOTE:</b> We highly recommend passing a long-lived, pre-configured
* {@code ReactiveAdapterRegistry} instance for customization purposes.
* This accessor is only meant as a fallback for code paths that want to
* fall back on a default instance if one isn't provided.
*
* @return the shared {@code ReactiveAdapterRegistry} instance (never {@code null})
* @since 5.0.2
*/
public static ReactiveAdapterRegistry getSharedInstance() {
ReactiveAdapterRegistry ar = sharedInstance;
if (ar == null) {
ReactiveAdapterRegistry registry = sharedInstance;
if (registry == null) {
synchronized (ReactiveAdapterRegistry.class) {
ar = sharedInstance;
if (ar == null) {
ar = new ReactiveAdapterRegistry();
sharedInstance = ar;
registry = sharedInstance;
if (registry == null) {
registry = new ReactiveAdapterRegistry();
sharedInstance = registry;
}
}
}
return ar;
return registry;
}
private static class ReactorRegistrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
// Flux and Mono ahead of Publisher...
// Register Flux and Mono before Publisher...
registry.registerReactiveType(
singleOptionalValue(Mono.class, Mono::empty),
ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
source -> (Mono<?>) source,
Mono::from
);
registry.registerReactiveType(multiValue(Flux.class, Flux::empty),
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
source -> (Flux<?>) source,
Flux::from);
registry.registerReactiveType(multiValue(Publisher.class, Flux::empty),
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
source -> (Publisher<?>) source,
source -> source);
registry.registerReactiveType(
singleOptionalValue(CompletableFuture.class, () -> {
ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
CompletableFuture<?> empty = new CompletableFuture<>();
empty.complete(null);
return empty;
@ -226,17 +227,17 @@ public class ReactiveAdapterRegistry {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
multiValue(rx.Observable.class, rx.Observable::empty),
ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty),
source -> RxReactiveStreams.toPublisher((rx.Observable<?>) source),
RxReactiveStreams::toObservable
);
registry.registerReactiveType(
singleRequiredValue(rx.Single.class),
ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class),
source -> RxReactiveStreams.toPublisher((rx.Single<?>) source),
RxReactiveStreams::toSingle
);
registry.registerReactiveType(
noValue(rx.Completable.class, rx.Completable::complete),
ReactiveTypeDescriptor.noValue(rx.Completable.class, rx.Completable::complete),
source -> RxReactiveStreams.toPublisher((rx.Completable) source),
RxReactiveStreams::toCompletable
);
@ -248,27 +249,27 @@ public class ReactiveAdapterRegistry {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
source -> (io.reactivex.Flowable<?>) source,
Flowable::fromPublisher
);
registry.registerReactiveType(
multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
source -> ((io.reactivex.Observable<?>) source).toFlowable(BackpressureStrategy.BUFFER),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable()
);
registry.registerReactiveType(
singleRequiredValue(io.reactivex.Single.class),
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
source -> ((io.reactivex.Single<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle()
);
registry.registerReactiveType(
singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
source -> ((io.reactivex.Maybe<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement()
);
registry.registerReactiveType(
noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
source -> ((io.reactivex.Completable) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements()
);
@ -278,15 +279,15 @@ public class ReactiveAdapterRegistry {
private static class ReactorJdkFlowAdapterRegistrar {
// TODO: remove reflection when build requires JDK 9+
void registerAdapter(ReactiveAdapterRegistry registry) throws Exception {
// TODO: remove reflection when build requires JDK 9+
Class<?> type = ClassUtils.forName("java.util.concurrent.Flow.Publisher", getClass().getClassLoader());
Method toFluxMethod = getMethod("flowPublisherToFlux", type);
Method toFlowMethod = getMethod("publisherToFlowPublisher", Publisher.class);
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty());
registry.registerReactiveType(
multiValue(type, () -> emptyFlow),
ReactiveTypeDescriptor.multiValue(type, () -> emptyFlow),
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source),
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher)
);
@ -299,9 +300,10 @@ public class ReactiveAdapterRegistry {
/**
* Extension of ReactiveAdapter that wraps adapted (raw) Publisher's as
* {@link Flux} or {@link Mono} depending on the underlying reactive type's
* stream semantics.
* ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or
* {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}.
* This is important in places where only the stream and stream element type
* information is available like encoders and decoders.
*/
private static class ReactorAdapter extends ReactiveAdapter {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,8 +22,8 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Descriptor for a reactive type with information about its stream semantics, i.e.
* how many values it can produce.
* Describes the semantics of a reactive type including boolean checks for
* {@link #isMultiValue()}, {@link #supportsEmpty()}, and {@link #isNoValue()}.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -55,21 +55,12 @@ public class ReactiveTypeDescriptor {
/**
* Return the reactive type the descriptor was created for.
* Return the reactive type for this descriptor.
*/
public Class<?> getReactiveType() {
return this.reactiveType;
}
/**
* Return an empty-value instance for the underlying reactive or async type.
* Use of this type implies {@link #supportsEmpty()} is true.
*/
public Object getEmptyValue() {
Assert.state(this.emptyValueSupplier != null, "Empty values not supported");
return this.emptyValueSupplier.get();
}
/**
* Return {@code true} if the reactive type can produce more than 1 value
* can be produced and is therefore a good fit to adapt to {@code Flux}.
@ -95,6 +86,15 @@ public class ReactiveTypeDescriptor {
return this.noValue;
}
/**
* Return an empty-value instance for the underlying reactive or async type.
* Use of this type implies {@link #supportsEmpty()} is true.
*/
public Object getEmptyValue() {
Assert.state(this.emptyValueSupplier != null, "Empty values not supported");
return this.emptyValueSupplier.get();
}
@Override
public boolean equals(@Nullable Object other) {

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.core.convert.support;
package org.springframework.core;
import java.time.Duration;
import java.util.Arrays;
@ -32,16 +32,7 @@ import rx.Completable;
import rx.Observable;
import rx.Single;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ReactiveTypeDescriptor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
/**
* Unit tests for {@link ReactiveAdapterRegistry}.
@ -54,7 +45,7 @@ public class ReactiveAdapterRegistryTests {
@Test
public void defaultAdapterRegistrations() throws Exception {
public void defaultAdapterRegistrations() {
// Reactor
assertNotNull(getAdapter(Mono.class));
@ -80,7 +71,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void getAdapterForReactiveSubType() throws Exception {
public void getAdapterForReactiveSubType() {
ReactiveAdapter adapter1 = getAdapter(Flux.class);
ReactiveAdapter adapter2 = getAdapter(FluxProcessor.class);
@ -99,7 +90,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void publisherToFlux() throws Exception {
public void publisherToFlux() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flowable.fromIterable(sequence);
Object target = getAdapter(Flux.class).fromPublisher(source);
@ -110,7 +101,7 @@ public class ReactiveAdapterRegistryTests {
// TODO: publisherToMono/CompletableFuture vs Single (ISE on multiple elements)?
@Test
public void publisherToMono() throws Exception {
public void publisherToMono() {
Publisher<Integer> source = Flowable.fromArray(1, 2, 3);
Object target = getAdapter(Mono.class).fromPublisher(source);
assertTrue(target instanceof Mono);
@ -126,7 +117,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void publisherToRxObservable() throws Exception {
public void publisherToRxObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flowable.fromIterable(sequence);
Object target = getAdapter(rx.Observable.class).fromPublisher(source);
@ -135,7 +126,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void publisherToRxSingle() throws Exception {
public void publisherToRxSingle() {
Publisher<Integer> source = Flowable.fromArray(1);
Object target = getAdapter(rx.Single.class).fromPublisher(source);
assertTrue(target instanceof rx.Single);
@ -143,7 +134,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void publisherToRxCompletable() throws Exception {
public void publisherToRxCompletable() {
Publisher<Integer> source = Flowable.fromArray(1, 2, 3);
Object target = getAdapter(rx.Completable.class).fromPublisher(source);
assertTrue(target instanceof rx.Completable);
@ -151,7 +142,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void publisherToReactivexFlowable() throws Exception {
public void publisherToReactivexFlowable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Flowable.class).fromPublisher(source);
@ -160,7 +151,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void publisherToReactivexObservable() throws Exception {
public void publisherToReactivexObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Observable.class).fromPublisher(source);
@ -169,7 +160,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void publisherToReactivexSingle() throws Exception {
public void publisherToReactivexSingle() {
Publisher<Integer> source = Flowable.fromArray(1);
Object target = getAdapter(io.reactivex.Single.class).fromPublisher(source);
assertTrue(target instanceof io.reactivex.Single);
@ -177,7 +168,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void publisherToReactivexCompletable() throws Exception {
public void publisherToReactivexCompletable() {
Publisher<Integer> source = Flowable.fromArray(1, 2, 3);
Object target = getAdapter(io.reactivex.Completable.class).fromPublisher(source);
assertTrue(target instanceof io.reactivex.Completable);
@ -185,7 +176,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void rxObservableToPublisher() throws Exception {
public void rxObservableToPublisher() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = rx.Observable.from(sequence);
Object target = getAdapter(rx.Observable.class).toPublisher(source);
@ -194,7 +185,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void rxSingleToPublisher() throws Exception {
public void rxSingleToPublisher() {
Object source = rx.Single.just(1);
Object target = getAdapter(rx.Single.class).toPublisher(source);
assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
@ -202,7 +193,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void rxCompletableToPublisher() throws Exception {
public void rxCompletableToPublisher() {
Object source = rx.Completable.complete();
Object target = getAdapter(rx.Completable.class).toPublisher(source);
assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
@ -210,7 +201,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void reactivexFlowableToPublisher() throws Exception {
public void reactivexFlowableToPublisher() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = io.reactivex.Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source);
@ -219,7 +210,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void reactivexObservableToPublisher() throws Exception {
public void reactivexObservableToPublisher() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = io.reactivex.Observable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source);
@ -228,7 +219,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void reactivexSingleToPublisher() throws Exception {
public void reactivexSingleToPublisher() {
Object source = io.reactivex.Single.just(1);
Object target = getAdapter(io.reactivex.Single.class).toPublisher(source);
assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
@ -236,7 +227,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void reactivexCompletableToPublisher() throws Exception {
public void reactivexCompletableToPublisher() {
Object source = io.reactivex.Completable.complete();
Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source);
assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
@ -244,7 +235,7 @@ public class ReactiveAdapterRegistryTests {
}
@Test
public void CompletableFutureToPublisher() throws Exception {
public void CompletableFutureToPublisher() {
CompletableFuture<Integer> future = new CompletableFuture<>();
future.complete(1);
Object target = getAdapter(CompletableFuture.class).toPublisher(future);