diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index 9b237c9a094..ab205a8f085 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -218,7 +218,7 @@ public class ReactiveAdapterRegistry { source -> source); registry.registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(CompletionStage.class, EmptyCompletableFuture::new), + ReactiveTypeDescriptor.nonDeferredAsyncValue(CompletionStage.class, EmptyCompletableFuture::new), source -> Mono.fromCompletionStage((CompletionStage) source), source -> Mono.from(source).toFuture() ); diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java index 0c0183845f6..ad6e99fcf0e 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,18 +39,24 @@ public final class ReactiveTypeDescriptor { @Nullable private final Supplier emptyValueSupplier; + private final boolean deferred; + - /** - * Private constructor. See static factory methods. - */ private ReactiveTypeDescriptor(Class reactiveType, boolean multiValue, boolean noValue, @Nullable Supplier emptySupplier) { + this(reactiveType, multiValue, noValue, emptySupplier, true); + } + + private ReactiveTypeDescriptor(Class reactiveType, boolean multiValue, boolean noValue, + @Nullable Supplier emptySupplier, boolean deferred) { + Assert.notNull(reactiveType, "'reactiveType' must not be null"); this.reactiveType = reactiveType; this.multiValue = multiValue; this.noValue = noValue; this.emptyValueSupplier = emptySupplier; + this.deferred = deferred; } @@ -95,6 +101,16 @@ public final class ReactiveTypeDescriptor { return this.emptyValueSupplier.get(); } + /** + * Whether the underlying operation is deferred and needs to be started + * explicitly, e.g. via subscribing (or similar), or whether it is triggered + * without the consumer having any control. + * @since 5.1.16 + */ + public boolean isDeferred() { + return this.deferred; + } + @Override public boolean equals(@Nullable Object other) { @@ -148,4 +164,15 @@ public final class ReactiveTypeDescriptor { return new ReactiveTypeDescriptor(type, false, true, emptySupplier); } + /** + * The same as {@link #singleOptionalValue(Class, Supplier)} but for a + * non-deferred, async type such as {@link java.util.concurrent.CompletableFuture}. + * @param type the reactive type + * @param emptySupplier a supplier of an empty-value instance of the reactive type + * @since 5.1.16 + */ + public static ReactiveTypeDescriptor nonDeferredAsyncValue(Class type, Supplier emptySupplier) { + return new ReactiveTypeDescriptor(type, false, false, emptySupplier, false); + } + } diff --git a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java index 50f7b8b77b0..8e8374fc583 100644 --- a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java +++ b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -265,9 +265,26 @@ class ReactiveAdapterRegistryTests { assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); } + @Test + void deferred() { + assertThat(getAdapter(CompletableFuture.class).getDescriptor().isDeferred()).isEqualTo(false); + + assertThat(getAdapter(Mono.class).getDescriptor().isDeferred()).isEqualTo(true); + assertThat(getAdapter(Flux.class).getDescriptor().isDeferred()).isEqualTo(true); + + assertThat(getAdapter(io.reactivex.Completable.class).getDescriptor().isDeferred()).isEqualTo(true); + assertThat(getAdapter(io.reactivex.Single.class).getDescriptor().isDeferred()).isEqualTo(true); + assertThat(getAdapter(io.reactivex.Flowable.class).getDescriptor().isDeferred()).isEqualTo(true); + assertThat(getAdapter(io.reactivex.Observable.class).getDescriptor().isDeferred()).isEqualTo(true); + + assertThat(getAdapter(Deferred.class).getDescriptor().isDeferred()).isEqualTo(true); + assertThat(getAdapter(kotlinx.coroutines.flow.Flow.class).getDescriptor().isDeferred()).isEqualTo(true); + } private ReactiveAdapter getAdapter(Class reactiveType) { - return this.registry.getAdapter(reactiveType); + ReactiveAdapter adapter = this.registry.getAdapter(reactiveType); + assertThat(adapter).isNotNull(); + return adapter; } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index 5636a9faa0f..523050a37a8 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -175,6 +175,9 @@ class WriteResultPublisher implements Publisher { @Override void publishComplete(WriteResultPublisher publisher) { publisher.completedBeforeSubscribed = true; + if(State.SUBSCRIBED.equals(publisher.state.get())) { + publisher.state.get().publishComplete(publisher); + } } @Override void publishError(WriteResultPublisher publisher, Throwable ex) { @@ -190,6 +193,9 @@ class WriteResultPublisher implements Publisher { @Override void publishComplete(WriteResultPublisher publisher) { publisher.completedBeforeSubscribed = true; + if(State.SUBSCRIBED.equals(publisher.state.get())) { + publisher.state.get().publishComplete(publisher); + } } @Override void publishError(WriteResultPublisher publisher, Throwable ex) { diff --git a/spring-web/src/main/java/org/springframework/web/client/HttpMessageConverterExtractor.java b/spring-web/src/main/java/org/springframework/web/client/HttpMessageConverterExtractor.java index 3e640b2c9ae..771a444bd73 100644 --- a/spring-web/src/main/java/org/springframework/web/client/HttpMessageConverterExtractor.java +++ b/spring-web/src/main/java/org/springframework/web/client/HttpMessageConverterExtractor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.HttpMessageNotReadableException; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.FileCopyUtils; /** * Response extractor that uses the given {@linkplain HttpMessageConverter entity converters} @@ -120,17 +121,17 @@ public class HttpMessageConverterExtractor implements ResponseExtractor { this.responseType + "] and content type [" + contentType + "]", ex); } - throw new RestClientException("Could not extract response: no suitable HttpMessageConverter found " + - "for response type [" + this.responseType + "] and content type [" + contentType + "]"); + throw new UnknownContentTypeException(this.responseType, contentType, + response.getRawStatusCode(), response.getStatusText(), response.getHeaders(), + getResponseBody(response)); } /** * Determine the Content-Type of the response based on the "Content-Type" * header or otherwise default to {@link MediaType#APPLICATION_OCTET_STREAM}. * @param response the response - * @return the MediaType, possibly {@code null}. + * @return the MediaType, or "application/octet-stream" */ - @Nullable protected MediaType getContentType(ClientHttpResponse response) { MediaType contentType = response.getHeaders().getContentType(); if (contentType == null) { @@ -142,4 +143,13 @@ public class HttpMessageConverterExtractor implements ResponseExtractor { return contentType; } + private static byte[] getResponseBody(ClientHttpResponse response) { + try { + return FileCopyUtils.copyToByteArray(response.getBody()); + } + catch (IOException ex) { + // ignore + } + return new byte[0]; + } } diff --git a/spring-web/src/main/java/org/springframework/web/client/UnknownContentTypeException.java b/spring-web/src/main/java/org/springframework/web/client/UnknownContentTypeException.java new file mode 100644 index 00000000000..fca905394c8 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/client/UnknownContentTypeException.java @@ -0,0 +1,128 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.client; + +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.lang.Nullable; + +/** + * Raised when no suitable + * {@link org.springframework.http.converter.HttpMessageConverter} could be + * found to extract the response. + * + * @author Rossen Stoyanchev + * @since 5.2.7 + */ +public class UnknownContentTypeException extends RestClientException { + + private static final long serialVersionUID = 2759516676367274084L; + + + private final Type targetType; + + private final MediaType contentType; + + private final int rawStatusCode; + + private final String statusText; + + private final byte[] responseBody; + + private final HttpHeaders responseHeaders; + + + /** + * Construct a new instance of with the given response data. + * @param targetType the expected target type + * @param contentType the content type of the response + * @param statusCode the raw status code value + * @param statusText the status text + * @param responseHeaders the response headers (may be {@code null}) + * @param responseBody the response body content (may be {@code null}) + */ + public UnknownContentTypeException(Type targetType, MediaType contentType, + int statusCode, String statusText, HttpHeaders responseHeaders, byte[] responseBody) { + + super("Could not extract response: no suitable HttpMessageConverter found " + + "for response type [" + targetType + "] and content type [" + contentType + "]"); + + this.targetType = targetType; + this.contentType = contentType; + this.rawStatusCode = statusCode; + this.statusText = statusText; + this.responseHeaders = responseHeaders; + this.responseBody = responseBody; + } + + + /** + * Return the target type expected for the response. + */ + public Type getTargetType() { + return this.targetType; + } + + /** + * Return the content type of the response, or "application/octet-stream". + */ + public MediaType getContentType() { + return this.contentType; + } + + /** + * Return the raw HTTP status code value. + */ + public int getRawStatusCode() { + return this.rawStatusCode; + } + + /** + * Return the HTTP status text. + */ + public String getStatusText() { + return this.statusText; + } + + /** + * Return the HTTP response headers. + */ + @Nullable + public HttpHeaders getResponseHeaders() { + return this.responseHeaders; + } + + /** + * Return the response body as a byte array. + */ + public byte[] getResponseBody() { + return this.responseBody; + } + + /** + * Return the response body converted to String using the charset from the + * response "Content-Type" or {@code "UTF-8"} otherwise. + */ + public String getResponseBodyAsString() { + return new String(this.responseBody, this.contentType.getCharset() != null ? + this.contentType.getCharset() : StandardCharsets.UTF_8); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java index 26dc71ae9ce..3f33a3de896 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java @@ -380,23 +380,32 @@ public abstract class AbstractSockJsSession implements SockJsSession { public void delegateMessages(String... messages) throws SockJsMessageDeliveryException { for (int i = 0; i < messages.length; i++) { try { - if (!isClosed()) { - this.handler.handleMessage(this, new TextMessage(messages[i])); - } - else { - List undelivered = getUndelivered(messages, i); - if (undelivered.isEmpty()) { - return; - } - throw new SockJsMessageDeliveryException(this.id, undelivered, "Session closed"); + if (isClosed()) { + logUndeliveredMessages(i, messages); + return; } + this.handler.handleMessage(this, new TextMessage(messages[i])); } catch (Exception ex) { + if (isClosed()) { + if (logger.isTraceEnabled()) { + logger.trace("Failed to handle message '" + messages[i] + "'", ex); + } + logUndeliveredMessages(i, messages); + return; + } throw new SockJsMessageDeliveryException(this.id, getUndelivered(messages, i), ex); } } } + private void logUndeliveredMessages(int index, String[] messages) { + List undelivered = getUndelivered(messages, index); + if (logger.isTraceEnabled() && !undelivered.isEmpty()) { + logger.trace("Dropped inbound message(s) due to closed session: " + undelivered); + } + } + private static List getUndelivered(String[] messages, int i) { switch (messages.length - i) { case 0: diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java index 154a9bd8721..2421ded37e4 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java @@ -26,7 +26,6 @@ import org.junit.jupiter.api.Test; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator; -import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException; import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.frame.SockJsFrame; @@ -118,10 +117,7 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests session.delegateMessages(msg1, msg2, msg3)) - .satisfies(ex -> assertThat(ex.getUndeliveredMessages()).containsExactly(msg3)); + session.delegateMessages(msg1, msg2, msg3); verify(this.webSocketHandler).afterConnectionEstablished(session); verify(this.webSocketHandler).handleMessage(session, new TextMessage(msg1)); diff --git a/src/docs/asciidoc/web/webmvc.adoc b/src/docs/asciidoc/web/webmvc.adoc index 584cecdc1b8..0a759081440 100644 --- a/src/docs/asciidoc/web/webmvc.adoc +++ b/src/docs/asciidoc/web/webmvc.adoc @@ -1634,7 +1634,7 @@ extracts the name, version, and file extension: .Java ---- @GetMapping("/{name:[a-z-]+}-{version:\\d\\.\\d\\.\\d}{ext:\\.[a-z]+}") - public void handle(@PathVariable String version, @PathVariable String ext) { + public void handle(@PathVariable String name, @PathVariable String version, @PathVariable String ext) { // ... } ---- @@ -1642,7 +1642,7 @@ extracts the name, version, and file extension: .Kotlin ---- @GetMapping("/{name:[a-z-]+}-{version:\\d\\.\\d\\.\\d}{ext:\\.[a-z]+}") - fun handle(@PathVariable version: String, @PathVariable ext: String) { + fun handle(@PathVariable name: String, @PathVariable version: String, @PathVariable ext: String) { // ... } ----