diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java
index 2eef55f5257..5a7753bc0e5 100644
--- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java
+++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java
@@ -90,11 +90,11 @@ public class HttpHandlerConnector implements ClientHttpConnector {
return Mono.empty();
});
- mockServerResponse.setWriteHandler(responseBody -> {
- log("Creating client response for ", httpMethod, uri);
- result.onNext(adaptResponse(mockServerResponse, responseBody));
- return Mono.empty();
- });
+ mockServerResponse.setWriteHandler(responseBody ->
+ Mono.fromRunnable(() -> {
+ log("Creating client response for ", httpMethod, uri);
+ result.onNext(adaptResponse(mockServerResponse, responseBody));
+ }));
log("Writing client request for ", httpMethod, uri);
requestCallback.apply(mockClientRequest).subscribe(aVoid -> {}, result::onError);
diff --git a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
index 3a83764aa68..272b2823f97 100644
--- a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
+++ b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
@@ -45,6 +45,9 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
/**
* Register an action to apply just before the HttpOutputMessage is committed.
+ *
Note: the supplied action must be properly deferred,
+ * e.g. via {@link Mono#defer} or {@link Mono#fromRunnable}, to ensure it's
+ * executed in the right order, relative to other actions.
* @param action the action to apply
*/
void beforeCommit(Supplier extends Mono> action);
diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java
index 3ad0e937000..ec6d5aaf94d 100644
--- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java
+++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java
@@ -209,13 +209,13 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
return Mono.empty();
}
- this.commitActions.add(() -> {
- applyStatusCode();
- applyHeaders();
- applyCookies();
- this.state.set(State.COMMITTED);
- return Mono.empty();
- });
+ this.commitActions.add(() ->
+ Mono.fromRunnable(() -> {
+ applyStatusCode();
+ applyHeaders();
+ applyCookies();
+ this.state.set(State.COMMITTED);
+ }));
if (writeAction != null) {
this.commitActions.add(writeAction);
@@ -224,7 +224,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
List extends Mono> actions = this.commitActions.stream()
.map(Supplier::get).collect(Collectors.toList());
- return Flux.concat(actions).next();
+ return Flux.concat(actions).then();
}
diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
index 3804bb4b8a9..d6c93f896c9 100644
--- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
+++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -111,28 +111,30 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
@Override
public Mono writeWith(File file, long position, long count) {
- return doCommit(() -> {
- FileChannel source = null;
- try {
- source = FileChannel.open(file.toPath(), StandardOpenOption.READ);
- StreamSinkChannel destination = this.exchange.getResponseChannel();
- Channels.transferBlocking(destination, source, position, count);
- return Mono.empty();
- }
- catch (IOException ex) {
- return Mono.error(ex);
- }
- finally {
- if (source != null) {
+ return doCommit(() ->
+ Mono.defer(() -> {
+ FileChannel source = null;
try {
- source.close();
+ source = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+ StreamSinkChannel destination = this.exchange.getResponseChannel();
+ Channels.transferBlocking(destination, source, position, count);
+ return Mono.empty();
}
catch (IOException ex) {
- // ignore
+ return Mono.error(ex);
}
- }
- }
- });
+ finally {
+ if (source != null) {
+ try {
+ source.close();
+ }
+ catch (IOException ex) {
+ // ignore
+ }
+ }
+ }
+
+ }));
}
diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java
index fb3c6ab5125..651e9c55bd8 100644
--- a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java
+++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -99,10 +99,7 @@ public class ServerHttpResponseTests {
public void beforeCommitWithComplete() throws Exception {
ResponseCookie cookie = ResponseCookie.from("ID", "123").build();
TestServerHttpResponse response = new TestServerHttpResponse();
- response.beforeCommit(() -> {
- response.getCookies().add(cookie.getName(), cookie);
- return Mono.empty();
- });
+ response.beforeCommit(() -> Mono.fromRunnable(() -> response.getCookies().add(cookie.getName(), cookie)));
response.writeWith(Flux.just(wrap("a"), wrap("b"), wrap("c"))).block();
assertTrue(response.statusCodeWritten);
diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java
index e1323699be5..35bdf8e8e4c 100644
--- a/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java
+++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2016 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,15 +54,11 @@ public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTest
// Zero-copy only does not support servlet
assumeTrue(server instanceof ReactorHttpServer || server instanceof UndertowHttpServer);
- RestTemplate restTemplate = new RestTemplate();
+ URI url = new URI("http://localhost:" + port);
+ RequestEntity> request = RequestEntity.get(url).build();
+ ResponseEntity response = new RestTemplate().exchange(request, byte[].class);
- RequestEntity> request =
- RequestEntity.get(new URI("http://localhost:" + port)).build();
-
- ResponseEntity response = restTemplate.exchange(request, byte[].class);
-
- Resource logo =
- new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class);
+ Resource logo = new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class);
assertTrue(response.hasBody());
assertEquals(logo.contentLength(), response.getHeaders().getContentLength());
@@ -76,22 +72,16 @@ public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTest
@Override
public Mono handle(ServerHttpRequest request, ServerHttpResponse response) {
try {
- ZeroCopyHttpOutputMessage zeroCopyResponse =
- (ZeroCopyHttpOutputMessage) response;
-
- Resource logo = new ClassPathResource("spring.png",
- ZeroCopyIntegrationTests.class);
+ ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) response;
+ Resource logo = new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class);
File logoFile = logo.getFile();
zeroCopyResponse.getHeaders().setContentType(MediaType.IMAGE_PNG);
zeroCopyResponse.getHeaders().setContentLength(logoFile.length());
return zeroCopyResponse.writeWith(logoFile, 0, logoFile.length());
-
}
catch (Throwable ex) {
return Mono.error(ex);
}
-
-
}
}
diff --git a/spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java b/spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java
index df2d0e89165..d81f1cb138e 100644
--- a/spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java
+++ b/spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java
@@ -82,7 +82,7 @@ public class ResponseStatusExceptionHandlerTests {
Throwable ex = new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Oops");
this.exchange.getResponse().setStatusCode(HttpStatus.CREATED);
Mono mono = this.exchange.getResponse().setComplete()
- .then(this.handler.handle(this.exchange, ex));
+ .then(Mono.defer(() -> this.handler.handle(this.exchange, ex)));
StepVerifier.create(mono).consumeErrorWith(actual -> assertSame(ex, actual)).verify();
}