diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapter.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapter.java index b130703e8d0..422e71bfc0e 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapter.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,11 +24,10 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** - * Adapt a Reactive Streams {@link Publisher} to and from an async/reactive type - * such as {@code CompletableFuture}, an RxJava {@code Observable}, etc. + * Adapter for a Reactive Streams {@link Publisher} to and from an async/reactive + * type such as {@code CompletableFuture}, RxJava {@code Observable}, and others. * - *

Use the {@link ReactiveAdapterRegistry} to register reactive types and - * obtain adapters from. + *

An adapter is typically obtained via {@link ReactiveAdapterRegistry}. * * @author Rossen Stoyanchev * @since 5.0 @@ -71,28 +70,28 @@ public class ReactiveAdapter { } /** - * A shortcut for {@code getDescriptor().getReactiveType()}. + * Shortcut for {@code getDescriptor().getReactiveType()}. */ public Class getReactiveType() { return getDescriptor().getReactiveType(); } /** - * A shortcut for {@code getDescriptor().isMultiValue()}. + * Shortcut for {@code getDescriptor().isMultiValue()}. */ public boolean isMultiValue() { return getDescriptor().isMultiValue(); } /** - * A shortcut for {@code getDescriptor().supportsEmpty()}. + * Shortcut for {@code getDescriptor().supportsEmpty()}. */ public boolean supportsEmpty() { return getDescriptor().supportsEmpty(); } /** - * A shortcut for {@code getDescriptor().isNoValue()}. + * Shortcut for {@code getDescriptor().isNoValue()}. */ public boolean isNoValue() { return getDescriptor().isNoValue(); @@ -100,8 +99,9 @@ public class ReactiveAdapter { /** - * Adapt the given instance to a Reactive Streams Publisher. - * @param source the source object to adapt from + * Adapt the given instance to a Reactive Streams {@code Publisher}. + * @param source the source object to adapt from; if the given object is + * {@code null}, {@link ReactiveTypeDescriptor#getEmptyValue()} is used. * @return the Publisher representing the adaptation */ @SuppressWarnings("unchecked") diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index e9c5ed94589..0a6e253d940 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,13 +34,8 @@ import org.springframework.lang.Nullable; import org.springframework.util.ClassUtils; import org.springframework.util.ReflectionUtils; -import static org.springframework.core.ReactiveTypeDescriptor.multiValue; -import static org.springframework.core.ReactiveTypeDescriptor.noValue; -import static org.springframework.core.ReactiveTypeDescriptor.singleOptionalValue; -import static org.springframework.core.ReactiveTypeDescriptor.singleRequiredValue; - /** - * A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from + * A registry of adapters to adapt Reactive Streams {@link Publisher} to/from * various async/reactive types such as {@code CompletableFuture}, RxJava * {@code Observable}, and others. * @@ -64,6 +59,7 @@ public class ReactiveAdapterRegistry { /** * Create a registry and auto-register default adapters. + * @see #getSharedInstance() */ public ReactiveAdapterRegistry() { @@ -168,49 +164,54 @@ public class ReactiveAdapterRegistry { /** * Return a shared default {@code ReactiveAdapterRegistry} instance, lazily * building it once needed. + * *

NOTE: We highly recommend passing a long-lived, pre-configured * {@code ReactiveAdapterRegistry} instance for customization purposes. * This accessor is only meant as a fallback for code paths that want to * fall back on a default instance if one isn't provided. + * * @return the shared {@code ReactiveAdapterRegistry} instance (never {@code null}) * @since 5.0.2 */ public static ReactiveAdapterRegistry getSharedInstance() { - ReactiveAdapterRegistry ar = sharedInstance; - if (ar == null) { + ReactiveAdapterRegistry registry = sharedInstance; + if (registry == null) { synchronized (ReactiveAdapterRegistry.class) { - ar = sharedInstance; - if (ar == null) { - ar = new ReactiveAdapterRegistry(); - sharedInstance = ar; + registry = sharedInstance; + if (registry == null) { + registry = new ReactiveAdapterRegistry(); + sharedInstance = registry; } } } - return ar; + return registry; } private static class ReactorRegistrar { void registerAdapters(ReactiveAdapterRegistry registry) { - // Flux and Mono ahead of Publisher... + + // Register Flux and Mono before Publisher... registry.registerReactiveType( - singleOptionalValue(Mono.class, Mono::empty), + ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty), source -> (Mono) source, Mono::from ); - registry.registerReactiveType(multiValue(Flux.class, Flux::empty), + registry.registerReactiveType( + ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty), source -> (Flux) source, Flux::from); - registry.registerReactiveType(multiValue(Publisher.class, Flux::empty), + registry.registerReactiveType( + ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty), source -> (Publisher) source, source -> source); registry.registerReactiveType( - singleOptionalValue(CompletableFuture.class, () -> { + ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> { CompletableFuture empty = new CompletableFuture<>(); empty.complete(null); return empty; @@ -226,17 +227,17 @@ public class ReactiveAdapterRegistry { void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( - multiValue(rx.Observable.class, rx.Observable::empty), + ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty), source -> RxReactiveStreams.toPublisher((rx.Observable) source), RxReactiveStreams::toObservable ); registry.registerReactiveType( - singleRequiredValue(rx.Single.class), + ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class), source -> RxReactiveStreams.toPublisher((rx.Single) source), RxReactiveStreams::toSingle ); registry.registerReactiveType( - noValue(rx.Completable.class, rx.Completable::complete), + ReactiveTypeDescriptor.noValue(rx.Completable.class, rx.Completable::complete), source -> RxReactiveStreams.toPublisher((rx.Completable) source), RxReactiveStreams::toCompletable ); @@ -248,27 +249,27 @@ public class ReactiveAdapterRegistry { void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( - multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty), + ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty), source -> (io.reactivex.Flowable) source, Flowable::fromPublisher ); registry.registerReactiveType( - multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty), + ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty), source -> ((io.reactivex.Observable) source).toFlowable(BackpressureStrategy.BUFFER), source -> io.reactivex.Flowable.fromPublisher(source).toObservable() ); registry.registerReactiveType( - singleRequiredValue(io.reactivex.Single.class), + ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class), source -> ((io.reactivex.Single) source).toFlowable(), source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle() ); registry.registerReactiveType( - singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty), + ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty), source -> ((io.reactivex.Maybe) source).toFlowable(), source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement() ); registry.registerReactiveType( - noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete), + ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete), source -> ((io.reactivex.Completable) source).toFlowable(), source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements() ); @@ -278,15 +279,15 @@ public class ReactiveAdapterRegistry { private static class ReactorJdkFlowAdapterRegistrar { - // TODO: remove reflection when build requires JDK 9+ void registerAdapter(ReactiveAdapterRegistry registry) throws Exception { + // TODO: remove reflection when build requires JDK 9+ Class type = ClassUtils.forName("java.util.concurrent.Flow.Publisher", getClass().getClassLoader()); Method toFluxMethod = getMethod("flowPublisherToFlux", type); Method toFlowMethod = getMethod("publisherToFlowPublisher", Publisher.class); Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty()); registry.registerReactiveType( - multiValue(type, () -> emptyFlow), + ReactiveTypeDescriptor.multiValue(type, () -> emptyFlow), source -> (Publisher) ReflectionUtils.invokeMethod(toFluxMethod, null, source), publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher) ); @@ -299,9 +300,10 @@ public class ReactiveAdapterRegistry { /** - * Extension of ReactiveAdapter that wraps adapted (raw) Publisher's as - * {@link Flux} or {@link Mono} depending on the underlying reactive type's - * stream semantics. + * ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or + * {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}. + * This is important in places where only the stream and stream element type + * information is available like encoders and decoders. */ private static class ReactorAdapter extends ReactiveAdapter { diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java index 5749b7b52fd..8cfb7c62a9d 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,8 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** - * Descriptor for a reactive type with information about its stream semantics, i.e. - * how many values it can produce. + * Describes the semantics of a reactive type including boolean checks for + * {@link #isMultiValue()}, {@link #supportsEmpty()}, and {@link #isNoValue()}. * * @author Rossen Stoyanchev * @since 5.0 @@ -55,21 +55,12 @@ public class ReactiveTypeDescriptor { /** - * Return the reactive type the descriptor was created for. + * Return the reactive type for this descriptor. */ public Class getReactiveType() { return this.reactiveType; } - /** - * Return an empty-value instance for the underlying reactive or async type. - * Use of this type implies {@link #supportsEmpty()} is true. - */ - public Object getEmptyValue() { - Assert.state(this.emptyValueSupplier != null, "Empty values not supported"); - return this.emptyValueSupplier.get(); - } - /** * Return {@code true} if the reactive type can produce more than 1 value * can be produced and is therefore a good fit to adapt to {@code Flux}. @@ -95,6 +86,15 @@ public class ReactiveTypeDescriptor { return this.noValue; } + /** + * Return an empty-value instance for the underlying reactive or async type. + * Use of this type implies {@link #supportsEmpty()} is true. + */ + public Object getEmptyValue() { + Assert.state(this.emptyValueSupplier != null, "Empty values not supported"); + return this.emptyValueSupplier.get(); + } + @Override public boolean equals(@Nullable Object other) { diff --git a/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java similarity index 83% rename from spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java rename to spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java index 162db70051b..ee5a5fd6e52 100644 --- a/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java +++ b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.core.convert.support; +package org.springframework.core; import java.time.Duration; import java.util.Arrays; @@ -32,16 +32,7 @@ import rx.Completable; import rx.Observable; import rx.Single; -import org.springframework.core.ReactiveAdapter; -import org.springframework.core.ReactiveAdapterRegistry; -import org.springframework.core.ReactiveTypeDescriptor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * Unit tests for {@link ReactiveAdapterRegistry}. @@ -54,7 +45,7 @@ public class ReactiveAdapterRegistryTests { @Test - public void defaultAdapterRegistrations() throws Exception { + public void defaultAdapterRegistrations() { // Reactor assertNotNull(getAdapter(Mono.class)); @@ -80,7 +71,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void getAdapterForReactiveSubType() throws Exception { + public void getAdapterForReactiveSubType() { ReactiveAdapter adapter1 = getAdapter(Flux.class); ReactiveAdapter adapter2 = getAdapter(FluxProcessor.class); @@ -99,7 +90,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void publisherToFlux() throws Exception { + public void publisherToFlux() { List sequence = Arrays.asList(1, 2, 3); Publisher source = Flowable.fromIterable(sequence); Object target = getAdapter(Flux.class).fromPublisher(source); @@ -110,7 +101,7 @@ public class ReactiveAdapterRegistryTests { // TODO: publisherToMono/CompletableFuture vs Single (ISE on multiple elements)? @Test - public void publisherToMono() throws Exception { + public void publisherToMono() { Publisher source = Flowable.fromArray(1, 2, 3); Object target = getAdapter(Mono.class).fromPublisher(source); assertTrue(target instanceof Mono); @@ -126,7 +117,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void publisherToRxObservable() throws Exception { + public void publisherToRxObservable() { List sequence = Arrays.asList(1, 2, 3); Publisher source = Flowable.fromIterable(sequence); Object target = getAdapter(rx.Observable.class).fromPublisher(source); @@ -135,7 +126,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void publisherToRxSingle() throws Exception { + public void publisherToRxSingle() { Publisher source = Flowable.fromArray(1); Object target = getAdapter(rx.Single.class).fromPublisher(source); assertTrue(target instanceof rx.Single); @@ -143,7 +134,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void publisherToRxCompletable() throws Exception { + public void publisherToRxCompletable() { Publisher source = Flowable.fromArray(1, 2, 3); Object target = getAdapter(rx.Completable.class).fromPublisher(source); assertTrue(target instanceof rx.Completable); @@ -151,7 +142,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void publisherToReactivexFlowable() throws Exception { + public void publisherToReactivexFlowable() { List sequence = Arrays.asList(1, 2, 3); Publisher source = Flux.fromIterable(sequence); Object target = getAdapter(io.reactivex.Flowable.class).fromPublisher(source); @@ -160,7 +151,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void publisherToReactivexObservable() throws Exception { + public void publisherToReactivexObservable() { List sequence = Arrays.asList(1, 2, 3); Publisher source = Flowable.fromIterable(sequence); Object target = getAdapter(io.reactivex.Observable.class).fromPublisher(source); @@ -169,7 +160,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void publisherToReactivexSingle() throws Exception { + public void publisherToReactivexSingle() { Publisher source = Flowable.fromArray(1); Object target = getAdapter(io.reactivex.Single.class).fromPublisher(source); assertTrue(target instanceof io.reactivex.Single); @@ -177,7 +168,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void publisherToReactivexCompletable() throws Exception { + public void publisherToReactivexCompletable() { Publisher source = Flowable.fromArray(1, 2, 3); Object target = getAdapter(io.reactivex.Completable.class).fromPublisher(source); assertTrue(target instanceof io.reactivex.Completable); @@ -185,7 +176,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void rxObservableToPublisher() throws Exception { + public void rxObservableToPublisher() { List sequence = Arrays.asList(1, 2, 3); Object source = rx.Observable.from(sequence); Object target = getAdapter(rx.Observable.class).toPublisher(source); @@ -194,7 +185,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void rxSingleToPublisher() throws Exception { + public void rxSingleToPublisher() { Object source = rx.Single.just(1); Object target = getAdapter(rx.Single.class).toPublisher(source); assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono); @@ -202,7 +193,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void rxCompletableToPublisher() throws Exception { + public void rxCompletableToPublisher() { Object source = rx.Completable.complete(); Object target = getAdapter(rx.Completable.class).toPublisher(source); assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono); @@ -210,7 +201,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void reactivexFlowableToPublisher() throws Exception { + public void reactivexFlowableToPublisher() { List sequence = Arrays.asList(1, 2, 3); Object source = io.reactivex.Flowable.fromIterable(sequence); Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source); @@ -219,7 +210,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void reactivexObservableToPublisher() throws Exception { + public void reactivexObservableToPublisher() { List sequence = Arrays.asList(1, 2, 3); Object source = io.reactivex.Observable.fromIterable(sequence); Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source); @@ -228,7 +219,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void reactivexSingleToPublisher() throws Exception { + public void reactivexSingleToPublisher() { Object source = io.reactivex.Single.just(1); Object target = getAdapter(io.reactivex.Single.class).toPublisher(source); assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono); @@ -236,7 +227,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void reactivexCompletableToPublisher() throws Exception { + public void reactivexCompletableToPublisher() { Object source = io.reactivex.Completable.complete(); Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source); assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono); @@ -244,7 +235,7 @@ public class ReactiveAdapterRegistryTests { } @Test - public void CompletableFutureToPublisher() throws Exception { + public void CompletableFutureToPublisher() { CompletableFuture future = new CompletableFuture<>(); future.complete(1); Object target = getAdapter(CompletableFuture.class).toPublisher(future);