Add setComplete + beforeCommit to ServerHttpResponse

setComplete replaces writeHeaders as a more general lifecycle method
to perform any kind of handling at the end of request processing, for
example to ensure headers are written if not already.

beforeCommit provides an extension point for an action to be invoked
just before the response is committed, e.g. adding headers/cookies.
This commit is contained in:
Rossen Stoyanchev 2016-01-19 22:17:18 -05:00
parent 03e6d7dabf
commit 3744549a3e
11 changed files with 294 additions and 98 deletions

View File

@ -15,12 +15,20 @@
*/
package org.springframework.http.server.reactive;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rx.Stream;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
/**
* Base class for {@link ServerHttpResponse} implementations.
@ -31,7 +39,9 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
private final HttpHeaders headers;
private boolean headersWritten = false;
private AtomicReference<State> state = new AtomicReference<>(State.NEW);
private final List<Supplier<? extends Mono<Void>>> beforeCommitActions = new ArrayList<>(4);
protected AbstractServerHttpResponse() {
@ -41,17 +51,54 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
@Override
public HttpHeaders getHeaders() {
return (this.headersWritten ? org.springframework.http.HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
if (State.COMITTED.equals(this.state.get())) {
return HttpHeaders.readOnlyHttpHeaders(this.headers);
}
return this.headers;
}
@Override
public Mono<Void> setBody(Publisher<DataBuffer> publisher) {
return Flux.from(publisher).lift(new WriteWithOperator<>(writeWithPublisher -> {
writeHeaders();
return setBodyInternal(writeWithPublisher);
})).after();
return Flux.from(publisher)
.lift(new WriteWithOperator<>(writePublisher ->
applyBeforeCommit().after(() -> setBodyInternal(writePublisher))))
.after();
}
private Mono<Void> applyBeforeCommit() {
return Stream.defer(() -> {
Mono<Void> mono = Mono.empty();
if (this.state.compareAndSet(State.NEW, State.COMMITTING)) {
for (Supplier<? extends Mono<Void>> action : this.beforeCommitActions) {
mono = mono.after(() -> action.get());
}
mono = mono.otherwise(ex -> {
// Ignore errors from beforeCommit actions
return Mono.empty();
});
mono = mono.after(() -> {
this.state.set(State.COMITTED);
writeHeaders();
writeCookies();
return Mono.empty();
});
}
return mono;
}).after();
}
/**
* Implement this method to apply header changes from {@link #getHeaders()}
* to the underlying response. This method is called once only.
*/
protected abstract void writeHeaders();
/**
* Implement this method to add cookies from {@link #getHeaders()} to the
* underlying response. This method is called once only.
*/
protected abstract void writeCookies();
/**
* Implement this method to write to the underlying the response.
* @param publisher the publisher to write with
@ -59,28 +106,17 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
protected abstract Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher);
@Override
public void writeHeaders() {
if (!this.headersWritten) {
try {
writeHeadersInternal();
writeCookies();
}
finally {
this.headersWritten = true;
}
}
public void beforeCommit(Supplier<? extends Mono<Void>> action) {
Assert.notNull(action);
this.beforeCommitActions.add(action);
}
/**
* Implement this method to apply header changes from {@link #getHeaders()}
* to the underlying response. This method is called once only.
*/
protected abstract void writeHeadersInternal();
@Override
public Mono<Void> setComplete() {
return applyBeforeCommit();
}
/**
* Implement this method to add cookies from {@link #getHeaders()} to the
* underlying response. This method is called once only.
*/
protected abstract void writeCookies();
private enum State { NEW, COMMITTING, COMITTED }
}

View File

@ -61,7 +61,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
protected void writeHeadersInternal() {
protected void writeHeaders() {
for (String name : getHeaders().keySet()) {
for (String value : getHeaders().get(name)) {
this.channel.responseHeaders().add(name, value);

View File

@ -70,7 +70,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
protected void writeHeadersInternal() {
protected void writeHeaders() {
for (String name : getHeaders().keySet()) {
for (String value : getHeaders().get(name))
this.response.addHeader(name, value);

View File

@ -16,7 +16,10 @@
package org.springframework.http.server.reactive;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpStatus;
import org.springframework.http.ReactiveHttpOutputMessage;
@ -35,15 +38,20 @@ public interface ServerHttpResponse extends ReactiveHttpOutputMessage {
void setStatusCode(HttpStatus status);
/**
* Use this method to apply header changes made via {@link #getHeaders()} to
* the underlying server response. By default changes made via
* {@link #getHeaders()} are cached until a call to {@link #setBody}
* implicitly applies header changes or until this method is called.
*
* <p><strong>Note:</strong> After this method is called,
* {@link #getHeaders() headers} become read-only and any additional calls
* to this method are ignored.
* Register an action to be applied just before the response is committed.
* @param action the action
*/
void writeHeaders();
void beforeCommit(Supplier<? extends Mono<Void>> action);
/**
* Indicate that request handling is complete, allowing for any cleanup or
* end-of-processing tasks to be performed such as applying header changes
* made via {@link #getHeaders()} to the underlying server response (if not
* applied already).
* <p>This method should be automatically invoked at the end of request
* processing so typically applications should not have to invoke it.
* If invoked multiple times it should have no side effects.
*/
Mono<Void> setComplete();
}

View File

@ -69,7 +69,7 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
protected void writeHeadersInternal() {
protected void writeHeaders() {
for (Map.Entry<String, List<String>> entry : getHeaders().entrySet()) {
String headerName = entry.getKey();
for (String headerValue : entry.getValue()) {

View File

@ -71,7 +71,7 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
protected void writeHeadersInternal() {
protected void writeHeaders() {
for (Map.Entry<String, List<String>> entry : getHeaders().entrySet()) {
HttpString headerName = HttpString.tryFromString(entry.getKey());
this.exchange.getResponseHeaders().addAll(headerName, entry.getValue());

View File

@ -43,17 +43,15 @@ public class WebToHttpHandlerAdapter extends WebHandlerDecorator implements Http
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
WebServerExchange exchange = createWebServerExchange(request, response);
return getDelegate().handle(exchange).otherwise(ex -> {
return getDelegate().handle(exchange)
.otherwise(ex -> {
if (logger.isDebugEnabled()) {
logger.debug("Could not complete request", ex);
}
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
return Mono.empty();
})
.doOnTerminate((aVoid, ex) -> {
response.writeHeaders();
});
.after(response::setComplete);
}
protected WebServerExchange createWebServerExchange(ServerHttpRequest request, ServerHttpResponse response) {

View File

@ -20,8 +20,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -30,13 +28,6 @@ import reactor.core.publisher.Mono;
import org.springframework.http.HttpCookie;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.server.reactive.boot.HttpServer;
import org.springframework.http.server.reactive.boot.JettyHttpServer;
import org.springframework.http.server.reactive.boot.ReactorHttpServer;
import org.springframework.http.server.reactive.boot.RxNettyHttpServer;
import org.springframework.http.server.reactive.boot.TomcatHttpServer;
import org.springframework.http.server.reactive.boot.UndertowHttpServer;
import org.springframework.util.SocketUtils;
import org.springframework.web.client.RestTemplate;
import static org.hamcrest.CoreMatchers.equalTo;
@ -49,47 +40,16 @@ import static org.junit.Assert.assertThat;
* @author Rossen Stoyanchev
*/
@RunWith(Parameterized.class)
public class CookieIntegrationTests {
protected int port;
@Parameterized.Parameter(0)
public HttpServer server;
public class CookieIntegrationTests extends AbstractHttpHandlerIntegrationTests {
private CookieHandler cookieHandler;
@Parameterized.Parameters(name = "server [{0}]")
public static Object[][] arguments() {
return new Object[][] {
{new JettyHttpServer()},
{new RxNettyHttpServer()},
{new ReactorHttpServer()},
{new TomcatHttpServer()},
{new UndertowHttpServer()}
};
}
@Before
public void setup() throws Exception {
this.port = SocketUtils.findAvailableTcpPort();
this.server.setPort(this.port);
this.server.setHandler(createHttpHandler());
this.server.afterPropertiesSet();
this.server.start();
}
@Override
protected HttpHandler createHttpHandler() {
this.cookieHandler = new CookieHandler();
return this.cookieHandler;
}
@After
public void tearDown() throws Exception {
this.server.stop();
}
@SuppressWarnings("unchecked")
@Test
@ -145,9 +105,8 @@ public class CookieIntegrationTests {
.path("/").secure().httpOnly().build());
response.getHeaders().addCookie(HttpCookie.serverCookie("lang", "en-US")
.domain("example.com").path("/").build());
response.writeHeaders();
return Mono.empty();
return response.setComplete();
}
}

View File

@ -15,6 +15,7 @@
*/
package org.springframework.http.server.reactive;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@ -27,7 +28,7 @@ import org.springframework.http.HttpStatus;
/**
* @author Rossen Stoyanchev
*/
public class MockServerHttpResponse implements ServerHttpResponse {
public class MockServerHttpResponse extends AbstractServerHttpResponse {
private HttpStatus status;
@ -50,18 +51,31 @@ public class MockServerHttpResponse implements ServerHttpResponse {
return this.headers;
}
@Override
public Mono<Void> setBody(Publisher<DataBuffer> body) {
this.body = body;
return Flux.from(body).after();
}
public Publisher<DataBuffer> getBody() {
return this.body;
}
@Override
public void writeHeaders() {
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> body) {
this.body = body;
return Flux.from(this.body).after();
}
@Override
protected void writeHeaders() {
}
@Override
protected void writeCookies() {
}
@Override
public void beforeCommit(Supplier<? extends Mono<Void>> action) {
}
@Override
public Mono<Void> setComplete() {
return Mono.empty();
}
}

View File

@ -0,0 +1,176 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpStatus;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
/**
* @author Rossen Stoyanchev
*/
public class ServerHttpResponseTests {
public static final Charset UTF_8 = Charset.forName("UTF-8");
@Test
public void setBody() throws Exception {
TestServerHttpResponse response = new TestServerHttpResponse();
response.setBody(Flux.just(wrap("a"), wrap("b"), wrap("c"))).get();
assertTrue(response.headersWritten);
assertTrue(response.cookiesWritten);
assertEquals(3, response.content.size());
assertEquals("a", new String(response.content.get(0).asByteBuffer().array(), UTF_8));
assertEquals("b", new String(response.content.get(1).asByteBuffer().array(), UTF_8));
assertEquals("c", new String(response.content.get(2).asByteBuffer().array(), UTF_8));
}
@Test
public void setBodyWithError() throws Exception {
TestServerHttpResponse response = new TestServerHttpResponse();
IllegalStateException error = new IllegalStateException("boo");
response.setBody(Flux.error(error)).otherwise(ex -> Mono.empty()).get();
assertFalse(response.headersWritten);
assertFalse(response.cookiesWritten);
assertTrue(response.content.isEmpty());
}
@Test
public void setComplete() throws Exception {
TestServerHttpResponse response = new TestServerHttpResponse();
response.setComplete().get();
assertTrue(response.headersWritten);
assertTrue(response.cookiesWritten);
assertTrue(response.content.isEmpty());
}
@Test
public void beforeCommitWithSetBody() throws Exception {
HttpCookie cookie = HttpCookie.serverCookie("ID", "123").build();
TestServerHttpResponse response = new TestServerHttpResponse();
response.beforeCommit(() -> {
response.getHeaders().addCookie(cookie);
return Mono.empty();
});
response.setBody(Flux.just(wrap("a"), wrap("b"), wrap("c"))).get();
assertTrue(response.headersWritten);
assertTrue(response.cookiesWritten);
assertSame(cookie, response.getHeaders().getCookies().get("ID").get(0));
assertEquals(3, response.content.size());
assertEquals("a", new String(response.content.get(0).asByteBuffer().array(), UTF_8));
assertEquals("b", new String(response.content.get(1).asByteBuffer().array(), UTF_8));
assertEquals("c", new String(response.content.get(2).asByteBuffer().array(), UTF_8));
}
@Test
public void beforeCommitActionWithError() throws Exception {
TestServerHttpResponse response = new TestServerHttpResponse();
IllegalStateException error = new IllegalStateException("boo");
response.beforeCommit(() -> Mono.error(error));
response.setBody(Flux.just(wrap("a"), wrap("b"), wrap("c"))).get();
assertTrue("beforeCommit action errors should be ignored", response.headersWritten);
assertTrue("beforeCommit action errors should be ignored", response.cookiesWritten);
assertNull(response.getHeaders().getCookies().get("ID"));
assertEquals(3, response.content.size());
assertEquals("a", new String(response.content.get(0).asByteBuffer().array(), UTF_8));
assertEquals("b", new String(response.content.get(1).asByteBuffer().array(), UTF_8));
assertEquals("c", new String(response.content.get(2).asByteBuffer().array(), UTF_8));
}
@Test
public void beforeCommitActionWithSetComplete() throws Exception {
HttpCookie cookie = HttpCookie.serverCookie("ID", "123").build();
TestServerHttpResponse response = new TestServerHttpResponse();
response.beforeCommit(() -> {
response.getHeaders().addCookie(cookie);
return Mono.empty();
});
response.setComplete().get();
assertTrue(response.headersWritten);
assertTrue(response.cookiesWritten);
assertTrue(response.content.isEmpty());
assertSame(cookie, response.getHeaders().getCookies().get("ID").get(0));
}
private DataBuffer wrap(String a) {
return new DefaultDataBufferAllocator().wrap(ByteBuffer.wrap(a.getBytes(UTF_8)));
}
private static class TestServerHttpResponse extends AbstractServerHttpResponse {
private boolean headersWritten;
private boolean cookiesWritten;
private final List<DataBuffer> content = new ArrayList<>();
@Override
public void setStatusCode(HttpStatus status) {
}
@Override
protected void writeHeaders() {
assertFalse(this.headersWritten);
this.headersWritten = true;
}
@Override
protected void writeCookies() {
assertFalse(this.cookiesWritten);
this.cookiesWritten = true;
}
@Override
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
return Flux.from(publisher).map(b -> {
this.content.add(b);
return b;
}).after();
}
}
}

View File

@ -16,13 +16,18 @@
package org.springframework.web.server;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.MockServerHttpRequest;
import org.springframework.http.server.reactive.MockServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
@ -45,8 +50,8 @@ public class FilteringWebHandlerTests {
@Before
public void setUp() throws Exception {
this.request = mock(ServerHttpRequest.class);
this.response = mock(ServerHttpResponse.class);
this.request = new MockServerHttpRequest(HttpMethod.GET, new URI("http://localhost"));
this.response = new MockServerHttpResponse();
}
@Test