From a98be035a3ed1b7f37b6ca6573863118672cda4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Ferna=CC=81ndez?= Date: Thu, 24 Nov 2016 22:58:10 +0100 Subject: [PATCH] Make the signature of RHOM#writeAndFlush() more flexible This modifies the signature of ReactiveHttpOutputMessage#writeAndFlush(...) in order to be able to use Flux> objects as arguments of this method. Issue: SPR-14952 --- .../mock/http/server/reactive/MockServerHttpResponse.java | 6 +++--- .../springframework/http/ReactiveHttpOutputMessage.java | 2 +- .../http/client/reactive/ReactorClientHttpRequest.java | 2 +- .../reactive/AbstractListenerServerHttpResponse.java | 2 +- .../http/server/reactive/AbstractServerHttpResponse.java | 4 ++-- .../http/server/reactive/ReactorServerHttpResponse.java | 2 +- .../http/server/reactive/RxNettyServerHttpResponse.java | 2 +- .../http/codec/ServerSentEventHttpMessageWriterTests.java | 8 ++++---- .../http/server/reactive/ServerHttpResponseTests.java | 2 +- .../http/server/reactive/test/MockServerHttpResponse.java | 6 +++--- .../web/client/reactive/test/MockClientHttpRequest.java | 6 +++--- 11 files changed, 21 insertions(+), 21 deletions(-) diff --git a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java index 32e00b35b3c..f89cdfc07b0 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java @@ -52,7 +52,7 @@ public class MockServerHttpResponse implements ServerHttpResponse { private Publisher body; - private Publisher> bodyWithFlushes; + private Publisher> bodyWithFlushes; private DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); @@ -81,7 +81,7 @@ public class MockServerHttpResponse implements ServerHttpResponse { return this.body; } - public Publisher> getBodyWithFlush() { + public Publisher> getBodyWithFlush() { return this.bodyWithFlushes; } @@ -92,7 +92,7 @@ public class MockServerHttpResponse implements ServerHttpResponse { } @Override - public Mono writeAndFlushWith(Publisher> body) { + public Mono writeAndFlushWith(Publisher> body) { this.bodyWithFlushes = body; return Flux.from(this.bodyWithFlushes).then(); } diff --git a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java index 4568beea8dd..043e884663b 100644 --- a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java +++ b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java @@ -64,7 +64,7 @@ public interface ReactiveHttpOutputMessage extends HttpMessage { * @param body the body content publisher * @return a {@link Mono} that indicates completion or error */ - Mono writeAndFlushWith(Publisher> body); + Mono writeAndFlushWith(Publisher> body); /** * Indicate that message handling is complete, allowing for any cleanup or diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index f7f7c18d41e..d5822617a6d 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -80,7 +80,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { } @Override - public Mono writeAndFlushWith(Publisher> body) { + public Mono writeAndFlushWith(Publisher> body) { Publisher> byteBufs = Flux.from(body). map(ReactorClientHttpRequest::toByteBufs); return applyBeforeCommit().then(this.httpRequest diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java index b14f95b30e7..e2558669e2f 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java @@ -48,7 +48,7 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH } @Override - protected final Mono writeAndFlushWithInternal(Publisher> body) { + protected final Mono writeAndFlushWithInternal(Publisher> body) { if (this.writeCalled.compareAndSet(false, true)) { Processor, Void> bodyProcessor = createBodyFlushProcessor(); return Mono.from(subscriber -> { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index 6ad0d47a3c9..519f5519341 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -131,7 +131,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { } @Override - public final Mono writeAndFlushWith(Publisher> body) { + public final Mono writeAndFlushWith(Publisher> body) { return new ChannelSendOperator<>(body, writePublisher -> doCommit(() -> writeAndFlushWithInternal(writePublisher))); } @@ -193,7 +193,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { * each {@code Publisher}. * @param body the publisher to write and flush with */ - protected abstract Mono writeAndFlushWithInternal(Publisher> body); + protected abstract Mono writeAndFlushWithInternal(Publisher> body); /** * Implement this method to write the status code to the underlying response. diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index 99e514f28b2..c7711d32959 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -76,7 +76,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse } @Override - protected Mono writeAndFlushWithInternal(Publisher> publisher) { + protected Mono writeAndFlushWithInternal(Publisher> publisher) { Publisher> body = Flux.from(publisher) .map(ReactorServerHttpResponse::toByteBufs); return this.response.sendGroups(body); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java index 8135a3af8bd..7b3df4df6be 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java @@ -80,7 +80,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { @Override protected Mono writeAndFlushWithInternal( - Publisher> body) { + Publisher> body) { Flux bodyWithFlushSignals = Flux.from(body). flatMap(publisher -> Flux.from(publisher). map(NettyDataBufferFactory::toByteBuf). diff --git a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java index ecc0fb564dd..33e566cf371 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java @@ -72,7 +72,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class), new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); - Publisher> result = Flux.from(outputMessage.getBodyWithFlush()); + Publisher> result = Flux.from(outputMessage.getBodyWithFlush()); StepVerifier.create(result) .consumeNextWith(sseConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" + ":bla\n:bla bla\n:bla bla bla\n" + "data:bar\n")) @@ -87,7 +87,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll messageWriter.write(source, ResolvableType.forClass(String.class), new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); - Publisher> result = outputMessage.getBodyWithFlush(); + Publisher> result = outputMessage.getBodyWithFlush(); StepVerifier.create(result) .consumeNextWith(sseConsumer("data:foo\n")) .consumeNextWith(sseConsumer("data:bar\n")) @@ -102,7 +102,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll messageWriter.write(source, ResolvableType.forClass(String.class), new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); - Publisher> result = outputMessage.getBodyWithFlush(); + Publisher> result = outputMessage.getBodyWithFlush(); StepVerifier.create(result) .consumeNextWith(sseConsumer("data:foo\ndata:bar\n")) .consumeNextWith(sseConsumer("data:foo\ndata:baz\n")) @@ -118,7 +118,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll messageWriter.write(source, ResolvableType.forClass(Pojo.class), new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); - Publisher> result = outputMessage.getBodyWithFlush(); + Publisher> result = outputMessage.getBodyWithFlush(); StepVerifier.create(result) .consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", "\n")) .consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}", "\n")) diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java index 92654c5cd42..7c7e8fc2721 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java @@ -166,7 +166,7 @@ public class ServerHttpResponseTests { @Override protected Mono writeAndFlushWithInternal( - Publisher> body) { + Publisher> body) { return Mono.error(new UnsupportedOperationException()); } } diff --git a/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java b/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java index d95ffc74341..c1f0f94b835 100644 --- a/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java +++ b/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java @@ -51,7 +51,7 @@ public class MockServerHttpResponse implements ServerHttpResponse { private Publisher body; - private Publisher> bodyWithFlushes; + private Publisher> bodyWithFlushes; private DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); @@ -81,7 +81,7 @@ public class MockServerHttpResponse implements ServerHttpResponse { return this.body; } - public Publisher> getBodyWithFlush() { + public Publisher> getBodyWithFlush() { return this.bodyWithFlushes; } @@ -92,7 +92,7 @@ public class MockServerHttpResponse implements ServerHttpResponse { } @Override - public Mono writeAndFlushWith(Publisher> body) { + public Mono writeAndFlushWith(Publisher> body) { this.bodyWithFlushes = body; return Flux.from(this.bodyWithFlushes).then(); } diff --git a/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java b/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java index 43eec685076..bf51e6c4c79 100644 --- a/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java +++ b/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java @@ -43,7 +43,7 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest { private Publisher body; - private Publisher> bodyWithFlushes; + private Publisher> bodyWithFlushes; public MockClientHttpRequest() { @@ -96,7 +96,7 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest { } @Override - public Mono writeAndFlushWith(Publisher> body) { + public Mono writeAndFlushWith(Publisher> body) { this.bodyWithFlushes = body; return applyBeforeCommit().then(Flux.from(this.bodyWithFlushes).then()); } @@ -105,7 +105,7 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest { return body; } - public Publisher> getBodyWithFlush() { + public Publisher> getBodyWithFlush() { return bodyWithFlushes; }