Shortcut handling of bodyToFlux(DataBuffer.class)

Given that the body is a Flux<DataBuffer> there probably could be a
Flux<DataBuffer> body();

At least bodyToFlux(DataBuffer.class) which is used when mutating and
is a common case should not incur overhead.

See gh-24680
This commit is contained in:
Rossen Stoyanchev 2020-05-08 21:40:39 +01:00
parent 67a06f5edc
commit 0e9ecb6c99
2 changed files with 27 additions and 18 deletions

View File

@ -30,6 +30,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRequest;
@ -66,6 +67,8 @@ class DefaultClientResponse implements ClientResponse {
private final Supplier<HttpRequest> requestSupplier;
private final BodyExtractor.Context bodyExtractorContext;
public DefaultClientResponse(ClientHttpResponse response, ExchangeStrategies strategies,
String logPrefix, String requestDescription, Supplier<HttpRequest> requestSupplier) {
@ -76,6 +79,22 @@ class DefaultClientResponse implements ClientResponse {
this.logPrefix = logPrefix;
this.requestDescription = requestDescription;
this.requestSupplier = requestSupplier;
this.bodyExtractorContext = new BodyExtractor.Context() {
@Override
public List<HttpMessageReader<?>> messageReaders() {
return strategies.messageReaders();
}
@Override
public Optional<ServerHttpResponse> serverResponse() {
return Optional.empty();
}
@Override
public Map<String, Object> hints() {
return Hints.from(Hints.LOG_PREFIX_HINT, logPrefix);
}
};
}
@ -107,22 +126,7 @@ class DefaultClientResponse implements ClientResponse {
@SuppressWarnings("unchecked")
@Override
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
T result = extractor.extract(this.response, new BodyExtractor.Context() {
@Override
public List<HttpMessageReader<?>> messageReaders() {
return strategies.messageReaders();
}
@Override
public Optional<ServerHttpResponse> serverResponse() {
return Optional.empty();
}
@Override
public Map<String, Object> hints() {
return Hints.from(Hints.LOG_PREFIX_HINT, logPrefix);
}
});
T result = extractor.extract(this.response, this.bodyExtractorContext);
String description = "Body from " + this.requestDescription + " [DefaultClientResponse]";
if (result instanceof Mono) {
return (T) ((Mono<?>) result).checkpoint(description);
@ -146,8 +150,10 @@ class DefaultClientResponse implements ClientResponse {
}
@Override
@SuppressWarnings("unchecked")
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
return body(BodyExtractors.toFlux(elementClass));
return elementClass.equals(DataBuffer.class) ?
(Flux<T>) body(BodyExtractors.toDataBuffers()) : body(BodyExtractors.toFlux(elementClass));
}
@Override

View File

@ -36,6 +36,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRange;
@ -195,8 +196,10 @@ class DefaultServerRequest implements ServerRequest {
}
@Override
@SuppressWarnings("unchecked")
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
Flux<T> flux = body(BodyExtractors.toFlux(elementClass));
Flux<T> flux = (elementClass.equals(DataBuffer.class) ?
(Flux<T>) request().getBody() : body(BodyExtractors.toFlux(elementClass)));
return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
.onErrorMap(DecodingException.class, DECODING_MAPPER);
}