Fix RxJava1Converter -> RxJava1ObservableConverter

This commit is contained in:
Sebastien Deleuze 2016-01-20 11:45:18 +01:00
parent 2dd9c92267
commit 4572cfa38c
4 changed files with 13 additions and 12 deletions

View File

@ -20,7 +20,8 @@ import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1Converter; import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.converter.RxJava1SingleConverter;
import rx.Observable; import rx.Observable;
import rx.Single; import rx.Single;
@ -50,16 +51,16 @@ public final class ReactiveStreamsToRxJava1Converter implements GenericConverter
return null; return null;
} }
if (Observable.class.isAssignableFrom(source.getClass())) { if (Observable.class.isAssignableFrom(source.getClass())) {
return RxJava1Converter.from((Observable) source); return RxJava1ObservableConverter.from((Observable) source);
} }
else if (Observable.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { else if (Observable.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return RxJava1Converter.from((Publisher) source); return RxJava1ObservableConverter.from((Publisher) source);
} }
else if (Single.class.isAssignableFrom(source.getClass())) { else if (Single.class.isAssignableFrom(source.getClass())) {
return reactor.core.converter.RxJava1SingleConverter.from((Single) source); return RxJava1SingleConverter.from((Single) source);
} }
else if (Single.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { else if (Single.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return reactor.core.converter.RxJava1SingleConverter.from((Publisher) source); return RxJava1SingleConverter.from((Publisher) source);
} }
return null; return null;
} }

View File

@ -21,7 +21,7 @@ import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler; import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1Converter; import reactor.core.converter.RxJava1ObservableConverter;
import rx.Observable; import rx.Observable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -44,7 +44,7 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
RxNettyServerHttpRequest adaptedRequest = new RxNettyServerHttpRequest(request); RxNettyServerHttpRequest adaptedRequest = new RxNettyServerHttpRequest(request);
RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response); RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response);
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse); Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse);
return RxJava1Converter.from(result); return RxJava1ObservableConverter.from(result);
} }
} }

View File

@ -26,7 +26,7 @@ import java.util.Map;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.Cookie;
import io.reactivex.netty.protocol.http.server.HttpServerRequest; import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import reactor.core.converter.RxJava1Converter; import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import rx.Observable; import rx.Observable;
@ -91,7 +91,7 @@ public class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
public Flux<ByteBuffer> getBody() { public Flux<ByteBuffer> getBody() {
Observable<ByteBuffer> content = this.request.getContent().map(ByteBuf::nioBuffer); Observable<ByteBuffer> content = this.request.getContent().map(ByteBuf::nioBuffer);
content = content.concatWith(Observable.empty()); // See GH issue #58 content = content.concatWith(Observable.empty()); // See GH issue #58
return RxJava1Converter.from(content); return RxJava1ObservableConverter.from(content);
} }
} }

View File

@ -23,8 +23,8 @@ import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.DefaultCookie; import io.netty.handler.codec.http.cookie.DefaultCookie;
import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.converter.RxJava1Converter;
import rx.Observable; import rx.Observable;
import org.springframework.http.HttpCookie; import org.springframework.http.HttpCookie;
@ -59,9 +59,9 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
@Override @Override
protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) { protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
Observable<byte[]> content = RxJava1Converter.from(publisher).map(this::toBytes); Observable<byte[]> content = RxJava1ObservableConverter.from(publisher).map(this::toBytes);
Observable<Void> completion = this.response.writeBytes(content); Observable<Void> completion = this.response.writeBytes(content);
return RxJava1Converter.from(completion).after(); return RxJava1ObservableConverter.from(completion).after();
} }
private byte[] toBytes(ByteBuffer buffer) { private byte[] toBytes(ByteBuffer buffer) {