From 72bbb2619dcd2d71f4333fb501a7495ef34eec53 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 15 Mar 2018 23:12:24 -0400 Subject: [PATCH] Commit actions are (properly) deferred Issue: SPR-16597 --- .../reactive/server/HttpHandlerConnector.java | 10 ++--- .../http/ReactiveHttpOutputMessage.java | 3 ++ .../reactive/AbstractServerHttpResponse.java | 16 ++++---- .../reactive/UndertowServerHttpResponse.java | 40 ++++++++++--------- .../reactive/ServerHttpResponseTests.java | 7 +--- .../reactive/ZeroCopyIntegrationTests.java | 24 ++++------- .../ResponseStatusExceptionHandlerTests.java | 2 +- 7 files changed, 47 insertions(+), 55 deletions(-) diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java index 2eef55f5257..5a7753bc0e5 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java @@ -90,11 +90,11 @@ public class HttpHandlerConnector implements ClientHttpConnector { return Mono.empty(); }); - mockServerResponse.setWriteHandler(responseBody -> { - log("Creating client response for ", httpMethod, uri); - result.onNext(adaptResponse(mockServerResponse, responseBody)); - return Mono.empty(); - }); + mockServerResponse.setWriteHandler(responseBody -> + Mono.fromRunnable(() -> { + log("Creating client response for ", httpMethod, uri); + result.onNext(adaptResponse(mockServerResponse, responseBody)); + })); log("Writing client request for ", httpMethod, uri); requestCallback.apply(mockClientRequest).subscribe(aVoid -> {}, result::onError); diff --git a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java index 3a83764aa68..272b2823f97 100644 --- a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java +++ b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java @@ -45,6 +45,9 @@ public interface ReactiveHttpOutputMessage extends HttpMessage { /** * Register an action to apply just before the HttpOutputMessage is committed. + *

Note: the supplied action must be properly deferred, + * e.g. via {@link Mono#defer} or {@link Mono#fromRunnable}, to ensure it's + * executed in the right order, relative to other actions. * @param action the action to apply */ void beforeCommit(Supplier> action); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index 3ad0e937000..ec6d5aaf94d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -209,13 +209,13 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { return Mono.empty(); } - this.commitActions.add(() -> { - applyStatusCode(); - applyHeaders(); - applyCookies(); - this.state.set(State.COMMITTED); - return Mono.empty(); - }); + this.commitActions.add(() -> + Mono.fromRunnable(() -> { + applyStatusCode(); + applyHeaders(); + applyCookies(); + this.state.set(State.COMMITTED); + })); if (writeAction != null) { this.commitActions.add(writeAction); @@ -224,7 +224,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { List> actions = this.commitActions.stream() .map(Supplier::get).collect(Collectors.toList()); - return Flux.concat(actions).next(); + return Flux.concat(actions).then(); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 3804bb4b8a9..d6c93f896c9 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -111,28 +111,30 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl @Override public Mono writeWith(File file, long position, long count) { - return doCommit(() -> { - FileChannel source = null; - try { - source = FileChannel.open(file.toPath(), StandardOpenOption.READ); - StreamSinkChannel destination = this.exchange.getResponseChannel(); - Channels.transferBlocking(destination, source, position, count); - return Mono.empty(); - } - catch (IOException ex) { - return Mono.error(ex); - } - finally { - if (source != null) { + return doCommit(() -> + Mono.defer(() -> { + FileChannel source = null; try { - source.close(); + source = FileChannel.open(file.toPath(), StandardOpenOption.READ); + StreamSinkChannel destination = this.exchange.getResponseChannel(); + Channels.transferBlocking(destination, source, position, count); + return Mono.empty(); } catch (IOException ex) { - // ignore + return Mono.error(ex); } - } - } - }); + finally { + if (source != null) { + try { + source.close(); + } + catch (IOException ex) { + // ignore + } + } + } + + })); } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java index fb3c6ab5125..651e9c55bd8 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -99,10 +99,7 @@ public class ServerHttpResponseTests { public void beforeCommitWithComplete() throws Exception { ResponseCookie cookie = ResponseCookie.from("ID", "123").build(); TestServerHttpResponse response = new TestServerHttpResponse(); - response.beforeCommit(() -> { - response.getCookies().add(cookie.getName(), cookie); - return Mono.empty(); - }); + response.beforeCommit(() -> Mono.fromRunnable(() -> response.getCookies().add(cookie.getName(), cookie))); response.writeWith(Flux.just(wrap("a"), wrap("b"), wrap("c"))).block(); assertTrue(response.statusCodeWritten); diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java index e1323699be5..35bdf8e8e4c 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,15 +54,11 @@ public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTest // Zero-copy only does not support servlet assumeTrue(server instanceof ReactorHttpServer || server instanceof UndertowHttpServer); - RestTemplate restTemplate = new RestTemplate(); + URI url = new URI("http://localhost:" + port); + RequestEntity request = RequestEntity.get(url).build(); + ResponseEntity response = new RestTemplate().exchange(request, byte[].class); - RequestEntity request = - RequestEntity.get(new URI("http://localhost:" + port)).build(); - - ResponseEntity response = restTemplate.exchange(request, byte[].class); - - Resource logo = - new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class); + Resource logo = new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class); assertTrue(response.hasBody()); assertEquals(logo.contentLength(), response.getHeaders().getContentLength()); @@ -76,22 +72,16 @@ public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTest @Override public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { try { - ZeroCopyHttpOutputMessage zeroCopyResponse = - (ZeroCopyHttpOutputMessage) response; - - Resource logo = new ClassPathResource("spring.png", - ZeroCopyIntegrationTests.class); + ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) response; + Resource logo = new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class); File logoFile = logo.getFile(); zeroCopyResponse.getHeaders().setContentType(MediaType.IMAGE_PNG); zeroCopyResponse.getHeaders().setContentLength(logoFile.length()); return zeroCopyResponse.writeWith(logoFile, 0, logoFile.length()); - } catch (Throwable ex) { return Mono.error(ex); } - - } } diff --git a/spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java b/spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java index df2d0e89165..d81f1cb138e 100644 --- a/spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java +++ b/spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java @@ -82,7 +82,7 @@ public class ResponseStatusExceptionHandlerTests { Throwable ex = new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Oops"); this.exchange.getResponse().setStatusCode(HttpStatus.CREATED); Mono mono = this.exchange.getResponse().setComplete() - .then(this.handler.handle(this.exchange, ex)); + .then(Mono.defer(() -> this.handler.handle(this.exchange, ex))); StepVerifier.create(mono).consumeErrorWith(actual -> assertSame(ex, actual)).verify(); }