Switch to Reactor 3 snapshots and Netty 4.1.3

This commit is contained in:
Rossen Stoyanchev 2016-07-15 17:15:33 -04:00
parent dc1664939c
commit 028be2a298
50 changed files with 113 additions and 123 deletions

View File

@ -61,13 +61,13 @@ configure(allprojects) { project ->
ext.junitJupiterVersion = '5.0.0-M1'
ext.junitPlatformVersion = '1.0.0-M1'
ext.log4jVersion = '2.6.1'
ext.nettyVersion = "4.1.1.Final"
ext.nettyVersion = "4.1.3.Final"
ext.okhttpVersion = "2.7.5"
ext.okhttp3Version = "3.3.1"
ext.poiVersion = "3.14"
ext.reactorVersion = "2.0.8.RELEASE"
ext.reactorCoreVersion = '2.5.0.BUILD-SNAPSHOT'
ext.reactorNettyVersion = '2.5.0.BUILD-SNAPSHOT'
ext.reactorCoreVersion = '3.0.0.BUILD-SNAPSHOT'
ext.reactorNettyVersion = '0.5.0.BUILD-SNAPSHOT'
ext.rxJavaVersion = '1.1.6'
ext.romeVersion = "1.6.0"
ext.servletVersion = "3.1.0"

View File

@ -20,9 +20,7 @@ import java.util.LinkedHashSet;
import java.util.Set;
import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1CompletableConverter;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
@ -57,22 +55,22 @@ public final class ReactorToRxJava1Converter implements GenericConverter {
return null;
}
if (Observable.class.isAssignableFrom(sourceType.getType())) {
return RxJava1ObservableConverter.toPublisher((Observable<?>) source);
return RxJava1Adapter.observableToFlux((Observable<?>) source);
}
else if (Observable.class.isAssignableFrom(targetType.getType())) {
return RxJava1ObservableConverter.fromPublisher((Publisher<?>) source);
return RxJava1Adapter.publisherToObservable((Publisher<?>) source);
}
else if (Single.class.isAssignableFrom(sourceType.getType())) {
return RxJava1SingleConverter.toPublisher((Single<?>) source);
return RxJava1Adapter.singleToMono((Single<?>) source);
}
else if (Single.class.isAssignableFrom(targetType.getType())) {
return RxJava1SingleConverter.fromPublisher((Publisher<?>) source);
return RxJava1Adapter.publisherToSingle((Publisher<?>) source);
}
else if (Completable.class.isAssignableFrom(sourceType.getType())) {
return RxJava1CompletableConverter.toPublisher((Completable) source);
return RxJava1Adapter.completableToMono((Completable) source);
}
else if (Completable.class.isAssignableFrom(targetType.getType())) {
return RxJava1CompletableConverter.fromPublisher((Publisher<?>) source);
return RxJava1Adapter.publisherToCompletable((Publisher<?>) source);
}
return null;
}

View File

@ -27,7 +27,7 @@ import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.subscriber.SignalEmitter;
import reactor.core.publisher.SynchronousSink;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -149,7 +149,7 @@ public abstract class DataBufferUtils {
}
private static class ReadableByteChannelGenerator
implements BiFunction<ReadableByteChannel, SignalEmitter<DataBuffer>,
implements BiFunction<ReadableByteChannel, SynchronousSink<DataBuffer>,
ReadableByteChannel> {
private final DataBufferFactory dataBufferFactory;
@ -164,7 +164,7 @@ public abstract class DataBufferUtils {
@Override
public ReadableByteChannel apply(ReadableByteChannel
channel, SignalEmitter<DataBuffer> sub) {
channel, SynchronousSink<DataBuffer> sub) {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(chunkSize);
int read;

View File

@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -23,7 +23,7 @@ import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -20,7 +20,7 @@ import java.io.IOException;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;

View File

@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;

View File

@ -19,7 +19,7 @@ package org.springframework.core.codec;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -23,7 +23,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -24,7 +24,7 @@ import java.nio.file.StandardOpenOption;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;

View File

@ -20,8 +20,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import reactor.core.converter.Converters;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.context.ApplicationContext;
@ -82,8 +80,9 @@ public class WebReactiveConfiguration implements ApplicationContextAware {
ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) &&
ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", classLoader);
private static final boolean jaxb2Present =
ClassUtils.isPresent("javax.xml.bind.Binder", classLoader);
private static final boolean jaxb2Present = ClassUtils.isPresent("javax.xml.bind.Binder", classLoader);
private static final boolean rxJava1Present = ClassUtils.isPresent("rx.Observable", classLoader);
private PathMatchConfigurer pathMatchConfigurer;
@ -289,7 +288,7 @@ public class WebReactiveConfiguration implements ApplicationContextAware {
*/
protected void addFormatters(FormatterRegistry registry) {
registry.addConverter(new MonoToCompletableFutureConverter());
if (Converters.hasRxJava1()) {
if (rxJava1Present) {
registry.addConverter(new ReactorToRxJava1Converter());
}
}

View File

@ -24,7 +24,7 @@ import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;

View File

@ -21,7 +21,7 @@ import java.time.Duration;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;

View File

@ -27,7 +27,7 @@ import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.MockServerHttpRequest;

View File

@ -22,7 +22,7 @@ import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.MockServerHttpRequest;

View File

@ -30,7 +30,7 @@ import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;

View File

@ -22,7 +22,7 @@ import java.net.URI;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.MethodParameter;

View File

@ -25,11 +25,10 @@ import java.util.concurrent.CompletableFuture;
import org.junit.Before;
import org.junit.Test;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import rx.Observable;
import rx.Single;
@ -156,7 +155,7 @@ public class HttpEntityArgumentResolverTests {
ResolvableType type = httpEntityType(forClassWithGenerics(Single.class, String.class));
HttpEntity<Single<String>> entity = resolveValueWithEmptyBody(type);
TestSubscriber.subscribe(RxJava1SingleConverter.toPublisher(entity.getBody()))
TestSubscriber.subscribe(RxJava1Adapter.singleToMono(entity.getBody()))
.assertNoValues()
.assertError(ServerWebInputException.class);
}
@ -166,7 +165,7 @@ public class HttpEntityArgumentResolverTests {
ResolvableType type = httpEntityType(forClassWithGenerics(Observable.class, String.class));
HttpEntity<Observable<String>> entity = resolveValueWithEmptyBody(type);
TestSubscriber.subscribe(RxJava1ObservableConverter.toPublisher(entity.getBody()))
TestSubscriber.subscribe(RxJava1Adapter.observableToFlux(entity.getBody()))
.assertNoError()
.assertComplete()
.assertNoValues();

View File

@ -34,7 +34,7 @@ import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import rx.Observable;
import rx.Single;

View File

@ -33,7 +33,7 @@ import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import rx.Observable;
import org.springframework.core.MethodParameter;

View File

@ -24,7 +24,7 @@ import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.MethodParameter;
import org.springframework.core.convert.ConversionService;

View File

@ -22,7 +22,7 @@ import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.DefaultParameterNameDiscoverer;

View File

@ -27,11 +27,10 @@ import java.util.function.Predicate;
import org.junit.Before;
import org.junit.Test;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import rx.Observable;
import rx.Single;
@ -166,12 +165,12 @@ public class RequestBodyArgumentResolverTests {
ResolvableType type = forClassWithGenerics(Single.class, String.class);
Single<String> single = resolveValueWithEmptyBody(type, true);
TestSubscriber.subscribe(RxJava1SingleConverter.toPublisher(single))
TestSubscriber.subscribe(RxJava1Adapter.singleToMono(single))
.assertNoValues()
.assertError(ServerWebInputException.class);
single = resolveValueWithEmptyBody(type, false);
TestSubscriber.subscribe(RxJava1SingleConverter.toPublisher(single))
TestSubscriber.subscribe(RxJava1Adapter.singleToMono(single))
.assertNoValues()
.assertError(ServerWebInputException.class);
}
@ -181,12 +180,12 @@ public class RequestBodyArgumentResolverTests {
ResolvableType type = forClassWithGenerics(Observable.class, String.class);
Observable<String> observable = resolveValueWithEmptyBody(type, true);
TestSubscriber.subscribe(RxJava1ObservableConverter.toPublisher(observable))
TestSubscriber.subscribe(RxJava1Adapter.observableToFlux(observable))
.assertNoValues()
.assertError(ServerWebInputException.class);
observable = resolveValueWithEmptyBody(type, false);
TestSubscriber.subscribe(RxJava1ObservableConverter.toPublisher(observable))
TestSubscriber.subscribe(RxJava1Adapter.observableToFlux(observable))
.assertNoValues()
.assertComplete();
}

View File

@ -27,7 +27,7 @@ import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.MethodParameter;

View File

@ -25,7 +25,7 @@ import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.MethodParameter;

View File

@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import rx.Single;
import org.springframework.core.MethodParameter;

View File

@ -22,7 +22,7 @@ import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.DefaultParameterNameDiscoverer;

View File

@ -28,7 +28,7 @@ import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;

View File

@ -29,7 +29,7 @@ import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.MethodParameter;
import org.springframework.core.codec.StringEncoder;

View File

@ -32,7 +32,7 @@ import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import rx.Single;
import org.springframework.core.MethodParameter;

View File

@ -25,7 +25,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.context.ApplicationContextException;
import org.springframework.context.support.GenericApplicationContext;

View File

@ -28,7 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.util.BackpressureUtils;
import reactor.core.publisher.Operators;
import org.springframework.core.io.buffer.DataBuffer;
@ -106,7 +106,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
while (hasDemand()) {
DataBuffer dataBuffer = read();
if (dataBuffer != null) {
BackpressureUtils.getAndSub(this.demand, 1L);
Operators.getAndSub(this.demand, 1L);
this.subscriber.onNext(dataBuffer);
}
else {
@ -214,8 +214,8 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
NO_DEMAND {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
if (BackpressureUtils.checkRequest(n, publisher.subscriber)) {
BackpressureUtils.addAndGet(publisher.demand, n);
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
if (publisher.changeState(this, DEMAND)) {
publisher.checkOnDataAvailable();
}
@ -249,8 +249,8 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
READING {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
if (BackpressureUtils.checkRequest(n, publisher.subscriber)) {
BackpressureUtils.addAndGet(publisher.demand, n);
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
}
}
},

View File

@ -21,8 +21,8 @@ import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.MonoSource;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.core.util.EmptySubscription;
import reactor.core.publisher.OperatorAdapter;
import reactor.core.publisher.Operators;
import org.springframework.util.Assert;
@ -54,7 +54,7 @@ public class ChannelSendOperator<T> extends MonoSource<T, Void> {
source.subscribe(new WriteWithBarrier(s));
}
private class WriteWithBarrier extends SubscriberBarrier<T, Void> implements Publisher<T> {
private class WriteWithBarrier extends OperatorAdapter<T, Void> implements Publisher<T> {
/**
* We've at at least one emission, we've called the write function, the write
@ -161,7 +161,7 @@ public class ChannelSendOperator<T> extends MonoSource<T, Void> {
this.writeSubscriber = writeSubscriber;
if (this.error != null || this.completed) {
this.writeSubscriber.onSubscribe(EmptySubscription.INSTANCE);
this.writeSubscriber.onSubscribe(Operators.emptySubscription());
emitCachedSignals();
}
else {

View File

@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.util.BackpressureUtils;
import reactor.core.publisher.Operators;
/**
* Publisher returned from {@link ServerHttpResponse#writeWith(Publisher)}.
@ -145,7 +145,7 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
SUBSCRIBED {
@Override
void request(ResponseBodyWriteResultPublisher publisher, long n) {
BackpressureUtils.checkRequest(n, publisher.subscriber);
Operators.checkRequest(n, publisher.subscriber);
}
@Override

View File

@ -21,7 +21,7 @@ import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.adapter.RxJava1Adapter;
import rx.Observable;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -50,7 +50,7 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
RxNettyServerHttpResponse adaptedResponse =
new RxNettyServerHttpResponse(response, dataBufferFactory);
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse);
return RxJava1ObservableConverter.fromPublisher(result);
return RxJava1Adapter.publisherToObservable(result);
}
}

View File

@ -22,7 +22,7 @@ import java.net.URISyntaxException;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.cookie.Cookie;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import rx.Observable;
@ -95,7 +95,7 @@ public class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
@Override
public Flux<DataBuffer> getBody() {
Observable<DataBuffer> content = this.request.getContent().map(dataBufferFactory::wrap);
return RxJava1ObservableConverter.toPublisher(content);
return RxJava1Adapter.observableToFlux(content);
}
}

View File

@ -23,8 +23,9 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.DefaultCookie;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.ResponseContentWriter;
import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Mono;
import rx.Observable;
@ -72,12 +73,15 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
@Override
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
Observable<ByteBuf> content = RxJava1ObservableConverter.fromPublisher(body).map(this::toByteBuf);
return RxJava1ObservableConverter.toPublisher(this.response.write(content, bb -> bb instanceof FlushingByteBuf)).then();
Observable<ByteBuf> content = RxJava1Adapter.publisherToObservable(body).map(this::toByteBuf);
ResponseContentWriter<ByteBuf> writer = this.response.write(content, bb -> bb instanceof FlushingByteBuf);
return RxJava1Adapter.observableToFlux(writer).then();
}
private ByteBuf toByteBuf(DataBuffer buffer) {
ByteBuf byteBuf = (buffer instanceof NettyDataBuffer ? ((NettyDataBuffer) buffer).getNativeBuffer() : Unpooled.wrappedBuffer(buffer.asByteBuffer()));
ByteBuf byteBuf = (buffer instanceof NettyDataBuffer ?
((NettyDataBuffer) buffer).getNativeBuffer() :
Unpooled.wrappedBuffer(buffer.asByteBuffer()));
return (buffer instanceof FlushingDataBuffer ? new FlushingByteBuf(byteBuf) : byteBuf);
}

View File

@ -18,8 +18,7 @@ package org.springframework.web.client.reactive.support;
import java.net.URI;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
@ -149,7 +148,7 @@ public class RxJava1ClientWebRequestBuilder implements ClientWebRequestBuilder {
* as type information for the element published by this reactive stream
*/
public RxJava1ClientWebRequestBuilder body(Single<?> content, ResolvableType elementType) {
this.delegate.body(RxJava1SingleConverter.toPublisher(content), elementType);
this.delegate.body(RxJava1Adapter.singleToMono(content), elementType);
return this;
}
@ -158,7 +157,7 @@ public class RxJava1ClientWebRequestBuilder implements ClientWebRequestBuilder {
* as type information for the elements published by this reactive stream
*/
public RxJava1ClientWebRequestBuilder body(Observable<?> content, ResolvableType elementType) {
this.delegate.body(RxJava1ObservableConverter.toPublisher(content), elementType);
this.delegate.body(RxJava1Adapter.observableToFlux(content), elementType);
return this;
}

View File

@ -19,6 +19,12 @@ package org.springframework.web.client.reactive.support;
import java.util.List;
import java.util.Optional;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
@ -27,13 +33,6 @@ import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.converter.reactive.HttpMessageConverter;
import org.springframework.web.client.reactive.ResponseExtractor;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
/**
* Static factory methods for {@link ResponseExtractor}
* based on the {@link Observable} and {@link Single} APIs.
@ -50,8 +49,8 @@ public class RxJava1ResponseExtractors {
public static <T> ResponseExtractor<Single<T>> body(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return (clientResponse, messageConverters) -> (Single<T>) RxJava1SingleConverter
.fromPublisher(clientResponse
return (clientResponse, messageConverters) -> (Single<T>) RxJava1Adapter
.publisherToSingle(clientResponse
.flatMap(resp -> decodeResponseBody(resp, resolvableType, messageConverters)).next());
}
@ -61,8 +60,8 @@ public class RxJava1ResponseExtractors {
public static <T> ResponseExtractor<Observable<T>> bodyStream(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return (clientResponse, messageConverters) -> RxJava1ObservableConverter
.fromPublisher(clientResponse
return (clientResponse, messageConverters) -> RxJava1Adapter
.publisherToObservable(clientResponse
.flatMap(resp -> decodeResponseBody(resp, resolvableType, messageConverters)));
}
@ -75,7 +74,7 @@ public class RxJava1ResponseExtractors {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return (clientResponse, messageConverters) ->
RxJava1SingleConverter.fromPublisher(clientResponse
RxJava1Adapter.publisherToSingle(clientResponse
.then(response ->
Mono.when(
decodeResponseBody(response, resolvableType, messageConverters).next(),
@ -91,9 +90,9 @@ public class RxJava1ResponseExtractors {
*/
public static <T> ResponseExtractor<Single<ResponseEntity<Observable<T>>>> responseStream(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return (clientResponse, messageConverters) -> RxJava1SingleConverter.fromPublisher(
return (clientResponse, messageConverters) -> RxJava1Adapter.publisherToSingle(
clientResponse.map(response -> new ResponseEntity<>(
RxJava1ObservableConverter.fromPublisher(
RxJava1Adapter.publisherToObservable(
RxJava1ResponseExtractors.<T> decodeResponseBody(response, resolvableType, messageConverters)),
response.getHeaders(),
response.getStatusCode())));
@ -103,8 +102,8 @@ public class RxJava1ResponseExtractors {
* Extract the response headers as an {@code HttpHeaders} instance.
*/
public static ResponseExtractor<Single<HttpHeaders>> headers() {
return (clientResponse, messageConverters) -> RxJava1SingleConverter
.fromPublisher(clientResponse.map(resp -> resp.getHeaders()));
return (clientResponse, messageConverters) -> RxJava1Adapter
.publisherToSingle(clientResponse.map(resp -> resp.getHeaders()));
}
@SuppressWarnings("unchecked")

View File

@ -22,7 +22,7 @@ import static org.junit.Assert.*;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -23,7 +23,7 @@ import java.util.List;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -21,7 +21,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -20,7 +20,7 @@ import java.nio.charset.StandardCharsets;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;

View File

@ -22,7 +22,7 @@ import javax.xml.stream.events.XMLEvent;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -23,7 +23,7 @@ import org.junit.Before;
import org.junit.Test;
import org.xml.sax.SAXException;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -20,7 +20,7 @@ import javax.xml.stream.events.XMLEvent;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;

View File

@ -29,10 +29,13 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.OperatorAdapter;
import reactor.core.publisher.Signal;
import reactor.core.subscriber.SubscriberBarrier;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
/**
* @author Rossen Stoyanchev
@ -145,7 +148,7 @@ public class ChannelSendOperatorTests {
};
}
private class WriteSubscriber extends SubscriberBarrier<String, Void> {
private class WriteSubscriber extends OperatorAdapter<String, Void> {
public WriteSubscriber(Subscriber<? super Void> subscriber) {
super(subscriber);

View File

@ -21,7 +21,7 @@ import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.FlushingDataBuffer;

View File

@ -16,8 +16,7 @@
package org.springframework.http.server.reactive.bootstrap;
import reactor.core.flow.Loopback;
import reactor.core.state.Completable;
import reactor.core.Loopback;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.util.Assert;
@ -25,8 +24,7 @@ import org.springframework.util.Assert;
/**
* @author Stephane Maldini
*/
public class ReactorHttpServer extends HttpServerSupport
implements HttpServer, Loopback, Completable {
public class ReactorHttpServer extends HttpServerSupport implements HttpServer, Loopback {
private ReactorHttpHandlerAdapter reactorHandler;
@ -34,6 +32,7 @@ public class ReactorHttpServer extends HttpServerSupport
private boolean running;
@Override
public void afterPropertiesSet() throws Exception {
@ -42,6 +41,7 @@ public class ReactorHttpServer extends HttpServerSupport
this.reactorServer = reactor.io.netty.http.HttpServer.create(getHost(), getPort());
}
@Override
public boolean isRunning() {
return this.running;
@ -57,16 +57,6 @@ public class ReactorHttpServer extends HttpServerSupport
return reactorServer;
}
@Override
public boolean isStarted() {
return running;
}
@Override
public boolean isTerminated() {
return !running;
}
@Override
public void start() {
if (!this.running) {

View File

@ -33,7 +33,7 @@ import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import reactor.test.TestSubscriber;
import org.springframework.http.codec.Pojo;
import org.springframework.http.HttpHeaders;