Update after `reactor.core.converter.Converters` changes
Reactor's `DependencyUtils` has been renamed to `Converters` and all the `from` converter methods have been disambiguated to `fromPublisher`, `toPublisher`.
This commit is contained in:
parent
799bb64275
commit
b5bce1f017
|
@ -56,22 +56,22 @@ public final class ReactorToRxJava1Converter implements GenericConverter {
|
|||
return null;
|
||||
}
|
||||
if (Observable.class.isAssignableFrom(sourceType.getType())) {
|
||||
return RxJava1ObservableConverter.from((Observable) source);
|
||||
return RxJava1ObservableConverter.toPublisher((Observable) source);
|
||||
}
|
||||
else if (Observable.class.isAssignableFrom(targetType.getType())) {
|
||||
return RxJava1ObservableConverter.from((Publisher) source);
|
||||
return RxJava1ObservableConverter.fromPublisher((Publisher) source);
|
||||
}
|
||||
else if (Single.class.isAssignableFrom(sourceType.getType())) {
|
||||
return RxJava1SingleConverter.from((Single) source);
|
||||
return RxJava1SingleConverter.toPublisher((Single) source);
|
||||
}
|
||||
else if (Single.class.isAssignableFrom(targetType.getType())) {
|
||||
return RxJava1SingleConverter.from((Publisher) source);
|
||||
return RxJava1SingleConverter.fromPublisher((Publisher) source);
|
||||
}
|
||||
else if (Completable.class.isAssignableFrom(sourceType.getType())) {
|
||||
return RxJava1CompletableConverter.from((Completable) source);
|
||||
return RxJava1CompletableConverter.toPublisher((Completable) source);
|
||||
}
|
||||
else if (Completable.class.isAssignableFrom(targetType.getType())) {
|
||||
return RxJava1CompletableConverter.from((Publisher) source);
|
||||
return RxJava1CompletableConverter.fromPublisher((Publisher) source);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
|
|||
@Override
|
||||
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
|
||||
|
||||
this.body = RxJava1ObservableConverter.from(Flux.from(body)
|
||||
this.body = RxJava1ObservableConverter.fromPublisher(Flux.from(body)
|
||||
.map(b -> dataBufferFactory.wrap(b.asByteBuffer()).getNativeBuffer()));
|
||||
|
||||
return Mono.empty();
|
||||
|
@ -119,10 +119,10 @@ public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
|
|||
})
|
||||
.map(req -> {
|
||||
if (this.body != null) {
|
||||
return RxJava1ObservableConverter.from(req.writeContent(this.body));
|
||||
return RxJava1ObservableConverter.toPublisher(req.writeContent(this.body));
|
||||
}
|
||||
else {
|
||||
return RxJava1ObservableConverter.from(req);
|
||||
return RxJava1ObservableConverter.toPublisher(req);
|
||||
}
|
||||
})
|
||||
.flatMap(resp -> resp)
|
||||
|
|
|
@ -85,7 +85,7 @@ public class RxNettyClientHttpResponse implements ClientHttpResponse {
|
|||
@Override
|
||||
public Flux<DataBuffer> getBody() {
|
||||
return RxJava1ObservableConverter
|
||||
.from(this.response.getContent().map(dataBufferFactory::wrap));
|
||||
.toPublisher(this.response.getContent().map(dataBufferFactory::wrap));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -49,7 +49,7 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
|
|||
RxNettyServerHttpResponse adaptedResponse =
|
||||
new RxNettyServerHttpResponse(response, dataBufferFactory);
|
||||
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse);
|
||||
return RxJava1ObservableConverter.from(result);
|
||||
return RxJava1ObservableConverter.fromPublisher(result);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ public class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
|
|||
@Override
|
||||
public Flux<DataBuffer> getBody() {
|
||||
Observable<DataBuffer> content = this.request.getContent().map(dataBufferFactory::wrap);
|
||||
return RxJava1ObservableConverter.from(content);
|
||||
return RxJava1ObservableConverter.toPublisher(content);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -71,8 +71,8 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
|
|||
|
||||
@Override
|
||||
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
|
||||
Observable<ByteBuf> content = RxJava1ObservableConverter.from(body).map(this::toByteBuf);
|
||||
return RxJava1ObservableConverter.from(this.response.write(content, bb -> bb instanceof FlushingByteBuf)).then();
|
||||
Observable<ByteBuf> content = RxJava1ObservableConverter.fromPublisher(body).map(this::toByteBuf);
|
||||
return RxJava1ObservableConverter.toPublisher(this.response.write(content, bb -> bb instanceof FlushingByteBuf)).then();
|
||||
}
|
||||
|
||||
private ByteBuf toByteBuf(DataBuffer buffer) {
|
||||
|
|
|
@ -53,7 +53,7 @@ public class RxJava1WebResponseExtractors {
|
|||
|
||||
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
|
||||
//noinspection unchecked
|
||||
return webResponse -> (Single<T>) RxJava1SingleConverter.from(webResponse.getClientResponse()
|
||||
return webResponse -> (Single<T>) RxJava1SingleConverter.fromPublisher(webResponse.getClientResponse()
|
||||
.flatMap(resp -> decodeResponseBody(resp, resolvableType, webResponse.getMessageDecoders()))
|
||||
.next());
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ public class RxJava1WebResponseExtractors {
|
|||
public static <T> WebResponseExtractor<Observable<T>> bodyStream(Class<T> sourceClass) {
|
||||
|
||||
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
|
||||
return webResponse -> RxJava1ObservableConverter.from(webResponse.getClientResponse()
|
||||
return webResponse -> RxJava1ObservableConverter.fromPublisher(webResponse.getClientResponse()
|
||||
.flatMap(resp -> decodeResponseBody(resp, resolvableType, webResponse.getMessageDecoders())));
|
||||
}
|
||||
|
||||
|
@ -76,7 +76,7 @@ public class RxJava1WebResponseExtractors {
|
|||
|
||||
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
|
||||
return webResponse -> (Single<ResponseEntity<T>>)
|
||||
RxJava1SingleConverter.from(webResponse.getClientResponse()
|
||||
RxJava1SingleConverter.fromPublisher(webResponse.getClientResponse()
|
||||
.then(response ->
|
||||
Mono.when(
|
||||
decodeResponseBody(response, resolvableType, webResponse.getMessageDecoders()).next(),
|
||||
|
@ -94,10 +94,10 @@ public class RxJava1WebResponseExtractors {
|
|||
*/
|
||||
public static <T> WebResponseExtractor<Single<ResponseEntity<Observable<T>>>> responseStream(Class<T> sourceClass) {
|
||||
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
|
||||
return webResponse -> RxJava1SingleConverter.from(webResponse.getClientResponse()
|
||||
return webResponse -> RxJava1SingleConverter.fromPublisher(webResponse.getClientResponse()
|
||||
.map(response -> new ResponseEntity<>(
|
||||
RxJava1ObservableConverter
|
||||
.from(decodeResponseBody(response, resolvableType, webResponse.getMessageDecoders())),
|
||||
.fromPublisher(decodeResponseBody(response, resolvableType, webResponse.getMessageDecoders())),
|
||||
response.getHeaders(),
|
||||
response.getStatusCode())));
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ public class RxJava1WebResponseExtractors {
|
|||
*/
|
||||
public static WebResponseExtractor<Single<HttpHeaders>> headers() {
|
||||
return webResponse -> RxJava1SingleConverter
|
||||
.from(webResponse.getClientResponse().map(resp -> resp.getHeaders()));
|
||||
.fromPublisher(webResponse.getClientResponse().map(resp -> resp.getHeaders()));
|
||||
}
|
||||
|
||||
protected static <T> Flux<T> decodeResponseBody(ClientHttpResponse response, ResolvableType responseType,
|
||||
|
|
|
@ -20,7 +20,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import reactor.core.converter.DependencyUtils;
|
||||
import reactor.core.converter.Converters;
|
||||
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.BeanInitializationException;
|
||||
|
@ -288,7 +288,7 @@ public class WebReactiveConfiguration implements ApplicationContextAware {
|
|||
*/
|
||||
protected void addFormatters(FormatterRegistry registry) {
|
||||
registry.addConverter(new MonoToCompletableFutureConverter());
|
||||
if (DependencyUtils.hasRxJava1()) {
|
||||
if (Converters.hasRxJava1()) {
|
||||
registry.addConverter(new ReactorToRxJava1Converter());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -156,7 +156,7 @@ public class HttpEntityArgumentResolverTests {
|
|||
ResolvableType type = httpEntityType(forClassWithGenerics(Single.class, String.class));
|
||||
HttpEntity<Single<String>> entity = resolveValueWithEmptyBody(type);
|
||||
|
||||
TestSubscriber.subscribe(RxJava1SingleConverter.from(entity.getBody()))
|
||||
TestSubscriber.subscribe(RxJava1SingleConverter.toPublisher(entity.getBody()))
|
||||
.assertNoValues()
|
||||
.assertError(ServerWebInputException.class);
|
||||
}
|
||||
|
@ -166,7 +166,7 @@ public class HttpEntityArgumentResolverTests {
|
|||
ResolvableType type = httpEntityType(forClassWithGenerics(Observable.class, String.class));
|
||||
HttpEntity<Observable<String>> entity = resolveValueWithEmptyBody(type);
|
||||
|
||||
TestSubscriber.subscribe(RxJava1ObservableConverter.from(entity.getBody()))
|
||||
TestSubscriber.subscribe(RxJava1ObservableConverter.toPublisher(entity.getBody()))
|
||||
.assertNoError()
|
||||
.assertComplete()
|
||||
.assertNoValues();
|
||||
|
|
|
@ -165,12 +165,12 @@ public class RequestBodyArgumentResolverTests {
|
|||
ResolvableType type = forClassWithGenerics(Single.class, String.class);
|
||||
|
||||
Single<String> single = resolveValueWithEmptyBody(type, true);
|
||||
TestSubscriber.subscribe(RxJava1SingleConverter.from(single))
|
||||
TestSubscriber.subscribe(RxJava1SingleConverter.toPublisher(single))
|
||||
.assertNoValues()
|
||||
.assertError(ServerWebInputException.class);
|
||||
|
||||
single = resolveValueWithEmptyBody(type, false);
|
||||
TestSubscriber.subscribe(RxJava1SingleConverter.from(single))
|
||||
TestSubscriber.subscribe(RxJava1SingleConverter.toPublisher(single))
|
||||
.assertNoValues()
|
||||
.assertError(ServerWebInputException.class);
|
||||
}
|
||||
|
@ -180,12 +180,12 @@ public class RequestBodyArgumentResolverTests {
|
|||
ResolvableType type = forClassWithGenerics(Observable.class, String.class);
|
||||
|
||||
Observable<String> observable = resolveValueWithEmptyBody(type, true);
|
||||
TestSubscriber.subscribe(RxJava1ObservableConverter.from(observable))
|
||||
TestSubscriber.subscribe(RxJava1ObservableConverter.toPublisher(observable))
|
||||
.assertNoValues()
|
||||
.assertError(ServerWebInputException.class);
|
||||
|
||||
observable = resolveValueWithEmptyBody(type, false);
|
||||
TestSubscriber.subscribe(RxJava1ObservableConverter.from(observable))
|
||||
TestSubscriber.subscribe(RxJava1ObservableConverter.toPublisher(observable))
|
||||
.assertNoValues()
|
||||
.assertComplete();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue