parent
8f33450df2
commit
1dc128361f
|
@ -68,6 +68,7 @@ configure(allprojects) { project ->
|
||||||
dependency "io.reactivex:rxjava-reactive-streams:1.2.1"
|
dependency "io.reactivex:rxjava-reactive-streams:1.2.1"
|
||||||
dependency "io.reactivex.rxjava2:rxjava:2.2.21"
|
dependency "io.reactivex.rxjava2:rxjava:2.2.21"
|
||||||
dependency "io.reactivex.rxjava3:rxjava:3.1.1"
|
dependency "io.reactivex.rxjava3:rxjava:3.1.1"
|
||||||
|
dependency "io.smallrye.reactive:mutiny:1.0.0"
|
||||||
dependency "io.projectreactor.tools:blockhound:1.0.4.RELEASE"
|
dependency "io.projectreactor.tools:blockhound:1.0.4.RELEASE"
|
||||||
|
|
||||||
dependency "com.caucho:hessian:4.0.63"
|
dependency "com.caucho:hessian:4.0.63"
|
||||||
|
|
|
@ -49,6 +49,7 @@ dependencies {
|
||||||
optional("io.reactivex:rxjava-reactive-streams")
|
optional("io.reactivex:rxjava-reactive-streams")
|
||||||
optional("io.reactivex.rxjava2:rxjava")
|
optional("io.reactivex.rxjava2:rxjava")
|
||||||
optional("io.reactivex.rxjava3:rxjava")
|
optional("io.reactivex.rxjava3:rxjava")
|
||||||
|
optional("io.smallrye.reactive:mutiny")
|
||||||
optional("io.netty:netty-buffer")
|
optional("io.netty:netty-buffer")
|
||||||
testCompile("io.projectreactor:reactor-test")
|
testCompile("io.projectreactor:reactor-test")
|
||||||
testCompile("com.google.code.findbugs:jsr305")
|
testCompile("com.google.code.findbugs:jsr305")
|
||||||
|
|
|
@ -24,6 +24,8 @@ import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import io.smallrye.mutiny.Multi;
|
||||||
|
import io.smallrye.mutiny.Uni;
|
||||||
import kotlinx.coroutines.CompletableDeferredKt;
|
import kotlinx.coroutines.CompletableDeferredKt;
|
||||||
import kotlinx.coroutines.Deferred;
|
import kotlinx.coroutines.Deferred;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
|
@ -72,6 +74,8 @@ public class ReactiveAdapterRegistry {
|
||||||
|
|
||||||
private static final boolean kotlinCoroutinesPresent;
|
private static final boolean kotlinCoroutinesPresent;
|
||||||
|
|
||||||
|
private static final boolean mutinyPresent;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
|
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
|
||||||
reactorPresent = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader);
|
reactorPresent = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader);
|
||||||
|
@ -81,6 +85,7 @@ public class ReactiveAdapterRegistry {
|
||||||
rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
|
rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
|
||||||
flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader);
|
flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader);
|
||||||
kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
|
kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
|
||||||
|
mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Uni", classLoader);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final List<ReactiveAdapter> adapters = new ArrayList<>();
|
private final List<ReactiveAdapter> adapters = new ArrayList<>();
|
||||||
|
@ -121,6 +126,11 @@ public class ReactiveAdapterRegistry {
|
||||||
if (reactorPresent && kotlinCoroutinesPresent) {
|
if (reactorPresent && kotlinCoroutinesPresent) {
|
||||||
new CoroutinesRegistrar().registerAdapters(this);
|
new CoroutinesRegistrar().registerAdapters(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SmallRye Mutiny
|
||||||
|
if (mutinyPresent) {
|
||||||
|
new MutinyRegistrar().registerAdapters(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -417,6 +427,23 @@ public class ReactiveAdapterRegistry {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class MutinyRegistrar {
|
||||||
|
|
||||||
|
void registerAdapters(ReactiveAdapterRegistry registry) {
|
||||||
|
registry.registerReactiveType(
|
||||||
|
ReactiveTypeDescriptor.singleOptionalValue(Uni.class, ()-> Uni.createFrom().nothing()),
|
||||||
|
uni ->((Uni<?>)uni).convert().toPublisher(),
|
||||||
|
publisher -> Uni.createFrom().publisher(publisher)
|
||||||
|
);
|
||||||
|
|
||||||
|
registry.registerReactiveType(
|
||||||
|
ReactiveTypeDescriptor.multiValue(Multi.class, ()-> Multi.createFrom().empty()),
|
||||||
|
multi -> (Multi<?>) multi,
|
||||||
|
publisher-> Multi.createFrom().publisher(publisher)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@code BlockHoundIntegration} for spring-core classes.
|
* {@code BlockHoundIntegration} for spring-core classes.
|
||||||
|
|
|
@ -21,6 +21,8 @@ import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
import io.smallrye.mutiny.Multi;
|
||||||
|
import io.smallrye.mutiny.Uni;
|
||||||
import kotlinx.coroutines.Deferred;
|
import kotlinx.coroutines.Deferred;
|
||||||
import org.junit.jupiter.api.Nested;
|
import org.junit.jupiter.api.Nested;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -357,6 +359,52 @@ class ReactiveAdapterRegistryTests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SmallRye Mutiny
|
||||||
|
@Nested
|
||||||
|
class Mutiny {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void defaultAdapterRegistrations() {
|
||||||
|
assertThat(getAdapter(io.smallrye.mutiny.Uni.class)).isNotNull();
|
||||||
|
assertThat(getAdapter(io.smallrye.mutiny.Multi.class)).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void toUni() {
|
||||||
|
Publisher<Integer> source = Mono.just(1);
|
||||||
|
Object target = getAdapter(Uni.class).fromPublisher(source);
|
||||||
|
assertThat(target).isInstanceOf(Uni.class);
|
||||||
|
assertThat(((Uni<Integer>) target).await().atMost(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void fromUni() {
|
||||||
|
Uni<Integer> source = Uni.createFrom().item(1);
|
||||||
|
Object target = getAdapter(Uni.class).toPublisher(source);
|
||||||
|
assertThat(target).isInstanceOf(Mono.class);
|
||||||
|
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void toMulti() {
|
||||||
|
List<Integer> sequence = Arrays.asList(1, 2, 3);
|
||||||
|
Publisher<Integer> source = Flux.fromIterable(sequence);
|
||||||
|
Object target = getAdapter(Multi.class).fromPublisher(source);
|
||||||
|
assertThat(target).isInstanceOf(Multi.class);
|
||||||
|
assertThat(((Multi<Integer>) target).collect().asList().await().atMost(Duration.ofMillis(1000))).isEqualTo(sequence);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void fromMulti() {
|
||||||
|
List<Integer> sequence = Arrays.asList(1, 2, 3);
|
||||||
|
Multi<Integer> source = Multi.createFrom().iterable(sequence);
|
||||||
|
Object target = getAdapter(Multi.class).toPublisher(source);
|
||||||
|
assertThat(target).isInstanceOf(Flux.class);
|
||||||
|
assertThat(((Flux<Integer>) target).blockLast(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(3));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private ReactiveAdapter getAdapter(Class<?> reactiveType) {
|
private ReactiveAdapter getAdapter(Class<?> reactiveType) {
|
||||||
ReactiveAdapter adapter = this.registry.getAdapter(reactiveType);
|
ReactiveAdapter adapter = this.registry.getAdapter(reactiveType);
|
||||||
assertThat(adapter).isNotNull();
|
assertThat(adapter).isNotNull();
|
||||||
|
|
Loading…
Reference in New Issue