Add support for Deferred to ReactiveAdapterRegistry

See gh-19975
This commit is contained in:
Sebastien Deleuze 2019-03-15 18:22:19 +01:00
parent 1c9cbaf399
commit 1382220021
5 changed files with 107 additions and 3 deletions

View File

@ -78,6 +78,8 @@ dependencies {
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}") optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}")
optional("io.netty:netty-buffer") optional("io.netty:netty-buffer")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
testCompile("io.projectreactor:reactor-test") testCompile("io.projectreactor:reactor-test")
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
testCompile("org.xmlunit:xmlunit-matchers:2.6.2") testCompile("org.xmlunit:xmlunit-matchers:2.6.2")

View File

@ -40,8 +40,8 @@ import org.springframework.util.ReflectionUtils;
* {@code Observable}, and others. * {@code Observable}, and others.
* *
* <p>By default, depending on classpath availability, adapters are registered * <p>By default, depending on classpath availability, adapters are registered
* for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, and Java 9+ * for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, Java 9+
* Flow.Publisher. * {@code Flow.Publisher} and Kotlin Coroutines {@code Deferred}.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @author Sebastien Deleuze * @author Sebastien Deleuze
@ -90,6 +90,11 @@ public class ReactiveAdapterRegistry {
} }
// If not present, do nothing for the time being... // If not present, do nothing for the time being...
// We can fall back on "reactive-streams-flow-bridge" (once released) // We can fall back on "reactive-streams-flow-bridge" (once released)
// Coroutines
if (ClassUtils.isPresent("kotlinx.coroutines.Deferred", classLoader)) {
CoroutinesRegistrarKt.registerAdapter(this);
}
} }

View File

@ -0,0 +1,38 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.toMono
/**
* Register Reactive adapters for Coroutines types.
*
* @author Sebastien Deleuze
* @since 5.2
*/
internal fun registerAdapter(registry: ReactiveAdapterRegistry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Deferred::class.java) { GlobalScope.async {} },
{ source -> GlobalScope.mono { (source as Deferred<*>).await() }},
{ source -> GlobalScope.async { source.toMono().awaitFirstOrNull() } }
)
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2019 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.Maybe; import io.reactivex.Maybe;
import kotlinx.coroutines.Deferred;
import org.junit.Test; import org.junit.Test;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -68,6 +69,9 @@ public class ReactiveAdapterRegistryTests {
assertNotNull(getAdapter(io.reactivex.Single.class)); assertNotNull(getAdapter(io.reactivex.Single.class));
assertNotNull(getAdapter(Maybe.class)); assertNotNull(getAdapter(Maybe.class));
assertNotNull(getAdapter(io.reactivex.Completable.class)); assertNotNull(getAdapter(io.reactivex.Completable.class));
// Coroutines
assertNotNull(getAdapter(Deferred.class));
} }
@Test @Test

View File

@ -0,0 +1,55 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import java.time.Duration
import kotlin.reflect.KClass
class KotlinReactiveAdapterRegistryTests {
private val registry = ReactiveAdapterRegistry.getSharedInstance()
@Test
fun deferredToPublisher() {
val source = GlobalScope.async { 1 }
val target: Publisher<Int> = getAdapter(Deferred::class).toPublisher(source)
assertTrue("Expected Mono Publisher: " + target.javaClass.name, target is Mono<*>)
assertEquals(1, (target as Mono<Int>).block(Duration.ofMillis(1000)))
}
@Test
fun publisherToDeferred() {
val source = Mono.just(1)
val target = getAdapter(Deferred::class).fromPublisher(source)
assertTrue(target is Deferred<*>)
assertEquals(1, runBlocking { (target as Deferred<*>).await() })
}
private fun getAdapter(reactiveType: KClass<*>): ReactiveAdapter {
return this.registry.getAdapter(reactiveType.java)!!
}
}