Update to latest Reactor 3.1 API changes
Issue: SPR-15318
This commit is contained in:
parent
135651de9a
commit
005e85b0f5
|
|
@ -90,7 +90,7 @@ public class DecoderHttpMessageReader<T> implements HttpMessageReader<T> {
|
|||
MediaType contentType = getContentType(message);
|
||||
return this.decoder
|
||||
.decode(message.getBody(), elementType, contentType, hints)
|
||||
.mapError(this::mapError);
|
||||
.onErrorMap(this::mapError);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -100,7 +100,7 @@ public class DecoderHttpMessageReader<T> implements HttpMessageReader<T> {
|
|||
MediaType contentType = getContentType(message);
|
||||
return this.decoder
|
||||
.decodeToMono(message.getBody(), elementType, contentType, hints)
|
||||
.mapError(this::mapError);
|
||||
.onErrorMap(this::mapError);
|
||||
}
|
||||
|
||||
private MediaType getContentType(HttpMessage inputMessage) {
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
|
|||
|
||||
Flux<DataBuffer> body = this.encoder
|
||||
.encode(inputStream, message.bufferFactory(), elementType, contentType, hints)
|
||||
.mapError(this::mapError);
|
||||
.onErrorMap(this::mapError);
|
||||
|
||||
return isStreamingMediaType(contentType) ?
|
||||
message.writeAndFlushWith(body.map(Flux::just)) :
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ public class ReactorHttpHandlerAdapter
|
|||
ReactorServerHttpResponse resp = new ReactorServerHttpResponse(response, bufferFactory);
|
||||
|
||||
return this.httpHandler.handle(req, resp)
|
||||
.switchOnError(ex -> {
|
||||
.onErrorResume(ex -> {
|
||||
logger.error("Could not complete request", ex);
|
||||
response.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
|
||||
return Mono.empty();
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
|
|||
RxNettyServerHttpResponse response = new RxNettyServerHttpResponse(nativeResponse, bufferFactory);
|
||||
|
||||
Publisher<Void> result = this.httpHandler.handle(request, response)
|
||||
.switchOnError(ex -> {
|
||||
.onErrorResume(ex -> {
|
||||
logger.error("Could not complete request", ex);
|
||||
nativeResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
|
||||
return Mono.empty();
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa
|
|||
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
|
||||
ServerWebExchange exchange = createExchange(request, response);
|
||||
return getDelegate().handle(exchange)
|
||||
.switchOnError(ex -> {
|
||||
.onErrorResume(ex -> {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Could not complete request", ex);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ public class ExceptionHandlingWebHandler extends WebHandlerDecorator {
|
|||
}
|
||||
|
||||
for (WebExceptionHandler handler : this.exceptionHandlers) {
|
||||
completion = completion.switchOnError(ex -> handler.handle(exchange, ex));
|
||||
completion = completion.onErrorResume(ex -> handler.handle(exchange, ex));
|
||||
}
|
||||
|
||||
return completion;
|
||||
|
|
|
|||
|
|
@ -96,7 +96,7 @@ public class ServerHttpResponseTests {
|
|||
public void writeWithError() throws Exception {
|
||||
TestServerHttpResponse response = new TestServerHttpResponse();
|
||||
IllegalStateException error = new IllegalStateException("boo");
|
||||
response.writeWith(Flux.error(error)).switchOnError(ex -> Mono.empty()).block();
|
||||
response.writeWith(Flux.error(error)).onErrorResume(ex -> Mono.empty()).block();
|
||||
|
||||
assertFalse(response.statusCodeWritten);
|
||||
assertFalse(response.headersWritten);
|
||||
|
|
|
|||
|
|
@ -141,7 +141,7 @@ public class DispatcherHandler implements WebHandler, ApplicationContextAware {
|
|||
|
||||
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
|
||||
return getResultHandler(result).handleResult(exchange, result)
|
||||
.switchOnError(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
|
||||
.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
|
||||
getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -120,13 +120,13 @@ class DefaultServerRequest implements ServerRequest {
|
|||
@Override
|
||||
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
|
||||
Mono<T> mono = body(BodyExtractors.toMono(elementClass));
|
||||
return mono.mapError(UnsupportedMediaTypeException.class, ERROR_MAPPER);
|
||||
return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
|
||||
Flux<T> flux = body(BodyExtractors.toFlux(elementClass));
|
||||
return flux.mapError(UnsupportedMediaTypeException.class, ERROR_MAPPER);
|
||||
return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -234,7 +234,7 @@ public abstract class RouterFunctions {
|
|||
.defaultIfEmpty(notFound())
|
||||
.flatMap(handlerFunction -> wrapException(() -> handlerFunction.handle(request)))
|
||||
.flatMap(response -> wrapException(() -> response.writeTo(exchange, strategies)))
|
||||
.switchOnError(ResponseStatusException.class,
|
||||
.onErrorResume(ResponseStatusException.class,
|
||||
ex -> {
|
||||
exchange.getResponse().setStatusCode(ex.getStatus());
|
||||
if (ex.getMessage() != null) {
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ public abstract class AbstractMessageReaderArgumentResolver extends HandlerMetho
|
|||
Map<String, Object> readHints = Collections.emptyMap();
|
||||
if (adapter != null && adapter.isMultiValue()) {
|
||||
Flux<?> flux = reader.read(bodyType, elementType, request, response, readHints);
|
||||
flux = flux.onErrorResumeWith(ex -> Flux.error(getReadError(bodyParameter, ex)));
|
||||
flux = flux.onErrorResume(ex -> Flux.error(getReadError(bodyParameter, ex)));
|
||||
if (isBodyRequired || !adapter.supportsEmpty()) {
|
||||
flux = flux.switchIfEmpty(Flux.error(getRequiredBodyError(bodyParameter)));
|
||||
}
|
||||
|
|
@ -130,7 +130,7 @@ public abstract class AbstractMessageReaderArgumentResolver extends HandlerMetho
|
|||
}
|
||||
else {
|
||||
Mono<?> mono = reader.readMono(bodyType, elementType, request, response, readHints);
|
||||
mono = mono.switchOnError(ex -> Mono.error(getReadError(bodyParameter, ex)));
|
||||
mono = mono.onErrorResume(ex -> Mono.error(getReadError(bodyParameter, ex)));
|
||||
if (isBodyRequired || (adapter != null && !adapter.supportsEmpty())) {
|
||||
mono = mono.switchIfEmpty(Mono.error(getRequiredBodyError(bodyParameter)));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -189,7 +189,7 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Application
|
|||
.then(() -> this.methodResolver.getRequestMappingMethod(handlerMethod)
|
||||
.invoke(exchange, bindingContext)
|
||||
.doOnNext(result -> result.setExceptionHandler(exceptionHandler))
|
||||
.switchOnError(exceptionHandler));
|
||||
.onErrorResume(exceptionHandler));
|
||||
}
|
||||
|
||||
private Mono<HandlerResult> handleException(Throwable ex, HandlerMethod handlerMethod,
|
||||
|
|
|
|||
Loading…
Reference in New Issue