diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index ee0c58ef854..7ae3e92fa0f 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -16,6 +16,7 @@ package org.springframework.core; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -29,8 +30,13 @@ import reactor.core.publisher.Mono; import rx.RxReactiveStreams; import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; -import static org.springframework.core.ReactiveTypeDescriptor.*; +import static org.springframework.core.ReactiveTypeDescriptor.multiValue; +import static org.springframework.core.ReactiveTypeDescriptor.noValue; +import static org.springframework.core.ReactiveTypeDescriptor.singleOptionalValue; +import static org.springframework.core.ReactiveTypeDescriptor.singleRequiredValue; /** * A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from @@ -38,7 +44,8 @@ import static org.springframework.core.ReactiveTypeDescriptor.*; * {@code Observable}, and others. * *

By default, depending on classpath availability, adapters are registered - * for Reactor, RxJava 1, RxJava 2 types, and {@link CompletableFuture}. + * for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, and Java 9+ + * Flow.Publisher. * * @author Rossen Stoyanchev * @author Sebastien Deleuze @@ -82,6 +89,18 @@ public class ReactiveAdapterRegistry { catch (Throwable ex) { // Ignore } + + // Java 9+ Flow.Publisher + try { + new ReactorJdkFlowAdapterRegistrar().registerAdapter(this); + } + catch (NoSuchMethodException ex) { + throw new IllegalStateException("Failed to find JdkFlowAdapter methods", ex); + } + catch (Throwable ex) { + // Ignore + // We can fall back on "reactive-streams-flow-bridge" (once released) + } } @@ -232,6 +251,34 @@ public class ReactiveAdapterRegistry { } + private static class ReactorJdkFlowAdapterRegistrar { + + // TODO: remove reflection when build requires JDK 9+ + + void registerAdapter(ReactiveAdapterRegistry registry) + throws NoSuchMethodException, ClassNotFoundException { + + String name = "java.util.concurrent.Flow.Publisher"; + Class type = ClassUtils.forName(name, getClass().getClassLoader()); + + Method toFlowMethod = getMethod("publisherToFlowPublisher", Publisher.class); + Method toFluxMethod = getMethod("flowPublisherToFlux", type); + + Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty()); + + registry.registerReactiveType( + multiValue(type, () -> emptyFlow), + source -> (Publisher) ReflectionUtils.invokeMethod(toFluxMethod, null, source), + publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher) + ); + } + + private static Method getMethod(String name, Class argumentType) throws NoSuchMethodException { + return reactor.adapter.JdkFlowAdapter.class.getMethod(name, argumentType); + } + } + + /** * Extension of ReactiveAdapter that wraps adapted (raw) Publisher's as * {@link Flux} or {@link Mono} depending on the underlying reactive type's