Make the signature of RHOM#writeAndFlush() more flexible

This modifies the signature of
ReactiveHttpOutputMessage#writeAndFlush(...) in order to
be able to use Flux<Flux<DataBuffer>> objects as arguments of
this method.

Issue: SPR-14952
This commit is contained in:
Daniel Fernández 2016-11-24 22:58:10 +01:00 committed by Sebastien Deleuze
parent a143b57d4b
commit a98be035a3
11 changed files with 21 additions and 21 deletions

View File

@ -52,7 +52,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
private Publisher<DataBuffer> body; private Publisher<DataBuffer> body;
private Publisher<Publisher<DataBuffer>> bodyWithFlushes; private Publisher<? extends Publisher<DataBuffer>> bodyWithFlushes;
private DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
@ -81,7 +81,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
return this.body; return this.body;
} }
public Publisher<Publisher<DataBuffer>> getBodyWithFlush() { public Publisher<? extends Publisher<DataBuffer>> getBodyWithFlush() {
return this.bodyWithFlushes; return this.bodyWithFlushes;
} }
@ -92,7 +92,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
} }
@Override @Override
public Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body) { public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body) {
this.bodyWithFlushes = body; this.bodyWithFlushes = body;
return Flux.from(this.bodyWithFlushes).then(); return Flux.from(this.bodyWithFlushes).then();
} }

View File

@ -64,7 +64,7 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
* @param body the body content publisher * @param body the body content publisher
* @return a {@link Mono} that indicates completion or error * @return a {@link Mono} that indicates completion or error
*/ */
Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body); Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body);
/** /**
* Indicate that message handling is complete, allowing for any cleanup or * Indicate that message handling is complete, allowing for any cleanup or

View File

@ -80,7 +80,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
} }
@Override @Override
public Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body) { public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body) {
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body). Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).
map(ReactorClientHttpRequest::toByteBufs); map(ReactorClientHttpRequest::toByteBufs);
return applyBeforeCommit().then(this.httpRequest return applyBeforeCommit().then(this.httpRequest

View File

@ -48,7 +48,7 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH
} }
@Override @Override
protected final Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> body) { protected final Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<DataBuffer>> body) {
if (this.writeCalled.compareAndSet(false, true)) { if (this.writeCalled.compareAndSet(false, true)) {
Processor<Publisher<DataBuffer>, Void> bodyProcessor = createBodyFlushProcessor(); Processor<Publisher<DataBuffer>, Void> bodyProcessor = createBodyFlushProcessor();
return Mono.from(subscriber -> { return Mono.from(subscriber -> {

View File

@ -131,7 +131,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
} }
@Override @Override
public final Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body) { public final Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body) {
return new ChannelSendOperator<>(body, return new ChannelSendOperator<>(body,
writePublisher -> doCommit(() -> writeAndFlushWithInternal(writePublisher))); writePublisher -> doCommit(() -> writeAndFlushWithInternal(writePublisher)));
} }
@ -193,7 +193,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
* each {@code Publisher<DataBuffer>}. * each {@code Publisher<DataBuffer>}.
* @param body the publisher to write and flush with * @param body the publisher to write and flush with
*/ */
protected abstract Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> body); protected abstract Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<DataBuffer>> body);
/** /**
* Implement this method to write the status code to the underlying response. * Implement this method to write the status code to the underlying response.

View File

@ -76,7 +76,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
} }
@Override @Override
protected Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> publisher) { protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<DataBuffer>> publisher) {
Publisher<Publisher<ByteBuf>> body = Flux.from(publisher) Publisher<Publisher<ByteBuf>> body = Flux.from(publisher)
.map(ReactorServerHttpResponse::toByteBufs); .map(ReactorServerHttpResponse::toByteBufs);
return this.response.sendGroups(body); return this.response.sendGroups(body);

View File

@ -80,7 +80,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
@Override @Override
protected Mono<Void> writeAndFlushWithInternal( protected Mono<Void> writeAndFlushWithInternal(
Publisher<Publisher<DataBuffer>> body) { Publisher<? extends Publisher<DataBuffer>> body) {
Flux<ByteBuf> bodyWithFlushSignals = Flux.from(body). Flux<ByteBuf> bodyWithFlushSignals = Flux.from(body).
flatMap(publisher -> Flux.from(publisher). flatMap(publisher -> Flux.from(publisher).
map(NettyDataBufferFactory::toByteBuf). map(NettyDataBufferFactory::toByteBuf).

View File

@ -72,7 +72,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class), messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class),
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap());
Publisher<Publisher<DataBuffer>> result = Flux.from(outputMessage.getBodyWithFlush()); Publisher<? extends Publisher<DataBuffer>> result = Flux.from(outputMessage.getBodyWithFlush());
StepVerifier.create(result) StepVerifier.create(result)
.consumeNextWith(sseConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" + .consumeNextWith(sseConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" +
":bla\n:bla bla\n:bla bla bla\n" + "data:bar\n")) ":bla\n:bla bla\n:bla bla bla\n" + "data:bar\n"))
@ -87,7 +87,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
messageWriter.write(source, ResolvableType.forClass(String.class), messageWriter.write(source, ResolvableType.forClass(String.class),
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap());
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush(); Publisher<? extends Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
StepVerifier.create(result) StepVerifier.create(result)
.consumeNextWith(sseConsumer("data:foo\n")) .consumeNextWith(sseConsumer("data:foo\n"))
.consumeNextWith(sseConsumer("data:bar\n")) .consumeNextWith(sseConsumer("data:bar\n"))
@ -102,7 +102,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
messageWriter.write(source, ResolvableType.forClass(String.class), messageWriter.write(source, ResolvableType.forClass(String.class),
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap());
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush(); Publisher<? extends Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
StepVerifier.create(result) StepVerifier.create(result)
.consumeNextWith(sseConsumer("data:foo\ndata:bar\n")) .consumeNextWith(sseConsumer("data:foo\ndata:bar\n"))
.consumeNextWith(sseConsumer("data:foo\ndata:baz\n")) .consumeNextWith(sseConsumer("data:foo\ndata:baz\n"))
@ -118,7 +118,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
messageWriter.write(source, ResolvableType.forClass(Pojo.class), messageWriter.write(source, ResolvableType.forClass(Pojo.class),
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap());
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush(); Publisher<? extends Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
StepVerifier.create(result) StepVerifier.create(result)
.consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", "\n")) .consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", "\n"))
.consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}", "\n")) .consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}", "\n"))

View File

@ -166,7 +166,7 @@ public class ServerHttpResponseTests {
@Override @Override
protected Mono<Void> writeAndFlushWithInternal( protected Mono<Void> writeAndFlushWithInternal(
Publisher<Publisher<DataBuffer>> body) { Publisher<? extends Publisher<DataBuffer>> body) {
return Mono.error(new UnsupportedOperationException()); return Mono.error(new UnsupportedOperationException());
} }
} }

View File

@ -51,7 +51,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
private Publisher<DataBuffer> body; private Publisher<DataBuffer> body;
private Publisher<Publisher<DataBuffer>> bodyWithFlushes; private Publisher<? extends Publisher<DataBuffer>> bodyWithFlushes;
private DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
@ -81,7 +81,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
return this.body; return this.body;
} }
public Publisher<Publisher<DataBuffer>> getBodyWithFlush() { public Publisher<? extends Publisher<DataBuffer>> getBodyWithFlush() {
return this.bodyWithFlushes; return this.bodyWithFlushes;
} }
@ -92,7 +92,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
} }
@Override @Override
public Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body) { public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body) {
this.bodyWithFlushes = body; this.bodyWithFlushes = body;
return Flux.from(this.bodyWithFlushes).then(); return Flux.from(this.bodyWithFlushes).then();
} }

View File

@ -43,7 +43,7 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest {
private Publisher<DataBuffer> body; private Publisher<DataBuffer> body;
private Publisher<Publisher<DataBuffer>> bodyWithFlushes; private Publisher<? extends Publisher<DataBuffer>> bodyWithFlushes;
public MockClientHttpRequest() { public MockClientHttpRequest() {
@ -96,7 +96,7 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest {
} }
@Override @Override
public Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body) { public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body) {
this.bodyWithFlushes = body; this.bodyWithFlushes = body;
return applyBeforeCommit().then(Flux.from(this.bodyWithFlushes).then()); return applyBeforeCommit().then(Flux.from(this.bodyWithFlushes).then());
} }
@ -105,7 +105,7 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest {
return body; return body;
} }
public Publisher<Publisher<DataBuffer>> getBodyWithFlush() { public Publisher<? extends Publisher<DataBuffer>> getBodyWithFlush() {
return bodyWithFlushes; return bodyWithFlushes;
} }