diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java index 279a9b165c3..b2fca283a00 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java @@ -16,161 +16,61 @@ package org.springframework.web.servlet.function; -import java.io.IOException; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.function.Function; - -import javax.servlet.ServletException; -import javax.servlet.http.Cookie; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import org.reactivestreams.Publisher; -import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapterRegistry; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.lang.Nullable; -import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; -import org.springframework.util.MultiValueMap; -import org.springframework.web.context.request.async.AsyncWebRequest; -import org.springframework.web.context.request.async.DeferredResult; -import org.springframework.web.context.request.async.WebAsyncManager; -import org.springframework.web.context.request.async.WebAsyncUtils; -import org.springframework.web.servlet.ModelAndView; /** - * Implementation of {@link ServerResponse} based on a {@link CompletableFuture}. + * Asynchronous subtype of {@link ServerResponse} that exposes the future + * response. * * @author Arjen Poutsma - * @since 5.3 + * @since 5.3.2 * @see ServerResponse#async(Object) */ -final class AsyncServerResponse extends ErrorHandlingServerResponse { +public interface AsyncServerResponse extends ServerResponse { - static final boolean reactiveStreamsPresent = ClassUtils.isPresent( - "org.reactivestreams.Publisher", AsyncServerResponse.class.getClassLoader()); + /** + * Blocks indefinitely until the future response is obtained. + */ + ServerResponse block(); - private final CompletableFuture futureResponse; + // Static creation methods - @Nullable - private final Duration timeout; - - - private AsyncServerResponse(CompletableFuture futureResponse, @Nullable Duration timeout) { - this.futureResponse = futureResponse; - this.timeout = timeout; + /** + * Create a {@code AsyncServerResponse} with the given asynchronous response. + * Parameter {@code asyncResponse} can be a + * {@link CompletableFuture CompletableFuture<ServerResponse>} or + * {@link Publisher Publisher<ServerResponse>} (or any + * asynchronous producer of a single {@code ServerResponse} that can be + * adapted via the {@link ReactiveAdapterRegistry}). + * @param asyncResponse a {@code CompletableFuture} or + * {@code Publisher} + * @return the asynchronous response + */ + static AsyncServerResponse create(Object asyncResponse) { + return DefaultAsyncServerResponse.create(asyncResponse, null); } - @Override - public HttpStatus statusCode() { - return delegate(ServerResponse::statusCode); + /** + * Create a (built) response with the given asynchronous response. + * Parameter {@code asyncResponse} can be a + * {@link CompletableFuture CompletableFuture<ServerResponse>} or + * {@link Publisher Publisher<ServerResponse>} (or any + * asynchronous producer of a single {@code ServerResponse} that can be + * adapted via the {@link ReactiveAdapterRegistry}). + * @param asyncResponse a {@code CompletableFuture} or + * {@code Publisher} + * @param timeout maximum time period to wait for before timing out + * @return the asynchronous response + */ + static AsyncServerResponse create(Object asyncResponse, Duration timeout) { + return DefaultAsyncServerResponse.create(asyncResponse, timeout); } - @Override - public int rawStatusCode() { - return delegate(ServerResponse::rawStatusCode); - } - - @Override - public HttpHeaders headers() { - return delegate(ServerResponse::headers); - } - - @Override - public MultiValueMap cookies() { - return delegate(ServerResponse::cookies); - } - - private R delegate(Function function) { - ServerResponse response = this.futureResponse.getNow(null); - if (response != null) { - return function.apply(response); - } - else { - throw new IllegalStateException("Future ServerResponse has not yet completed"); - } - } - - @Nullable - @Override - public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context) - throws ServletException, IOException { - - writeAsync(request, response, createDeferredResult()); - return null; - } - - static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult deferredResult) - throws ServletException, IOException { - - WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); - AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response); - asyncManager.setAsyncWebRequest(asyncWebRequest); - try { - asyncManager.startDeferredResultProcessing(deferredResult); - } - catch (IOException | ServletException ex) { - throw ex; - } - catch (Exception ex) { - throw new ServletException("Async processing failed", ex); - } - - } - - private DeferredResult createDeferredResult() { - DeferredResult result; - if (this.timeout != null) { - result = new DeferredResult<>(this.timeout.toMillis()); - } - else { - result = new DeferredResult<>(); - } - this.futureResponse.handle((value, ex) -> { - if (ex != null) { - if (ex instanceof CompletionException && ex.getCause() != null) { - ex = ex.getCause(); - } - result.setErrorResult(ex); - } - else { - result.setResult(value); - } - return null; - }); - return result; - } - - - @SuppressWarnings({"unchecked"}) - public static ServerResponse create(Object o, @Nullable Duration timeout) { - Assert.notNull(o, "Argument to async must not be null"); - - if (o instanceof CompletableFuture) { - CompletableFuture futureResponse = (CompletableFuture) o; - return new AsyncServerResponse(futureResponse, timeout); - } - else if (reactiveStreamsPresent) { - ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); - ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass()); - if (publisherAdapter != null) { - Publisher publisher = publisherAdapter.toPublisher(o); - ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class); - if (futureAdapter != null) { - CompletableFuture futureResponse = - (CompletableFuture) futureAdapter.fromPublisher(publisher); - return new AsyncServerResponse(futureResponse, timeout); - } - } - } - throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass()); - } - - } + diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultAsyncServerResponse.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultAsyncServerResponse.java new file mode 100644 index 00000000000..0fd28344543 --- /dev/null +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultAsyncServerResponse.java @@ -0,0 +1,191 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.servlet.function; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import javax.servlet.ServletException; +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.reactivestreams.Publisher; + +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.MultiValueMap; +import org.springframework.web.context.request.async.AsyncWebRequest; +import org.springframework.web.context.request.async.DeferredResult; +import org.springframework.web.context.request.async.WebAsyncManager; +import org.springframework.web.context.request.async.WebAsyncUtils; +import org.springframework.web.servlet.ModelAndView; + +/** + * Default {@link AsyncServerResponse} implementation. + * + * @author Arjen Poutsma + * @since 5.3.2 + */ +final class DefaultAsyncServerResponse extends ErrorHandlingServerResponse implements AsyncServerResponse { + + static final boolean reactiveStreamsPresent = ClassUtils.isPresent( + "org.reactivestreams.Publisher", DefaultAsyncServerResponse.class.getClassLoader()); + + private final CompletableFuture futureResponse; + + @Nullable + private final Duration timeout; + + + private DefaultAsyncServerResponse(CompletableFuture futureResponse, @Nullable Duration timeout) { + this.futureResponse = futureResponse; + this.timeout = timeout; + } + + @Override + public ServerResponse block() { + try { + if (this.timeout != null) { + return this.futureResponse.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS); + } + else { + return this.futureResponse.get(); + } + } + catch (InterruptedException | ExecutionException | TimeoutException ex) { + throw new IllegalStateException("Failed to get future response", ex); + } + } + + @Override + public HttpStatus statusCode() { + return delegate(ServerResponse::statusCode); + } + + @Override + public int rawStatusCode() { + return delegate(ServerResponse::rawStatusCode); + } + + @Override + public HttpHeaders headers() { + return delegate(ServerResponse::headers); + } + + @Override + public MultiValueMap cookies() { + return delegate(ServerResponse::cookies); + } + + private R delegate(Function function) { + ServerResponse response = this.futureResponse.getNow(null); + if (response != null) { + return function.apply(response); + } + else { + throw new IllegalStateException("Future ServerResponse has not yet completed"); + } + } + + @Nullable + @Override + public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context) + throws ServletException, IOException { + + writeAsync(request, response, createDeferredResult()); + return null; + } + + static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult deferredResult) + throws ServletException, IOException { + + WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); + AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response); + asyncManager.setAsyncWebRequest(asyncWebRequest); + try { + asyncManager.startDeferredResultProcessing(deferredResult); + } + catch (IOException | ServletException ex) { + throw ex; + } + catch (Exception ex) { + throw new ServletException("Async processing failed", ex); + } + + } + + private DeferredResult createDeferredResult() { + DeferredResult result; + if (this.timeout != null) { + result = new DeferredResult<>(this.timeout.toMillis()); + } + else { + result = new DeferredResult<>(); + } + this.futureResponse.handle((value, ex) -> { + if (ex != null) { + if (ex instanceof CompletionException && ex.getCause() != null) { + ex = ex.getCause(); + } + result.setErrorResult(ex); + } + else { + result.setResult(value); + } + return null; + }); + return result; + } + + @SuppressWarnings({"unchecked"}) + public static AsyncServerResponse create(Object o, @Nullable Duration timeout) { + Assert.notNull(o, "Argument to async must not be null"); + + if (o instanceof CompletableFuture) { + CompletableFuture futureResponse = (CompletableFuture) o; + return new DefaultAsyncServerResponse(futureResponse, timeout); + } + else if (reactiveStreamsPresent) { + ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); + ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass()); + if (publisherAdapter != null) { + Publisher publisher = publisherAdapter.toPublisher(o); + ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class); + if (futureAdapter != null) { + CompletableFuture futureResponse = + (CompletableFuture) futureAdapter.fromPublisher(publisher); + return new DefaultAsyncServerResponse(futureResponse, timeout); + } + } + } + throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass()); + } + + +} diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultEntityResponseBuilder.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultEntityResponseBuilder.java index f28b1d81745..4dfad874a4e 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultEntityResponseBuilder.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultEntityResponseBuilder.java @@ -208,7 +208,7 @@ final class DefaultEntityResponseBuilder implements EntityResponse.Builder return new CompletionStageEntityResponse(this.status, this.headers, this.cookies, completionStage, this.entityType); } - else if (AsyncServerResponse.reactiveStreamsPresent) { + else if (DefaultAsyncServerResponse.reactiveStreamsPresent) { ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(this.entity.getClass()); if (adapter != null) { Publisher publisher = adapter.toPublisher(this.entity); @@ -362,7 +362,7 @@ final class DefaultEntityResponseBuilder implements EntityResponse.Builder Context context) throws ServletException, IOException { DeferredResult deferredResult = createDeferredResult(servletRequest, servletResponse, context); - AsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult); + DefaultAsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult); return null; } @@ -410,7 +410,7 @@ final class DefaultEntityResponseBuilder implements EntityResponse.Builder Context context) throws ServletException, IOException { DeferredResult deferredResult = new DeferredResult<>(); - AsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult); + DefaultAsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult); entity().subscribe(new DeferredResultSubscriber(servletRequest, servletResponse, context, deferredResult)); return null; diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/ServerResponse.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/ServerResponse.java index 85823b75b9c..11efe5b0223 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/ServerResponse.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/ServerResponse.java @@ -236,7 +236,7 @@ public interface ServerResponse { * @since 5.3 */ static ServerResponse async(Object asyncResponse) { - return AsyncServerResponse.create(asyncResponse, null); + return DefaultAsyncServerResponse.create(asyncResponse, null); } /** @@ -257,7 +257,7 @@ public interface ServerResponse { * @since 5.3.2 */ static ServerResponse async(Object asyncResponse, Duration timeout) { - return AsyncServerResponse.create(asyncResponse, timeout); + return DefaultAsyncServerResponse.create(asyncResponse, timeout); } /** diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/SseServerResponse.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/SseServerResponse.java index a641033b6ce..e078a66d110 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/SseServerResponse.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/SseServerResponse.java @@ -89,7 +89,7 @@ final class SseServerResponse extends AbstractServerResponse { result = new DeferredResult<>(); } - AsyncServerResponse.writeAsync(request, response, result); + DefaultAsyncServerResponse.writeAsync(request, response, result); this.sseConsumer.accept(new DefaultSseBuilder(response, context, result)); return null; } diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/function/DefaultAsyncServerResponseTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/function/DefaultAsyncServerResponseTests.java new file mode 100644 index 00000000000..8f3cf3546d9 --- /dev/null +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/function/DefaultAsyncServerResponseTests.java @@ -0,0 +1,39 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.servlet.function; + +import java.util.concurrent.CompletableFuture; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Arjen Poutsma + */ +class DefaultAsyncServerResponseTests { + + @Test + void block() { + ServerResponse wrappee = ServerResponse.ok().build(); + CompletableFuture future = CompletableFuture.completedFuture(wrappee); + AsyncServerResponse response = AsyncServerResponse.create(future); + + assertThat(response.block()).isSameAs(wrappee); + } + +}