Rename ReactiveHttpOutputMessage.setBody() to writeWith()
This commit is contained in:
parent
58307ebac4
commit
97155f1a30
|
@ -31,6 +31,7 @@ import org.springframework.core.io.buffer.DataBufferFactory;
|
|||
* on the server-side.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Sebastien Deleuze
|
||||
*/
|
||||
public interface ReactiveHttpOutputMessage extends HttpMessage {
|
||||
|
||||
|
@ -41,18 +42,20 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
|
|||
void beforeCommit(Supplier<? extends Mono<Void>> action);
|
||||
|
||||
/**
|
||||
* Set the body of the message to the given {@link Publisher} which will be
|
||||
* used to write to the underlying HTTP layer.
|
||||
* Use the given {@link Publisher} to write the body of the message to the underlying
|
||||
* HTTP layer, and flush the data when the complete signal is received (data could be
|
||||
* flushed before depending on the configuration, the HTTP engine and the amount of
|
||||
* data sent).
|
||||
*
|
||||
* @param body the body content publisher
|
||||
* @return a publisher that indicates completion or error.
|
||||
*/
|
||||
Mono<Void> setBody(Publisher<DataBuffer> body);
|
||||
Mono<Void> writeWith(Publisher<DataBuffer> body);
|
||||
|
||||
/**
|
||||
* Returns a {@link DataBufferFactory} that can be used for creating the body.
|
||||
* @return a buffer factory
|
||||
* @see #setBody(Publisher)
|
||||
* @see #writeWith(Publisher)
|
||||
*/
|
||||
DataBufferFactory bufferFactory();
|
||||
|
||||
|
|
|
@ -30,13 +30,13 @@ import reactor.core.publisher.Mono;
|
|||
public interface ZeroCopyHttpOutputMessage extends ReactiveHttpOutputMessage {
|
||||
|
||||
/**
|
||||
* Set the body of the message to the given {@link File} which will be
|
||||
* used to write to the underlying HTTP layer.
|
||||
* Use the given {@link File} to write the body of the message to the underlying
|
||||
* HTTP layer.
|
||||
* @param file the file to transfer
|
||||
* @param position the position within the file from which the transfer is to begin
|
||||
* @param count the number of bytes to be transferred
|
||||
* @return a publisher that indicates completion or error.
|
||||
*/
|
||||
Mono<Void> setBody(File file, long position, long count);
|
||||
Mono<Void> writeWith(File file, long position, long count);
|
||||
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ public interface ClientHttpRequestFactory {
|
|||
|
||||
/**
|
||||
* Create a new {@link ClientHttpRequest} for the specified HTTP method, URI and headers
|
||||
* <p>The returned request can be {@link ClientHttpRequest#setBody(Publisher) written to},
|
||||
* <p>The returned request can be {@link ClientHttpRequest#writeWith(Publisher) written to},
|
||||
* and then executed by calling {@link ClientHttpRequest#execute()}
|
||||
*
|
||||
* @param httpMethod the HTTP method to execute
|
||||
|
|
|
@ -89,7 +89,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
|
|||
* @see #execute()
|
||||
*/
|
||||
@Override
|
||||
public Mono<Void> setBody(Publisher<DataBuffer> body) {
|
||||
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
|
||||
|
||||
this.body = Flux.from(body).map(this::toByteBuf);
|
||||
return Mono.empty();
|
||||
|
|
|
@ -77,7 +77,7 @@ public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
|
|||
* @see #execute()
|
||||
*/
|
||||
@Override
|
||||
public Mono<Void> setBody(Publisher<DataBuffer> body) {
|
||||
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
|
||||
|
||||
this.body = RxJava1ObservableConverter.from(Flux.from(body)
|
||||
.map(b -> dataBufferFactory.wrap(b.asByteBuffer()).getNativeBuffer()));
|
||||
|
|
|
@ -131,6 +131,6 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
|
|||
DataBufferFactory dataBufferFactory = outputMessage.bufferFactory();
|
||||
Flux<DataBuffer> body =
|
||||
this.encoder.encode(inputStream, dataBufferFactory, type, contentType);
|
||||
return outputMessage.setBody(body);
|
||||
return outputMessage.writeWith(body);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,7 +93,7 @@ public class ResourceHttpMessageConverter extends CodecHttpMessageConverter<Reso
|
|||
(ZeroCopyHttpOutputMessage) outputMessage;
|
||||
|
||||
return zeroCopyResponse
|
||||
.setBody(file.get(), (long) 0, file.get().length());
|
||||
.writeWith(file.get(), (long) 0, file.get().length());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -89,9 +89,9 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> setBody(Publisher<DataBuffer> publisher) {
|
||||
public Mono<Void> writeWith(Publisher<DataBuffer> publisher) {
|
||||
return new ChannelSendOperator<>(publisher, writePublisher ->
|
||||
applyBeforeCommit().then(() -> setBodyInternal(writePublisher)));
|
||||
applyBeforeCommit().then(() -> writeWithInternal(writePublisher)));
|
||||
}
|
||||
|
||||
protected Mono<Void> applyBeforeCommit() {
|
||||
|
@ -128,9 +128,9 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
|
|||
|
||||
/**
|
||||
* Implement this method to write to the underlying the response.
|
||||
* @param publisher the publisher to write with
|
||||
* @param body the publisher to write with
|
||||
*/
|
||||
protected abstract Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher);
|
||||
protected abstract Mono<Void> writeWithInternal(Publisher<DataBuffer> body);
|
||||
|
||||
@Override
|
||||
public void beforeCommit(Supplier<? extends Mono<Void>> action) {
|
||||
|
|
|
@ -65,7 +65,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
|
||||
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) {
|
||||
return this.channel.send(Flux.from(publisher).map(this::toByteBuf));
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
|
|||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> setBody(File file, long position, long count) {
|
||||
public Mono<Void> writeWith(File file, long position, long count) {
|
||||
return applyBeforeCommit().then(() -> {
|
||||
return this.channel.sendFile(file, position, count);
|
||||
});
|
||||
|
|
|
@ -63,7 +63,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
|
||||
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) {
|
||||
Observable<ByteBuf> content =
|
||||
RxJava1ObservableConverter.from(publisher).map(this::toByteBuf);
|
||||
Observable<Void> completion = this.response.write(content);
|
||||
|
@ -114,7 +114,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
|
|||
|
||||
|
||||
@Override
|
||||
public Mono<Void> setBody(File file, long position, long count) {
|
||||
public Mono<Void> writeWith(File file, long position, long count) {
|
||||
Channel channel = this.response.unsafeNettyChannel();
|
||||
|
||||
HttpResponse httpResponse =
|
||||
|
|
|
@ -68,7 +68,7 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
|
||||
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) {
|
||||
return this.responseBodyWriter.apply(publisher);
|
||||
}
|
||||
|
||||
|
|
|
@ -79,12 +79,12 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
|
||||
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) {
|
||||
return this.responseBodyWriter.apply(publisher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> setBody(File file, long position, long count) {
|
||||
public Mono<Void> writeWith(File file, long position, long count) {
|
||||
writeHeaders();
|
||||
writeCookies();
|
||||
try {
|
||||
|
|
|
@ -148,7 +148,7 @@ public class DefaultHttpRequestBuilder implements HttpRequestBuilder {
|
|||
.findFirst();
|
||||
|
||||
if (messageEncoder.isPresent()) {
|
||||
request.setBody(messageEncoder.get()
|
||||
request.writeWith(messageEncoder.get()
|
||||
.encode(this.contentPublisher, request.bufferFactory(),
|
||||
requestBodyType, mediaType));
|
||||
}
|
||||
|
|
|
@ -185,7 +185,7 @@ public class ViewResolutionResultHandler implements HandlerResultHandler, Ordere
|
|||
return viewMono.then(returnValue -> {
|
||||
if (returnValue instanceof View) {
|
||||
Flux<DataBuffer> body = ((View) returnValue).render(result, null, exchange);
|
||||
return exchange.getResponse().setBody(body);
|
||||
return exchange.getResponse().writeWith(body);
|
||||
}
|
||||
else if (returnValue instanceof CharSequence) {
|
||||
String viewName = returnValue.toString();
|
||||
|
@ -196,7 +196,7 @@ public class ViewResolutionResultHandler implements HandlerResultHandler, Ordere
|
|||
.otherwiseIfEmpty(handleUnresolvedViewName(viewName))
|
||||
.then(view -> {
|
||||
Flux<DataBuffer> body = view.render(result, null, exchange);
|
||||
return exchange.getResponse().setBody(body);
|
||||
return exchange.getResponse().writeWith(body);
|
||||
});
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -65,7 +65,7 @@ public class AsyncIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
|
||||
@Override
|
||||
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
|
||||
return response.setBody(Flux.just("h", "e", "l", "l", "o")
|
||||
return response.writeWith(Flux.just("h", "e", "l", "l", "o")
|
||||
.useTimer(Timer.global())
|
||||
.delay(Duration.ofMillis(100))
|
||||
.publishOn(asyncGroup)
|
||||
|
|
|
@ -67,7 +67,7 @@ public class EchoHandlerIntegrationTests extends AbstractHttpHandlerIntegrationT
|
|||
|
||||
@Override
|
||||
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
|
||||
return response.setBody(request.getBody());
|
||||
return response.writeWith(request.getBody());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import java.net.URI;
|
|||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.http.HttpCookie;
|
||||
|
@ -96,7 +97,8 @@ public class MockServerHttpRequest implements ServerHttpRequest {
|
|||
return this.body;
|
||||
}
|
||||
|
||||
public void setBody(Publisher<DataBuffer> body) {
|
||||
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
|
||||
this.body = Flux.from(body);
|
||||
return this.body.then();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> setBody(Publisher<DataBuffer> body) {
|
||||
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
|
||||
this.body = body;
|
||||
return Flux.from(this.body).then();
|
||||
}
|
||||
|
|
|
@ -131,7 +131,7 @@ public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegratio
|
|||
});
|
||||
|
||||
response.getHeaders().setContentLength(RESPONSE_SIZE);
|
||||
return response.setBody(multipleChunks());
|
||||
return response.writeWith(multipleChunks());
|
||||
}
|
||||
|
||||
private Publisher<DataBuffer> singleChunk() {
|
||||
|
|
|
@ -43,28 +43,28 @@ public class ServerHttpResponseTests {
|
|||
|
||||
|
||||
@Test
|
||||
public void setBody() throws Exception {
|
||||
public void writeWith() throws Exception {
|
||||
TestServerHttpResponse response = new TestServerHttpResponse();
|
||||
response.setBody(Flux.just(wrap("a"), wrap("b"), wrap("c"))).get();
|
||||
response.writeWith(Flux.just(wrap("a"), wrap("b"), wrap("c"))).get();
|
||||
|
||||
assertTrue(response.headersWritten);
|
||||
assertTrue(response.cookiesWritten);
|
||||
|
||||
assertEquals(3, response.content.size());
|
||||
assertEquals("a", new String(response.content.get(0).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("b", new String(response.content.get(1).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("c", new String(response.content.get(2).asByteBuffer().array(), UTF_8));
|
||||
assertEquals(3, response.body.size());
|
||||
assertEquals("a", new String(response.body.get(0).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("b", new String(response.body.get(1).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("c", new String(response.body.get(2).asByteBuffer().array(), UTF_8));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setBodyWithError() throws Exception {
|
||||
public void writeWithWithComplete() throws Exception {
|
||||
TestServerHttpResponse response = new TestServerHttpResponse();
|
||||
IllegalStateException error = new IllegalStateException("boo");
|
||||
response.setBody(Flux.error(error)).otherwise(ex -> Mono.empty()).get();
|
||||
response.writeWith(Flux.error(error)).otherwise(ex -> Mono.empty()).get();
|
||||
|
||||
assertFalse(response.headersWritten);
|
||||
assertFalse(response.cookiesWritten);
|
||||
assertTrue(response.content.isEmpty());
|
||||
assertTrue(response.body.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -74,27 +74,27 @@ public class ServerHttpResponseTests {
|
|||
|
||||
assertTrue(response.headersWritten);
|
||||
assertTrue(response.cookiesWritten);
|
||||
assertTrue(response.content.isEmpty());
|
||||
assertTrue(response.body.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void beforeCommitWithSetBody() throws Exception {
|
||||
public void beforeCommitWithComplete() throws Exception {
|
||||
ResponseCookie cookie = ResponseCookie.from("ID", "123").build();
|
||||
TestServerHttpResponse response = new TestServerHttpResponse();
|
||||
response.beforeCommit(() -> {
|
||||
response.getCookies().add(cookie.getName(), cookie);
|
||||
return Mono.empty();
|
||||
});
|
||||
response.setBody(Flux.just(wrap("a"), wrap("b"), wrap("c"))).get();
|
||||
response.writeWith(Flux.just(wrap("a"), wrap("b"), wrap("c"))).get();
|
||||
|
||||
assertTrue(response.headersWritten);
|
||||
assertTrue(response.cookiesWritten);
|
||||
assertSame(cookie, response.getCookies().getFirst("ID"));
|
||||
|
||||
assertEquals(3, response.content.size());
|
||||
assertEquals("a", new String(response.content.get(0).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("b", new String(response.content.get(1).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("c", new String(response.content.get(2).asByteBuffer().array(), UTF_8));
|
||||
assertEquals(3, response.body.size());
|
||||
assertEquals("a", new String(response.body.get(0).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("b", new String(response.body.get(1).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("c", new String(response.body.get(2).asByteBuffer().array(), UTF_8));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -102,16 +102,16 @@ public class ServerHttpResponseTests {
|
|||
TestServerHttpResponse response = new TestServerHttpResponse();
|
||||
IllegalStateException error = new IllegalStateException("boo");
|
||||
response.beforeCommit(() -> Mono.error(error));
|
||||
response.setBody(Flux.just(wrap("a"), wrap("b"), wrap("c"))).get();
|
||||
response.writeWith(Flux.just(wrap("a"), wrap("b"), wrap("c"))).get();
|
||||
|
||||
assertTrue("beforeCommit action errors should be ignored", response.headersWritten);
|
||||
assertTrue("beforeCommit action errors should be ignored", response.cookiesWritten);
|
||||
assertNull(response.getCookies().get("ID"));
|
||||
|
||||
assertEquals(3, response.content.size());
|
||||
assertEquals("a", new String(response.content.get(0).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("b", new String(response.content.get(1).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("c", new String(response.content.get(2).asByteBuffer().array(), UTF_8));
|
||||
assertEquals(3, response.body.size());
|
||||
assertEquals("a", new String(response.body.get(0).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("b", new String(response.body.get(1).asByteBuffer().array(), UTF_8));
|
||||
assertEquals("c", new String(response.body.get(2).asByteBuffer().array(), UTF_8));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -126,7 +126,7 @@ public class ServerHttpResponseTests {
|
|||
|
||||
assertTrue(response.headersWritten);
|
||||
assertTrue(response.cookiesWritten);
|
||||
assertTrue(response.content.isEmpty());
|
||||
assertTrue(response.body.isEmpty());
|
||||
assertSame(cookie, response.getCookies().getFirst("ID"));
|
||||
}
|
||||
|
||||
|
@ -143,7 +143,7 @@ public class ServerHttpResponseTests {
|
|||
|
||||
private boolean cookiesWritten;
|
||||
|
||||
private final List<DataBuffer> content = new ArrayList<>();
|
||||
private final List<DataBuffer> body = new ArrayList<>();
|
||||
|
||||
public TestServerHttpResponse() {
|
||||
super(new DefaultDataBufferFactory());
|
||||
|
@ -166,9 +166,9 @@ public class ServerHttpResponseTests {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
|
||||
return Flux.from(publisher).map(b -> {
|
||||
this.content.add(b);
|
||||
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
|
||||
return Flux.from(body).map(b -> {
|
||||
this.body.add(b);
|
||||
return b;
|
||||
}).then();
|
||||
}
|
||||
|
|
|
@ -84,7 +84,7 @@ public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
|||
File logoFile = logo.getFile();
|
||||
zeroCopyResponse.getHeaders().setContentType(MediaType.IMAGE_PNG);
|
||||
zeroCopyResponse.getHeaders().setContentLength(logoFile.length());
|
||||
return zeroCopyResponse.setBody(logoFile, 0, logoFile.length());
|
||||
return zeroCopyResponse.writeWith(logoFile, 0, logoFile.length());
|
||||
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
|
|
|
@ -162,7 +162,7 @@ public class DispatcherHandlerErrorTests {
|
|||
this.request.getHeaders().setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
|
||||
DataBuffer buffer = new DefaultDataBufferFactory().allocateBuffer()
|
||||
.write("body".getBytes("UTF-8"));
|
||||
this.request.setBody(Mono.just(buffer));
|
||||
this.request.writeWith(Mono.just(buffer));
|
||||
|
||||
Mono<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Throwable ex = awaitErrorSignal(publisher);
|
||||
|
@ -173,7 +173,7 @@ public class DispatcherHandlerErrorTests {
|
|||
@Test
|
||||
public void requestBodyError() throws Exception {
|
||||
this.request.setUri(new URI("/request-body"));
|
||||
this.request.setBody(Mono.error(EXCEPTION));
|
||||
this.request.writeWith(Mono.error(EXCEPTION));
|
||||
|
||||
Mono<Void> publisher = this.dispatcherHandler.handle(this.exchange);
|
||||
Throwable ex = awaitErrorSignal(publisher);
|
||||
|
|
|
@ -150,7 +150,7 @@ public class WebHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTe
|
|||
@Override
|
||||
public Mono<Void> handle(ServerWebExchange exchange) {
|
||||
DataBuffer buffer = asDataBuffer("foo");
|
||||
return exchange.getResponse().setBody(Flux.just(buffer));
|
||||
return exchange.getResponse().writeWith(Flux.just(buffer));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -159,7 +159,7 @@ public class WebHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTe
|
|||
@Override
|
||||
public Mono<Void> handle(ServerWebExchange exchange) {
|
||||
DataBuffer buffer = asDataBuffer("bar");
|
||||
return exchange.getResponse().setBody(Flux.just(buffer));
|
||||
return exchange.getResponse().writeWith(Flux.just(buffer));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue