Filter empty buffers PayloadMethodArgumentResolver

An empty buffer for RSocket is an empty message and while some codecs
such as StringDecoder may be able turn that into something such as an
empty String, for other formats such as JSON it is invalid input.

Closes gh-26344
This commit is contained in:
Rossen Stoyanchev 2021-01-07 17:25:07 +00:00
parent d387d9ae1e
commit e36d4162c2
2 changed files with 30 additions and 8 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -38,6 +38,7 @@ import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@ -232,6 +233,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
if (decoder.canDecode(elementType, mimeType)) {
if (adapter != null && adapter.isMultiValue()) {
Flux<?> flux = content
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
@ -245,6 +247,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
else {
// Single-value (with or without reactive type wrapper)
Mono<?> mono = content.next()
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
@ -262,6 +265,14 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
message, parameter, "Cannot decode to [" + targetType + "]" + message));
}
private boolean nonEmptyDataBuffer(DataBuffer buffer) {
if (buffer.readableByteCount() > 0) {
return true;
}
DataBufferUtils.release(buffer);
return false;
}
private Throwable handleReadError(MethodParameter parameter, Message<?> message, Throwable ex) {
return ex instanceof DecodingException ?
new MethodArgumentResolutionException(message, parameter, "Failed to read HTTP message", ex) : ex;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,7 +19,6 @@ package org.springframework.messaging.rsocket;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
@ -43,6 +42,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.stereotype.Controller;
@ -164,6 +164,12 @@ public class RSocketClientToServerIntegrationTests {
.verify(Duration.ofSeconds(5));
}
@Test // gh-26344
public void echoChannelWithEmptyInput() {
Flux<String> result = requester.route("echo-channel-empty").data(Flux.empty()).retrieveFlux(String.class);
StepVerifier.create(result).verifyComplete();
}
@Test
public void metadataPush() {
Flux.just("bar", "baz")
@ -254,6 +260,11 @@ public class RSocketClientToServerIntegrationTests {
return payloads.delayElements(Duration.ofMillis(10)).map(payload -> payload + " async");
}
@MessageMapping("echo-channel-empty")
Flux<String> echoChannelEmpty(@Payload(required = false) Flux<String> payloads) {
return payloads.map(payload -> payload + " echoed");
}
@MessageMapping("thrown-exception")
Mono<String> handleAndThrow(String payload) {
throw new IllegalArgumentException("Invalid input error");
@ -338,29 +349,29 @@ public class RSocketClientToServerIntegrationTests {
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
public Mono<Void> fireAndForget(io.rsocket.Payload payload) {
return this.delegate.fireAndForget(payload)
.doOnSuccess(aVoid -> this.fireAndForgetCount.incrementAndGet());
}
@Override
public Mono<Void> metadataPush(Payload payload) {
public Mono<Void> metadataPush(io.rsocket.Payload payload) {
return this.delegate.metadataPush(payload)
.doOnSuccess(aVoid -> this.metadataPushCount.incrementAndGet());
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
public Mono<io.rsocket.Payload> requestResponse(io.rsocket.Payload payload) {
return this.delegate.requestResponse(payload);
}
@Override
public Flux<Payload> requestStream(Payload payload) {
public Flux<io.rsocket.Payload> requestStream(io.rsocket.Payload payload) {
return this.delegate.requestStream(payload);
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
public Flux<io.rsocket.Payload> requestChannel(Publisher<io.rsocket.Payload> payloads) {
return this.delegate.requestChannel(payloads);
}
}