From 4d10249b70cc6aab3baa62eb9dbf288f397d89f8 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Wed, 14 Aug 2019 11:07:11 +0200 Subject: [PATCH] Update to Coroutine 1.3.0-RC2 This updates brings full interoperability between Reactor and Coroutines contexts. Closes gh-22986 --- build.gradle | 2 +- .../org/springframework/core/CoroutinesUtils.kt | 4 ++-- .../springframework/core/ReactiveAdapterRegistry.java | 11 ++++------- .../messaging/rsocket/RSocketRequesterExtensions.kt | 2 +- .../function/client/ClientResponseExtensions.kt | 2 +- .../reactive/function/client/WebClientExtensions.kt | 2 +- .../function/server/ServerRequestExtensions.kt | 2 +- 7 files changed, 11 insertions(+), 14 deletions(-) diff --git a/build.gradle b/build.gradle index 4004dc1a289..0905dec3690 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ ext { } aspectjVersion = "1.9.4" - coroutinesVersion = "1.3.0-RC" + coroutinesVersion = "1.3.0-RC2" freemarkerVersion = "2.3.28" groovyVersion = "2.5.7" hsqldbVersion = "2.5.0" diff --git a/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt b/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt index 75f39216807..c5cc85c9d5f 100644 --- a/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt +++ b/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt @@ -24,7 +24,7 @@ import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitFirstOrNull -import kotlinx.coroutines.reactive.flow.asPublisher +import kotlinx.coroutines.reactor.asFlux import kotlinx.coroutines.reactor.mono import reactor.core.publisher.Mono @@ -68,7 +68,7 @@ internal fun invokeHandlerMethod(method: Method, bean: Any, vararg args: Any?): .let { if (it == Unit) null else it } }.onErrorMap(InvocationTargetException::class.java) { it.targetException } if (function.returnType.classifier == Flow::class) { - mono.flatMapMany { (it as Flow).asPublisher() } + mono.flatMapMany { (it as Flow).asFlux() } } else { mono diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index ebec12833b0..d73881869f7 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -28,9 +28,6 @@ import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import kotlinx.coroutines.CompletableDeferredKt; import kotlinx.coroutines.Deferred; -import kotlinx.coroutines.flow.FlowKt; -import kotlinx.coroutines.reactive.flow.FlowAsPublisherKt; -import kotlinx.coroutines.reactive.flow.PublisherAsFlowKt; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -97,7 +94,7 @@ public class ReactiveAdapterRegistry { // We can fall back on "reactive-streams-flow-bridge" (once released) // Coroutines - if (this.reactorPresent && ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader) && ClassUtils.isPresent("kotlinx.coroutines.reactive.flow.PublisherAsFlowKt", classLoader)) { + if (this.reactorPresent && ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader)) { new CoroutinesRegistrar().registerAdapters(this); } } @@ -351,9 +348,9 @@ public class ReactiveAdapterRegistry { source -> CoroutinesUtils.monoToDeferred(Mono.from(source))); registry.registerReactiveType( - ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, FlowKt::emptyFlow), - source -> FlowAsPublisherKt.from((kotlinx.coroutines.flow.Flow) source), - PublisherAsFlowKt::from + ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow), + source -> kotlinx.coroutines.reactor.FlowKt.asFlux((kotlinx.coroutines.flow.Flow) source), + kotlinx.coroutines.reactive.FlowKt::asFlow ); } } diff --git a/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt index c60533743c9..069f227da6d 100644 --- a/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt +++ b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt @@ -21,7 +21,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle -import kotlinx.coroutines.reactive.flow.asFlow +import kotlinx.coroutines.reactive.asFlow import org.reactivestreams.Publisher import org.springframework.core.ParameterizedTypeReference import reactor.core.publisher.Flux diff --git a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/ClientResponseExtensions.kt b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/ClientResponseExtensions.kt index 49d7ff32c83..21c06d09647 100644 --- a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/ClientResponseExtensions.kt +++ b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/ClientResponseExtensions.kt @@ -20,7 +20,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle -import kotlinx.coroutines.reactive.flow.asFlow +import kotlinx.coroutines.reactive.asFlow import org.springframework.core.ParameterizedTypeReference import org.springframework.http.ResponseEntity import reactor.core.publisher.Flux diff --git a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt index 21b10a7197c..399da10c719 100644 --- a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt +++ b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt @@ -19,7 +19,7 @@ package org.springframework.web.reactive.function.client import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitSingle -import kotlinx.coroutines.reactive.flow.asFlow +import kotlinx.coroutines.reactive.asFlow import org.reactivestreams.Publisher import org.springframework.core.ParameterizedTypeReference import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec diff --git a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/ServerRequestExtensions.kt b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/ServerRequestExtensions.kt index e17a2dd204e..bd98243b97e 100644 --- a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/ServerRequestExtensions.kt +++ b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/ServerRequestExtensions.kt @@ -20,7 +20,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle -import kotlinx.coroutines.reactive.flow.asFlow +import kotlinx.coroutines.reactive.asFlow import org.springframework.core.ParameterizedTypeReference import org.springframework.http.codec.multipart.Part import org.springframework.util.MultiValueMap