From 3714d0e401102c4b9215dab3889dd31ba38d979f Mon Sep 17 00:00:00 2001 From: Alex Feigin Date: Mon, 30 Nov 2020 16:40:28 +0200 Subject: [PATCH] Expose future response in new AsyncServerResponse This commit introduces AsyncServerResponse, an extension of ServerResponse that is returned from ServerResponse.async and that allows users to get the future response by calling the block method. This is particularly useful for testing purposes. --- .../servlet/function/AsyncServerResponse.java | 174 ++++------------ .../function/DefaultAsyncServerResponse.java | 191 ++++++++++++++++++ .../DefaultEntityResponseBuilder.java | 6 +- .../web/servlet/function/ServerResponse.java | 4 +- .../servlet/function/SseServerResponse.java | 2 +- .../DefaultAsyncServerResponseTests.java | 39 ++++ 6 files changed, 273 insertions(+), 143 deletions(-) create mode 100644 spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultAsyncServerResponse.java create mode 100644 spring-webmvc/src/test/java/org/springframework/web/servlet/function/DefaultAsyncServerResponseTests.java diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java index 279a9b165c3..b2fca283a00 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java @@ -16,161 +16,61 @@ package org.springframework.web.servlet.function; -import java.io.IOException; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.function.Function; - -import javax.servlet.ServletException; -import javax.servlet.http.Cookie; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import org.reactivestreams.Publisher; -import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapterRegistry; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.lang.Nullable; -import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; -import org.springframework.util.MultiValueMap; -import org.springframework.web.context.request.async.AsyncWebRequest; -import org.springframework.web.context.request.async.DeferredResult; -import org.springframework.web.context.request.async.WebAsyncManager; -import org.springframework.web.context.request.async.WebAsyncUtils; -import org.springframework.web.servlet.ModelAndView; /** - * Implementation of {@link ServerResponse} based on a {@link CompletableFuture}. + * Asynchronous subtype of {@link ServerResponse} that exposes the future + * response. * * @author Arjen Poutsma - * @since 5.3 + * @since 5.3.2 * @see ServerResponse#async(Object) */ -final class AsyncServerResponse extends ErrorHandlingServerResponse { +public interface AsyncServerResponse extends ServerResponse { - static final boolean reactiveStreamsPresent = ClassUtils.isPresent( - "org.reactivestreams.Publisher", AsyncServerResponse.class.getClassLoader()); + /** + * Blocks indefinitely until the future response is obtained. + */ + ServerResponse block(); - private final CompletableFuture futureResponse; + // Static creation methods - @Nullable - private final Duration timeout; - - - private AsyncServerResponse(CompletableFuture futureResponse, @Nullable Duration timeout) { - this.futureResponse = futureResponse; - this.timeout = timeout; + /** + * Create a {@code AsyncServerResponse} with the given asynchronous response. + * Parameter {@code asyncResponse} can be a + * {@link CompletableFuture CompletableFuture<ServerResponse>} or + * {@link Publisher Publisher<ServerResponse>} (or any + * asynchronous producer of a single {@code ServerResponse} that can be + * adapted via the {@link ReactiveAdapterRegistry}). + * @param asyncResponse a {@code CompletableFuture} or + * {@code Publisher} + * @return the asynchronous response + */ + static AsyncServerResponse create(Object asyncResponse) { + return DefaultAsyncServerResponse.create(asyncResponse, null); } - @Override - public HttpStatus statusCode() { - return delegate(ServerResponse::statusCode); + /** + * Create a (built) response with the given asynchronous response. + * Parameter {@code asyncResponse} can be a + * {@link CompletableFuture CompletableFuture<ServerResponse>} or + * {@link Publisher Publisher<ServerResponse>} (or any + * asynchronous producer of a single {@code ServerResponse} that can be + * adapted via the {@link ReactiveAdapterRegistry}). + * @param asyncResponse a {@code CompletableFuture} or + * {@code Publisher} + * @param timeout maximum time period to wait for before timing out + * @return the asynchronous response + */ + static AsyncServerResponse create(Object asyncResponse, Duration timeout) { + return DefaultAsyncServerResponse.create(asyncResponse, timeout); } - @Override - public int rawStatusCode() { - return delegate(ServerResponse::rawStatusCode); - } - - @Override - public HttpHeaders headers() { - return delegate(ServerResponse::headers); - } - - @Override - public MultiValueMap cookies() { - return delegate(ServerResponse::cookies); - } - - private R delegate(Function function) { - ServerResponse response = this.futureResponse.getNow(null); - if (response != null) { - return function.apply(response); - } - else { - throw new IllegalStateException("Future ServerResponse has not yet completed"); - } - } - - @Nullable - @Override - public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context) - throws ServletException, IOException { - - writeAsync(request, response, createDeferredResult()); - return null; - } - - static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult deferredResult) - throws ServletException, IOException { - - WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); - AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response); - asyncManager.setAsyncWebRequest(asyncWebRequest); - try { - asyncManager.startDeferredResultProcessing(deferredResult); - } - catch (IOException | ServletException ex) { - throw ex; - } - catch (Exception ex) { - throw new ServletException("Async processing failed", ex); - } - - } - - private DeferredResult createDeferredResult() { - DeferredResult result; - if (this.timeout != null) { - result = new DeferredResult<>(this.timeout.toMillis()); - } - else { - result = new DeferredResult<>(); - } - this.futureResponse.handle((value, ex) -> { - if (ex != null) { - if (ex instanceof CompletionException && ex.getCause() != null) { - ex = ex.getCause(); - } - result.setErrorResult(ex); - } - else { - result.setResult(value); - } - return null; - }); - return result; - } - - - @SuppressWarnings({"unchecked"}) - public static ServerResponse create(Object o, @Nullable Duration timeout) { - Assert.notNull(o, "Argument to async must not be null"); - - if (o instanceof CompletableFuture) { - CompletableFuture futureResponse = (CompletableFuture) o; - return new AsyncServerResponse(futureResponse, timeout); - } - else if (reactiveStreamsPresent) { - ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); - ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass()); - if (publisherAdapter != null) { - Publisher publisher = publisherAdapter.toPublisher(o); - ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class); - if (futureAdapter != null) { - CompletableFuture futureResponse = - (CompletableFuture) futureAdapter.fromPublisher(publisher); - return new AsyncServerResponse(futureResponse, timeout); - } - } - } - throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass()); - } - - } + diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultAsyncServerResponse.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultAsyncServerResponse.java new file mode 100644 index 00000000000..0fd28344543 --- /dev/null +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultAsyncServerResponse.java @@ -0,0 +1,191 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.servlet.function; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import javax.servlet.ServletException; +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.reactivestreams.Publisher; + +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.MultiValueMap; +import org.springframework.web.context.request.async.AsyncWebRequest; +import org.springframework.web.context.request.async.DeferredResult; +import org.springframework.web.context.request.async.WebAsyncManager; +import org.springframework.web.context.request.async.WebAsyncUtils; +import org.springframework.web.servlet.ModelAndView; + +/** + * Default {@link AsyncServerResponse} implementation. + * + * @author Arjen Poutsma + * @since 5.3.2 + */ +final class DefaultAsyncServerResponse extends ErrorHandlingServerResponse implements AsyncServerResponse { + + static final boolean reactiveStreamsPresent = ClassUtils.isPresent( + "org.reactivestreams.Publisher", DefaultAsyncServerResponse.class.getClassLoader()); + + private final CompletableFuture futureResponse; + + @Nullable + private final Duration timeout; + + + private DefaultAsyncServerResponse(CompletableFuture futureResponse, @Nullable Duration timeout) { + this.futureResponse = futureResponse; + this.timeout = timeout; + } + + @Override + public ServerResponse block() { + try { + if (this.timeout != null) { + return this.futureResponse.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS); + } + else { + return this.futureResponse.get(); + } + } + catch (InterruptedException | ExecutionException | TimeoutException ex) { + throw new IllegalStateException("Failed to get future response", ex); + } + } + + @Override + public HttpStatus statusCode() { + return delegate(ServerResponse::statusCode); + } + + @Override + public int rawStatusCode() { + return delegate(ServerResponse::rawStatusCode); + } + + @Override + public HttpHeaders headers() { + return delegate(ServerResponse::headers); + } + + @Override + public MultiValueMap cookies() { + return delegate(ServerResponse::cookies); + } + + private R delegate(Function function) { + ServerResponse response = this.futureResponse.getNow(null); + if (response != null) { + return function.apply(response); + } + else { + throw new IllegalStateException("Future ServerResponse has not yet completed"); + } + } + + @Nullable + @Override + public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context) + throws ServletException, IOException { + + writeAsync(request, response, createDeferredResult()); + return null; + } + + static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult deferredResult) + throws ServletException, IOException { + + WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); + AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response); + asyncManager.setAsyncWebRequest(asyncWebRequest); + try { + asyncManager.startDeferredResultProcessing(deferredResult); + } + catch (IOException | ServletException ex) { + throw ex; + } + catch (Exception ex) { + throw new ServletException("Async processing failed", ex); + } + + } + + private DeferredResult createDeferredResult() { + DeferredResult result; + if (this.timeout != null) { + result = new DeferredResult<>(this.timeout.toMillis()); + } + else { + result = new DeferredResult<>(); + } + this.futureResponse.handle((value, ex) -> { + if (ex != null) { + if (ex instanceof CompletionException && ex.getCause() != null) { + ex = ex.getCause(); + } + result.setErrorResult(ex); + } + else { + result.setResult(value); + } + return null; + }); + return result; + } + + @SuppressWarnings({"unchecked"}) + public static AsyncServerResponse create(Object o, @Nullable Duration timeout) { + Assert.notNull(o, "Argument to async must not be null"); + + if (o instanceof CompletableFuture) { + CompletableFuture futureResponse = (CompletableFuture) o; + return new DefaultAsyncServerResponse(futureResponse, timeout); + } + else if (reactiveStreamsPresent) { + ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); + ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass()); + if (publisherAdapter != null) { + Publisher publisher = publisherAdapter.toPublisher(o); + ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class); + if (futureAdapter != null) { + CompletableFuture futureResponse = + (CompletableFuture) futureAdapter.fromPublisher(publisher); + return new DefaultAsyncServerResponse(futureResponse, timeout); + } + } + } + throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass()); + } + + +} diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultEntityResponseBuilder.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultEntityResponseBuilder.java index f28b1d81745..4dfad874a4e 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultEntityResponseBuilder.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultEntityResponseBuilder.java @@ -208,7 +208,7 @@ final class DefaultEntityResponseBuilder implements EntityResponse.Builder return new CompletionStageEntityResponse(this.status, this.headers, this.cookies, completionStage, this.entityType); } - else if (AsyncServerResponse.reactiveStreamsPresent) { + else if (DefaultAsyncServerResponse.reactiveStreamsPresent) { ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(this.entity.getClass()); if (adapter != null) { Publisher publisher = adapter.toPublisher(this.entity); @@ -362,7 +362,7 @@ final class DefaultEntityResponseBuilder implements EntityResponse.Builder Context context) throws ServletException, IOException { DeferredResult deferredResult = createDeferredResult(servletRequest, servletResponse, context); - AsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult); + DefaultAsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult); return null; } @@ -410,7 +410,7 @@ final class DefaultEntityResponseBuilder implements EntityResponse.Builder Context context) throws ServletException, IOException { DeferredResult deferredResult = new DeferredResult<>(); - AsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult); + DefaultAsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult); entity().subscribe(new DeferredResultSubscriber(servletRequest, servletResponse, context, deferredResult)); return null; diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/ServerResponse.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/ServerResponse.java index 85823b75b9c..11efe5b0223 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/ServerResponse.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/ServerResponse.java @@ -236,7 +236,7 @@ public interface ServerResponse { * @since 5.3 */ static ServerResponse async(Object asyncResponse) { - return AsyncServerResponse.create(asyncResponse, null); + return DefaultAsyncServerResponse.create(asyncResponse, null); } /** @@ -257,7 +257,7 @@ public interface ServerResponse { * @since 5.3.2 */ static ServerResponse async(Object asyncResponse, Duration timeout) { - return AsyncServerResponse.create(asyncResponse, timeout); + return DefaultAsyncServerResponse.create(asyncResponse, timeout); } /** diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/SseServerResponse.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/SseServerResponse.java index a641033b6ce..e078a66d110 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/SseServerResponse.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/SseServerResponse.java @@ -89,7 +89,7 @@ final class SseServerResponse extends AbstractServerResponse { result = new DeferredResult<>(); } - AsyncServerResponse.writeAsync(request, response, result); + DefaultAsyncServerResponse.writeAsync(request, response, result); this.sseConsumer.accept(new DefaultSseBuilder(response, context, result)); return null; } diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/function/DefaultAsyncServerResponseTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/function/DefaultAsyncServerResponseTests.java new file mode 100644 index 00000000000..8f3cf3546d9 --- /dev/null +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/function/DefaultAsyncServerResponseTests.java @@ -0,0 +1,39 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.servlet.function; + +import java.util.concurrent.CompletableFuture; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Arjen Poutsma + */ +class DefaultAsyncServerResponseTests { + + @Test + void block() { + ServerResponse wrappee = ServerResponse.ok().build(); + CompletableFuture future = CompletableFuture.completedFuture(wrappee); + AsyncServerResponse response = AsyncServerResponse.create(future); + + assertThat(response.block()).isSameAs(wrappee); + } + +}