Use Reactor 2.5 Flux and Mono Rx light API

Flux and Mono are used both for implementation and exposed at API
level to express 1 versus N semantic and to provide default Rx
operators:
- Flux<T> for multiple values Publisher (issue #48)
- Mono<T> for single value Publisher (issue #50)
- Mono<Void> for Publisher with no value (issue #49)
This commit is contained in:
Sebastien Deleuze 2015-12-16 18:05:37 +01:00
parent 8ef2ce44f4
commit 8ef7e2ff77
77 changed files with 354 additions and 424 deletions

View File

@ -20,9 +20,9 @@ group = 'org.springframework.reactive'
repositories {
mavenCentral()
mavenLocal()
maven { url 'https://oss.jfrog.org/libs-snapshot' } // RxNetty 0.5.x snapshots
maven { url 'http://repo.spring.io/snapshot' } // Reactor snapshot
mavenLocal()
}
configurations.all {
@ -32,7 +32,7 @@ configurations.all {
ext {
springVersion = '4.2.3.RELEASE'
reactorVersion = '2.1.0.BUILD-SNAPSHOT'
reactorVersion = '2.5.0.BUILD-SNAPSHOT'
tomcatVersion = '8.0.28'
jettyVersion = '9.3.5.v20151012'
}

View File

@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.util.MimeType;
@ -49,7 +50,7 @@ public interface Decoder<T> {
* @param hints Additional information about how to do decode, optional.
* @return the output stream
*/
Publisher<T> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
Flux<T> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints);
/**

View File

@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.util.MimeType;
@ -49,7 +50,7 @@ public interface Encoder<T> {
* @param hints Additional information about how to do decode, optional.
* @return the output stream
*/
Publisher<ByteBuffer> encode(Publisher<? extends T> inputStream, ResolvableType type,
Flux<ByteBuffer> encode(Publisher<? extends T> inputStream, ResolvableType type,
MimeType mimeType, Object... hints);
/**

View File

@ -19,6 +19,7 @@ package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.util.MimeType;
@ -42,10 +43,10 @@ public class ByteBufferDecoder extends AbstractDecoder<ByteBuffer> {
}
@Override
public Publisher<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
public Flux<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
return inputStream;
return Flux.from(inputStream);
}
}

View File

@ -19,6 +19,7 @@ package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.util.MimeType;
@ -42,11 +43,11 @@ public class ByteBufferEncoder extends AbstractEncoder<ByteBuffer> {
}
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends ByteBuffer> inputStream, ResolvableType type,
public Flux<ByteBuffer> encode(Publisher<? extends ByteBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
//noinspection unchecked
return (Publisher<ByteBuffer>) inputStream;
return Flux.from((Publisher<ByteBuffer>)inputStream);
}
}

View File

@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
@ -62,16 +62,17 @@ public class JacksonJsonDecoder extends AbstractDecoder<Object> {
@Override
public Publisher<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
public Flux<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
ObjectReader reader = this.mapper.readerFor(type.getRawClass());
Flux<ByteBuffer> stream = Flux.from(inputStream);
if (this.preProcessor != null) {
inputStream = this.preProcessor.decode(inputStream, type, mimeType, hints);
stream = this.preProcessor.decode(inputStream, type, mimeType, hints);
}
return Publishers.map(inputStream, content -> {
return stream.map(content -> {
try {
return reader.readValue(new ByteBufferInputStream(content));
}

View File

@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
@ -61,10 +61,10 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> {
}
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends Object> inputStream,
public Flux<ByteBuffer> encode(Publisher<? extends Object> inputStream,
ResolvableType type, MimeType mimeType, Object... hints) {
Publisher<ByteBuffer> stream = Publishers.map(inputStream, value -> {
Flux<ByteBuffer> stream = Flux.from(inputStream).map(value -> {
Buffer buffer = new Buffer();
BufferOutputStream outputStream = new BufferOutputStream(buffer);
try {

View File

@ -34,7 +34,7 @@ import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.XMLReaderFactory;
import reactor.Publishers;
import reactor.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
@ -60,7 +60,7 @@ public class Jaxb2Decoder extends AbstractDecoder<Object> {
@Override
public Publisher<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
public Flux<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
Class<?> outputClass = type.getRawClass();
@ -68,19 +68,19 @@ public class Jaxb2Decoder extends AbstractDecoder<Object> {
Source source = processSource(new StreamSource(new ByteBufferPublisherInputStream(inputStream)));
Unmarshaller unmarshaller = createUnmarshaller(outputClass);
if (outputClass.isAnnotationPresent(XmlRootElement.class)) {
return Publishers.just(unmarshaller.unmarshal(source));
return Flux.just(unmarshaller.unmarshal(source));
}
else {
JAXBElement<?> jaxbElement = unmarshaller.unmarshal(source, outputClass);
return Publishers.just(jaxbElement.getValue());
return Flux.just(jaxbElement.getValue());
}
}
catch (UnmarshalException ex) {
return Publishers.error(
return Flux.error(
new CodecException("Could not unmarshal to [" + outputClass + "]: " + ex.getMessage(), ex));
}
catch (JAXBException ex) {
return Publishers.error(new CodecException("Could not instantiate JAXBContext: " +
return Flux.error(new CodecException("Could not instantiate JAXBContext: " +
ex.getMessage(), ex));
}
}

View File

@ -26,7 +26,7 @@ import javax.xml.bind.MarshalException;
import javax.xml.bind.Marshaller;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
@ -54,10 +54,10 @@ public class Jaxb2Encoder extends AbstractEncoder<Object> {
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends Object> messageStream, ResolvableType type,
public Flux<ByteBuffer> encode(Publisher<? extends Object> messageStream, ResolvableType type,
MimeType mimeType, Object... hints) {
return Publishers.map(messageStream, value -> {
return Flux.from(messageStream).map(value -> {
try {
Buffer buffer = new Buffer();
BufferOutputStream outputStream = new BufferOutputStream(buffer);

View File

@ -25,7 +25,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.fn.Function;
import org.springframework.core.ResolvableType;
@ -95,10 +95,10 @@ public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
}
@Override
public Publisher<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
public Flux<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
return Publishers.flatMap(inputStream, new Function<ByteBuffer, Publisher<? extends ByteBuffer>>() {
return Flux.from(inputStream).flatMap(new Function<ByteBuffer, Publisher<? extends ByteBuffer>>() {
int openBraces;
int index;
@ -120,13 +120,13 @@ public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
}
if (this.state == ST_CORRUPTED) {
this.input.skipBytes(this.input.readableBytes());
return Publishers.error(new IllegalStateException("Corrupted stream"));
return Flux.error(new IllegalStateException("Corrupted stream"));
}
if (this.writerIndex > maxObjectLength) {
// buffer size exceeded maxObjectLength; discarding the complete buffer.
this.input.skipBytes(this.input.readableBytes());
reset();
return Publishers.error(new IllegalStateException("object length exceeds " +
return Flux.error(new IllegalStateException("object length exceeds " +
maxObjectLength + ": " + this.writerIndex + " bytes discarded"));
}
for (/* use current index */; this.index < this.writerIndex; this.index++) {
@ -199,7 +199,7 @@ public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
}
else {
this.state = ST_CORRUPTED;
return Publishers.error(new IllegalStateException(
return Flux.error(new IllegalStateException(
"invalid JSON received at byte position " + this.index + ": " +
ByteBufUtil.hexDump(this.input)));
}
@ -208,7 +208,7 @@ public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
if (this.input.readableBytes() == 0) {
this.index = 0;
}
return Publishers.from(chunks);
return Flux.fromIterable(chunks);
}
/**

View File

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.Flux;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.core.support.BackpressureUtils;
import reactor.io.buffer.Buffer;
@ -30,8 +31,6 @@ import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
import org.springframework.util.MimeType;
import static reactor.Publishers.lift;
/**
* Encode a byte stream of individual JSON element to a byte stream representing
* a single JSON array when if it contains more than one element.
@ -49,11 +48,11 @@ public class JsonObjectEncoder extends AbstractEncoder<ByteBuffer> {
}
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream,
public Flux<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream,
ResolvableType type, MimeType mimeType, Object... hints) {
//noinspection Convert2MethodRef
return lift(messageStream, bbs -> new JsonEncoderBarrier(bbs));
return Flux.from(messageStream).lift(bbs -> new JsonEncoderBarrier(bbs));
}

View File

@ -21,7 +21,7 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
@ -48,7 +48,7 @@ public class StringDecoder extends AbstractDecoder<String> {
}
@Override
public Publisher<String> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
public Flux<String> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
Charset charset;
@ -58,7 +58,7 @@ public class StringDecoder extends AbstractDecoder<String> {
else {
charset = DEFAULT_CHARSET;
}
return Publishers.map(inputStream, content -> new String(new Buffer(content).asBytes(), charset));
return Flux.from(inputStream).map(content -> new String(new Buffer(content).asBytes(), charset));
}
}

View File

@ -21,7 +21,7 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.util.MimeType;
@ -49,7 +49,7 @@ public class StringEncoder extends AbstractEncoder<String> {
}
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends String> elementStream,
public Flux<ByteBuffer> encode(Publisher<? extends String> elementStream,
ResolvableType type, MimeType mimeType, Object... hints) {
Charset charset;
@ -59,7 +59,7 @@ public class StringEncoder extends AbstractEncoder<String> {
else {
charset = DEFAULT_CHARSET;
}
return Publishers.map(elementStream, s -> ByteBuffer.wrap(s.getBytes(charset)));
return Flux.from(elementStream).map(s -> ByteBuffer.wrap(s.getBytes(charset)));
}
}

View File

@ -31,7 +31,7 @@ import org.springframework.core.convert.converter.GenericConverter;
* @author Stephane Maldini
* @author Sebastien Deleuze
*/
public final class ReactiveStreamsToReactorConverter implements GenericConverter {
public final class ReactiveStreamsToReactorStreamConverter implements GenericConverter {
@Override
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() {
@ -52,13 +52,13 @@ public final class ReactiveStreamsToReactorConverter implements GenericConverter
return source;
}
else if (Stream.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Streams.wrap((Publisher)source);
return Streams.from((Publisher)source);
}
else if (Promise.class.isAssignableFrom(source.getClass())) {
return source;
}
else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Streams.wrap((Publisher)source).next();
return Streams.from((Publisher)source).promise();
}
return null;
}

View File

@ -19,6 +19,7 @@ package org.springframework.http;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.Flux;
/**
* An "reactive" HTTP input message that exposes the input as {@link Publisher}.
@ -34,6 +35,6 @@ public interface ReactiveHttpInputMessage extends HttpMessage {
* Return the body of the message as a {@link Publisher}.
* @return the body content publisher
*/
Publisher<ByteBuffer> getBody();
Flux<ByteBuffer> getBody();
}

View File

@ -19,6 +19,7 @@ package org.springframework.http;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.Mono;
/**
* A "reactive" HTTP output message that accepts output as a {@link Publisher}.
@ -37,6 +38,6 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
* @param body the body content publisher
* @return a publisher that indicates completion or error.
*/
Publisher<Void> setBody(Publisher<ByteBuffer> body);
Mono<Void> setBody(Publisher<ByteBuffer> body);
}

View File

@ -18,8 +18,7 @@ package org.springframework.http.server.reactive;
import java.util.Arrays;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import org.springframework.util.Assert;
@ -44,13 +43,13 @@ public class ErrorHandlingHttpHandler extends HttpHandlerDecorator {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
Publisher<Void> publisher;
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
Mono<Void> publisher;
try {
publisher = getDelegate().handle(request, response);
}
catch (Throwable ex) {
publisher = Publishers.error(ex);
publisher = Mono.error(ex);
}
for (HttpExceptionHandler handler : this.exceptionHandlers) {
publisher = applyExceptionHandler(publisher, handler, request, response);
@ -58,12 +57,10 @@ public class ErrorHandlingHttpHandler extends HttpHandlerDecorator {
return publisher;
}
private static Publisher<Void> applyExceptionHandler(Publisher<Void> publisher,
private static Mono<Void> applyExceptionHandler(Mono<Void> publisher,
HttpExceptionHandler handler, ServerHttpRequest request, ServerHttpResponse response) {
return Publishers.onErrorResumeNext(publisher, ex -> {
return handler.handle(request, response, ex);
});
return publisher.flux().onErrorResumeWith(ex -> handler.handle(request, response, ex)).after();
}
}

View File

@ -19,8 +19,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.Mono;
/**
* {@link HttpHandler} that delegates to a chain of {@link HttpFilter}s followed
@ -40,7 +39,7 @@ public class FilterChainHttpHandler extends HttpHandlerDecorator {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return new DefaultHttpFilterChain().filter(request, response);
}
@ -50,7 +49,7 @@ public class FilterChainHttpHandler extends HttpHandlerDecorator {
private int index;
@Override
public Publisher<Void> filter(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> filter(ServerHttpRequest request, ServerHttpResponse response) {
if (this.index < filters.size()) {
HttpFilter filter = filters.get(this.index++);
return filter.filter(request, response, this);

View File

@ -15,7 +15,7 @@
*/
package org.springframework.http.server.reactive;
import org.reactivestreams.Publisher;
import reactor.Mono;
/**
* A contract for resolving exceptions from HTTP request handling.
@ -38,6 +38,6 @@ public interface HttpExceptionHandler {
* @param ex the exception to handle
* @return Publisher to indicate when exception handling is complete.
*/
Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex);
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex);
}

View File

@ -17,6 +17,7 @@
package org.springframework.http.server.reactive;
import org.reactivestreams.Publisher;
import reactor.Mono;
/**
* Contract for interception-style, chained processing of HTTP requests.
@ -40,7 +41,7 @@ public interface HttpFilter {
* @param chain provides a way to delegate to the next HttpFilter.
* @return Publisher to indicate when request processing is complete.
*/
Publisher<Void> filter(ServerHttpRequest request, ServerHttpResponse response,
Mono<Void> filter(ServerHttpRequest request, ServerHttpResponse response,
HttpFilterChain chain);
}

View File

@ -15,8 +15,7 @@
*/
package org.springframework.http.server.reactive;
import org.reactivestreams.Publisher;
import reactor.Mono;
/**
* Represents a chain of {@link HttpFilter}s allowing each {@link HttpFilter} to
@ -32,6 +31,6 @@ public interface HttpFilterChain {
* @param response current HTTP response.
* @return Publisher to indicate when request handling is complete.
*/
Publisher<Void> filter(ServerHttpRequest request, ServerHttpResponse response);
Mono<Void> filter(ServerHttpRequest request, ServerHttpResponse response);
}

View File

@ -16,7 +16,7 @@
package org.springframework.http.server.reactive;
import org.reactivestreams.Publisher;
import reactor.Mono;
/**
* Contract for handling HTTP requests in a non-blocking way.
@ -35,6 +35,6 @@ public interface HttpHandler {
* @param response current HTTP response.
* @return Publisher to indicate when request handling is complete.
*/
Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}

View File

@ -15,7 +15,7 @@
*/
package org.springframework.http.server.reactive;
import org.reactivestreams.Publisher;
import reactor.Mono;
import org.springframework.util.Assert;
@ -40,7 +40,7 @@ public class HttpHandlerDecorator implements HttpHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return this.delegate.handle(request, response);
}

View File

@ -15,8 +15,7 @@
*/
package org.springframework.http.server.reactive;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import org.springframework.http.HttpStatus;
@ -29,9 +28,9 @@ public class InternalServerErrorExceptionHandler implements HttpExceptionHandler
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
return Publishers.empty();
return Mono.empty();
}
}

View File

@ -15,14 +15,11 @@
*/
package org.springframework.http.server.reactive;
import org.reactivestreams.Publisher;
import reactor.Mono;
import reactor.io.buffer.Buffer;
import reactor.io.net.ReactiveChannelHandler;
import reactor.io.net.http.HttpChannel;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorServerHttpRequest;
import org.springframework.http.server.reactive.ReactorServerHttpResponse;
import org.springframework.util.Assert;
/**
@ -40,7 +37,7 @@ public class ReactorHttpHandlerAdapter
}
@Override
public Publisher<Void> apply(HttpChannel<Buffer, Buffer> channel) {
public Mono<Void> apply(HttpChannel<Buffer, Buffer> channel) {
ReactorServerHttpRequest adaptedRequest = new ReactorServerHttpRequest(channel);
ReactorServerHttpResponse adaptedResponse = new ReactorServerHttpResponse(channel);
return this.httpHandler.handle(adaptedRequest, adaptedResponse);

View File

@ -19,8 +19,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
@ -82,8 +81,8 @@ public class ReactorServerHttpRequest implements ServerHttpRequest {
}
@Override
public Publisher<ByteBuffer> getBody() {
return Publishers.map(this.channel.input(), Buffer::byteBuffer);
public Flux<ByteBuffer> getBody() {
return Flux.from(this.channel.input()).map(Buffer::byteBuffer);
}
}

View File

@ -19,7 +19,8 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.Mono;
import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.model.Status;
@ -64,12 +65,12 @@ public class ReactorServerHttpResponse implements ServerHttpResponse {
}
@Override
public Publisher<Void> setBody(Publisher<ByteBuffer> publisher) {
return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
public Mono<Void> setBody(Publisher<ByteBuffer> publisher) {
return Flux.from(publisher).lift(new WriteWithOperator<>(this::setBodyInternal)).after();
}
protected Publisher<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
return getReactorChannel().writeWith(Publishers.map(publisher, Buffer::new));
protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
return Mono.from(getReactorChannel().writeWith(Flux.from(publisher).map(Buffer::new)));
}

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import org.reactivestreams.Publisher;
import reactor.Flux;
import reactor.core.publisher.convert.RxJava1Converter;
import rx.Observable;
@ -85,7 +86,7 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest {
}
@Override
public Publisher<ByteBuffer> getBody() {
public Flux<ByteBuffer> getBody() {
Observable<ByteBuffer> content = this.getRxNettyRequest().getContent().map(ByteBuf::nioBuffer);
content = content.concatWith(Observable.empty()); // See GH issue #58
return RxJava1Converter.from(content);

View File

@ -22,7 +22,8 @@ import java.util.List;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.Mono;
import reactor.core.publisher.convert.RxJava1Converter;
import rx.Observable;
@ -66,14 +67,14 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
}
@Override
public Publisher<Void> setBody(Publisher<ByteBuffer> publisher) {
return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
public Mono<Void> setBody(Publisher<ByteBuffer> publisher) {
return Flux.from(publisher).lift(new WriteWithOperator<>(this::setBodyInternal)).after();
}
protected Publisher<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
Observable<byte[]> content = RxJava1Converter.from(publisher).map(this::toBytes);
Observable<Void> completion = getRxNettyResponse().writeBytes(content);
return RxJava1Converter.from(completion);
return RxJava1Converter.from(completion).after();
}
private byte[] toBytes(ByteBuffer buffer) {

View File

@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Mono;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
@ -73,7 +74,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
ResponseBodySubscriber responseBodySubscriber = new ResponseBodySubscriber(synchronizer);
ServletServerHttpResponse response = new ServletServerHttpResponse(servletResponse,
publisher -> subscriber -> publisher.subscribe(responseBodySubscriber));
publisher -> Mono.from(subscriber -> publisher.subscribe(responseBodySubscriber)));
servletResponse.getOutputStream().setWriteListener(responseBodySubscriber);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(synchronizer, response);

View File

@ -25,6 +25,7 @@ import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.reactivestreams.Publisher;
import reactor.Flux;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -46,14 +47,14 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
private HttpHeaders headers;
private final Publisher<ByteBuffer> requestBodyPublisher;
private final Flux<ByteBuffer> requestBodyPublisher;
public ServletServerHttpRequest(HttpServletRequest request, Publisher<ByteBuffer> body) {
Assert.notNull(request, "'request' must not be null.");
Assert.notNull(body, "'body' must not be null.");
this.request = request;
this.requestBodyPublisher = body;
this.requestBodyPublisher = Flux.from(body);
}
@ -126,7 +127,7 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
}
@Override
public Publisher<ByteBuffer> getBody() {
public Flux<ByteBuffer> getBody() {
return this.requestBodyPublisher;
}

View File

@ -22,7 +22,8 @@ import java.util.function.Function;
import javax.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.Mono;
import org.springframework.http.ExtendedHttpHeaders;
import org.springframework.http.HttpHeaders;
@ -38,13 +39,13 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
private final HttpServletResponse response;
private final Function<Publisher<ByteBuffer>, Publisher<Void>> responseBodyWriter;
private final Function<Publisher<ByteBuffer>, Mono<Void>> responseBodyWriter;
private final HttpHeaders headers;
public ServletServerHttpResponse(HttpServletResponse response,
Function<Publisher<ByteBuffer>, Publisher<Void>> responseBodyWriter) {
Function<Publisher<ByteBuffer>, Mono<Void>> responseBodyWriter) {
Assert.notNull(response, "'response' must not be null");
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
@ -69,11 +70,11 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
}
@Override
public Publisher<Void> setBody(final Publisher<ByteBuffer> publisher) {
return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
public Mono<Void> setBody(final Publisher<ByteBuffer> publisher) {
return Flux.from(publisher).lift(new WriteWithOperator<>(this::setBodyInternal)).after();
}
protected Publisher<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
return this.responseBodyWriter.apply(publisher);
}

View File

@ -35,7 +35,8 @@ import org.reactivestreams.Subscription;
import org.xnio.ChannelListener;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
import reactor.core.error.SpecificationExceptions;
import reactor.Mono;
import reactor.core.error.Exceptions;
import reactor.core.subscriber.BaseSubscriber;
import reactor.core.support.BackpressureUtils;
@ -72,7 +73,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
ResponseBodySubscriber responseBodySubscriber = new ResponseBodySubscriber(exchange);
ServerHttpResponse response = new UndertowServerHttpResponse(exchange,
publisher -> subscriber -> publisher.subscribe(responseBodySubscriber));
publisher -> Mono.from(subscriber -> publisher.subscribe(responseBodySubscriber)));
exchange.dispatch();
@ -126,7 +127,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
if (subscriber == null) {
throw SpecificationExceptions.spec_2_13_exception();
throw Exceptions.spec_2_13_exception();
}
if (this.subscriber != null) {
subscriber.onError(new IllegalStateException("Only one subscriber allowed"));

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderValues;
import org.reactivestreams.Publisher;
import reactor.Flux;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -42,14 +43,14 @@ public class UndertowServerHttpRequest implements ServerHttpRequest {
private HttpHeaders headers;
private final Publisher<ByteBuffer> body;
private final Flux<ByteBuffer> body;
public UndertowServerHttpRequest(HttpServerExchange exchange, Publisher<ByteBuffer> body) {
Assert.notNull(exchange, "'exchange' is required.");
Assert.notNull(exchange, "'body' is required.");
this.exchange = exchange;
this.body = body;
this.body = Flux.from(body);
}
@ -91,7 +92,7 @@ public class UndertowServerHttpRequest implements ServerHttpRequest {
}
@Override
public Publisher<ByteBuffer> getBody() {
public Flux<ByteBuffer> getBody() {
return this.body;
}

View File

@ -23,7 +23,8 @@ import java.util.function.Function;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HttpString;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.Mono;
import org.springframework.http.ExtendedHttpHeaders;
import org.springframework.http.HttpHeaders;
@ -40,13 +41,13 @@ public class UndertowServerHttpResponse implements ServerHttpResponse {
private final HttpServerExchange exchange;
private final Function<Publisher<ByteBuffer>, Publisher<Void>> responseBodyWriter;
private final Function<Publisher<ByteBuffer>, Mono<Void>> responseBodyWriter;
private final HttpHeaders headers;
public UndertowServerHttpResponse(HttpServerExchange exchange,
Function<Publisher<ByteBuffer>, Publisher<Void>> responseBodyWriter) {
Function<Publisher<ByteBuffer>, Mono<Void>> responseBodyWriter) {
Assert.notNull(exchange, "'exchange' is required.");
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
@ -72,11 +73,11 @@ public class UndertowServerHttpResponse implements ServerHttpResponse {
}
@Override
public Publisher<Void> setBody(Publisher<ByteBuffer> publisher) {
return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
public Mono<Void> setBody(Publisher<ByteBuffer> publisher) {
return Flux.from(publisher).lift(new WriteWithOperator<>(this::setBodyInternal)).after();
}
protected Publisher<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
return this.responseBodyWriter.apply(publisher);
}

View File

@ -23,7 +23,7 @@ import java.util.concurrent.BlockingQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.Publishers;
import reactor.rx.Streams;
/**
* {@code InputStream} implementation based on a byte array {@link Publisher}.
@ -60,7 +60,7 @@ public class ByteBufferPublisherInputStream extends InputStream {
public ByteBufferPublisherInputStream(Publisher<ByteBuffer> publisher, int requestSize) {
Assert.notNull(publisher, "'publisher' must not be null");
this.queue = Publishers.toReadQueue(publisher, requestSize);
this.queue = Streams.from(publisher).toBlockingQueue(requestSize);
}

View File

@ -23,10 +23,8 @@ import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.Publishers;
import reactor.fn.BiConsumer;
import reactor.Flux;
import reactor.Mono;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactoryUtils;
@ -114,42 +112,25 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (logger.isDebugEnabled()) {
logger.debug("Processing " + request.getMethod() + " request for [" + request.getURI() + "]");
}
Publisher<HandlerMapping> mappings = Publishers.from(this.handlerMappings);
Publisher<Object> handlerPublisher = Publishers.concatMap(mappings, m -> m.getHandler(request));
handlerPublisher = first(handlerPublisher);
Publisher<HandlerResult> resultPublisher = Publishers.concatMap(handlerPublisher, handler -> {
HandlerAdapter handlerAdapter = getHandlerAdapter(handler);
return handlerAdapter.handle(request, response, handler);
});
Publisher<Void> completionPublisher = Publishers.concatMap(resultPublisher, result -> {
Publisher<Void> publisher;
if (result.hasError()) {
publisher = Publishers.error(result.getError());
}
else {
HandlerResultHandler handler = getResultHandler(result);
publisher = handler.handleResult(request, response, result);
}
if (result.hasExceptionMapper()) {
return Publishers.onErrorResumeNext(publisher, ex -> {
return Publishers.concatMap(result.getExceptionMapper().apply(ex),
errorResult -> {
HandlerResultHandler handler = getResultHandler(errorResult);
return handler.handleResult(request, response, errorResult);
});
});
}
return publisher;
});
return mapError(completionPublisher, this.errorMapper);
return Flux.fromIterable(this.handlerMappings)
.concatMap(m -> m.getHandler(request))
.next()
.then(handler -> getHandlerAdapter(handler).handle(request, response, handler))
.then(result -> {
Mono<Void> publisher = (result.hasError() ? Mono.error(result.getError()) :
getResultHandler(result).handleResult(request, response, result));
if (result.hasExceptionMapper()) {
return publisher
.otherwise(ex -> result.getExceptionMapper().apply(ex)
.then(errorResult -> getResultHandler(errorResult).handleResult(request, response, errorResult)));
}
return publisher;
})
.otherwise(ex -> Mono.error(this.errorMapper.apply(ex)));
}
protected HandlerAdapter getHandlerAdapter(Object handler) {
@ -171,22 +152,6 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware {
}
private static <E> Publisher<E> first(Publisher<E> source) {
return Publishers.lift(source, (e, subscriber) -> {
subscriber.onNext(e);
subscriber.onComplete();
});
}
private static <E> Publisher<E> mapError(Publisher<E> source, Function<Throwable, Throwable> function) {
return Publishers.lift(source, null, new BiConsumer<Throwable, Subscriber<? super E>>() {
@Override
public void accept(Throwable throwable, Subscriber<? super E> subscriber) {
subscriber.onError(function.apply(throwable));
}
}, null);
}
private static class NotFoundHandlerMapping implements HandlerMapping {
@SuppressWarnings("ThrowableInstanceNeverThrown")
@ -194,8 +159,8 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware {
@Override
public Publisher<Object> getHandler(ServerHttpRequest request) {
return Publishers.error(HANDLER_NOT_FOUND_EXCEPTION);
public Mono<Object> getHandler(ServerHttpRequest request) {
return Mono.error(HANDLER_NOT_FOUND_EXCEPTION);
}
}

View File

@ -17,6 +17,7 @@
package org.springframework.web.reactive;
import org.reactivestreams.Publisher;
import reactor.Mono;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
@ -54,7 +55,7 @@ public interface HandlerAdapter {
* returned {@code true}.
* @return A {@link Publisher} object that produces a single {@link HandlerResult} element
*/
Publisher<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response,
Mono<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response,
Object handler);
}

View File

@ -16,7 +16,7 @@
package org.springframework.web.reactive;
import org.reactivestreams.Publisher;
import reactor.Mono;
import org.springframework.http.server.reactive.ServerHttpRequest;
@ -32,8 +32,8 @@ public interface HandlerMapping {
/**
* Return a handler for this request.
* @param request current HTTP request
* @return A {@link Publisher} object that produces a single handler element
* @return A {@link Mono} object that produces a single handler element
*/
Publisher<Object> getHandler(ServerHttpRequest request);
Mono<Object> getHandler(ServerHttpRequest request);
}

View File

@ -17,10 +17,8 @@
package org.springframework.web.reactive;
import java.util.function.Function;
import java.util.logging.Handler;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.util.Assert;
@ -40,7 +38,7 @@ public class HandlerResult {
private final Throwable error;
private Function<Throwable, Publisher<HandlerResult>> exceptionMapper;
private Function<Throwable, Mono<HandlerResult>> exceptionMapper;
public HandlerResult(Object handler, Object result, ResolvableType resultType) {
@ -95,12 +93,12 @@ public class HandlerResult {
* that results in an error.
* @param function the exception resolving function
*/
public HandlerResult setExceptionMapper(Function<Throwable, Publisher<HandlerResult>> function) {
public HandlerResult setExceptionMapper(Function<Throwable, Mono<HandlerResult>> function) {
this.exceptionMapper = function;
return this;
}
public Function<Throwable, Publisher<HandlerResult>> getExceptionMapper() {
public Function<Throwable, Mono<HandlerResult>> getExceptionMapper() {
return this.exceptionMapper;
}

View File

@ -16,7 +16,7 @@
package org.springframework.web.reactive;
import org.reactivestreams.Publisher;
import reactor.Mono;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
@ -42,13 +42,13 @@ public interface HandlerResultHandler {
* Process the given result in an asynchronous non blocking way, by eventually modifying
* response headers, or writing some data stream into the response.
* Implementations should not throw exceptions but signal them via the returned
* {@code Publisher<Void>}.
* {@code Mono<Void>}.
*
* @return A {@code Publisher<Void>} used to signal the demand, and receive a notification
* @return A {@code Mono<Void>} used to signal the demand, and receive a notification
* when the handling is complete (success or error) including the flush of the data on the
* network.
*/
Publisher<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response,
Mono<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response,
HandlerResult result);
}

View File

@ -15,8 +15,7 @@
*/
package org.springframework.web.reactive;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import org.springframework.http.server.reactive.HttpExceptionHandler;
import org.springframework.http.server.reactive.ServerHttpRequest;
@ -32,12 +31,12 @@ public class ResponseStatusExceptionHandler implements HttpExceptionHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) {
if (ex instanceof ResponseStatusException) {
response.setStatusCode(((ResponseStatusException) ex).getHttpStatus());
return Publishers.empty();
return Mono.empty();
}
return Publishers.error(ex);
return Mono.error(ex);
}
}

View File

@ -17,7 +17,7 @@
package org.springframework.web.reactive.handler;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.http.server.reactive.ServerHttpRequest;
@ -50,12 +50,12 @@ public class HttpHandlerAdapter implements HandlerAdapter {
}
@Override
public Publisher<HandlerResult> handle(ServerHttpRequest request,
public Mono<HandlerResult> handle(ServerHttpRequest request,
ServerHttpResponse response, Object handler) {
HttpHandler httpHandler = (HttpHandler)handler;
Publisher<Void> completion = httpHandler.handle(request, response);
return Publishers.just(new HandlerResult(httpHandler, completion, PUBLISHER_VOID));
Mono<Void> completion = httpHandler.handle(request, response);
return Mono.just(new HandlerResult(httpHandler, completion, PUBLISHER_VOID));
}
}

View File

@ -17,7 +17,7 @@
package org.springframework.web.reactive.handler;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import org.springframework.core.Ordered;
import org.springframework.core.ResolvableType;
@ -74,17 +74,17 @@ public class SimpleHandlerResultHandler implements Ordered, HandlerResultHandler
@SuppressWarnings("unchecked")
@Override
public Publisher<Void> handleResult(ServerHttpRequest request,
public Mono<Void> handleResult(ServerHttpRequest request,
ServerHttpResponse response, HandlerResult result) {
Object value = result.getResult();
if (Void.TYPE.equals(result.getResultType().getRawClass())) {
return Publishers.empty();
return Mono.empty();
}
return (value instanceof Publisher ? (Publisher<Void>)value :
this.conversionService.convert(value, Publisher.class));
return (value instanceof Mono ? (Mono<Void>)value :
Mono.from(this.conversionService.convert(value, Publisher.class)));
}
}

View File

@ -19,8 +19,8 @@ package org.springframework.web.reactive.handler;
import java.util.HashMap;
import java.util.Map;
import org.reactivestreams.Publisher;
import reactor.core.publisher.PublisherFactory;
import reactor.Flux;
import reactor.Mono;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.HandlerMapping;
@ -42,15 +42,15 @@ public class SimpleUrlHandlerMapping implements HandlerMapping {
@Override
public Publisher<Object> getHandler(ServerHttpRequest request) {
return PublisherFactory.create(subscriber -> {
public Mono<Object> getHandler(ServerHttpRequest request) {
return Flux.create(subscriber -> {
String path = request.getURI().getPath();
Object handler = this.handlerMap.get(path);
if (handler != null) {
subscriber.onNext(handler);
}
subscriber.onComplete();
});
}).next();
}
}

View File

@ -16,7 +16,7 @@
package org.springframework.web.reactive.method;
import org.reactivestreams.Publisher;
import reactor.Mono;
import org.springframework.core.MethodParameter;
import org.springframework.http.server.reactive.ServerHttpRequest;
@ -39,6 +39,6 @@ public interface HandlerMethodArgumentResolver {
* resolve to any value which will result in passing {@code null} as the
* argument value.
*/
Publisher<Object> resolveArgument(MethodParameter parameter, ServerHttpRequest request);
Mono<Object> resolveArgument(MethodParameter parameter, ServerHttpRequest request);
}

View File

@ -21,15 +21,13 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.Mono;
import reactor.fn.tuple.Tuple;
import reactor.rx.Streams;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.core.GenericTypeResolver;
@ -48,7 +46,7 @@ import org.springframework.web.reactive.HandlerResult;
*/
public class InvocableHandlerMethod extends HandlerMethod {
public static final Publisher<Object[]> NO_ARGS = Publishers.just(new Object[0]);
public static final Mono<Object[]> NO_ARGS = Mono.just(new Object[0]);
private final static Object NO_VALUE = new Object();
@ -86,38 +84,37 @@ public class InvocableHandlerMethod extends HandlerMethod {
* @return Publisher that produces a single HandlerResult or an error signal;
* never throws an exception.
*/
public Publisher<HandlerResult> invokeForRequest(ServerHttpRequest request, Object... providedArgs) {
public Mono<HandlerResult> invokeForRequest(ServerHttpRequest request, Object... providedArgs) {
Publisher<Object[]> argsPublisher = NO_ARGS;
Mono<Object[]> argsPublisher = NO_ARGS;
try {
if (!ObjectUtils.isEmpty(getMethodParameters())) {
List<Publisher<Object>> publishers = resolveArguments(request, providedArgs);
argsPublisher = Publishers.zip(publishers, this::initArgs);
argsPublisher = first(argsPublisher);
List<Mono<Object>> publishers = resolveArguments(request, providedArgs);
argsPublisher = Flux.zip(publishers, this::initArgs).next();
}
}
catch (Throwable ex) {
return Publishers.error(ex);
return Mono.error(ex);
}
return Publishers.concatMap(argsPublisher, args -> {
return Flux.from(argsPublisher).concatMap(args -> {
try {
Object value = doInvoke(args);
ResolvableType type = ResolvableType.forMethodParameter(getReturnType());
HandlerResult handlerResult = new HandlerResult(this, value, type);
return Publishers.just(handlerResult);
return Mono.just(handlerResult);
}
catch (InvocationTargetException ex) {
return Publishers.error(ex.getTargetException());
return Mono.error(ex.getTargetException());
}
catch (Throwable ex) {
String s = getInvocationErrorMessage(args);
return Publishers.error(new IllegalStateException(s));
return Mono.error(new IllegalStateException(s));
}
});
}).next();
}
private List<Publisher<Object>> resolveArguments(ServerHttpRequest request, Object... providedArgs) {
private List<Mono<Object>> resolveArguments(ServerHttpRequest request, Object... providedArgs) {
return Stream.of(getMethodParameters())
.map(parameter -> {
parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
@ -125,7 +122,7 @@ public class InvocableHandlerMethod extends HandlerMethod {
if (!ObjectUtils.isEmpty(providedArgs)) {
for (Object providedArg : providedArgs) {
if (parameter.getParameterType().isInstance(providedArg)) {
return Publishers.just(providedArg);
return Mono.just(providedArg);
}
}
}
@ -134,9 +131,10 @@ public class InvocableHandlerMethod extends HandlerMethod {
.findFirst()
.orElseThrow(() -> getArgError("No resolver for ", parameter, null));
try {
Publisher<Object> publisher = resolver.resolveArgument(parameter, request);
publisher = mapError(publisher, ex -> getArgError("Error resolving ", parameter, ex));
return Streams.wrap(publisher).defaultIfEmpty(NO_VALUE);
return resolver.resolveArgument(parameter, request)
// TODO Add a defaultIfEmpty alias to Mono to avoid conversion to Flux
.flux().defaultIfEmpty(NO_VALUE).next()
.otherwise(ex -> Mono.error(getArgError("Error resolving ", parameter, ex)));
}
catch (Exception ex) {
throw getArgError("Error resolving ", parameter, ex);
@ -180,17 +178,4 @@ public class InvocableHandlerMethod extends HandlerMethod {
return Stream.of(tuple.toArray()).map(o -> o != NO_VALUE ? o : null).toArray();
}
private static <E> Publisher<E> first(Publisher<E> source) {
return Publishers.lift(source, (e, subscriber) -> {
subscriber.onNext(e);
subscriber.onComplete();
});
}
private static <E> Publisher<E> mapError(Publisher<E> source, Function<Throwable, Throwable> function) {
return Publishers.lift(source, null,
(throwable, subscriber) -> subscriber.onError(function.apply(throwable)), null);
}
}

View File

@ -20,7 +20,8 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.Mono;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
@ -57,14 +58,14 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
}
@Override
public Publisher<Object> resolveArgument(MethodParameter parameter, ServerHttpRequest request) {
public Mono<Object> resolveArgument(MethodParameter parameter, ServerHttpRequest request) {
MediaType mediaType = request.getHeaders().getContentType();
if (mediaType == null) {
mediaType = MediaType.APPLICATION_OCTET_STREAM;
}
ResolvableType type = ResolvableType.forMethodParameter(parameter);
Publisher<ByteBuffer> body = request.getBody();
Publisher<?> elementStream = body;
Flux<ByteBuffer> body = request.getBody();
Flux<?> elementStream = body;
ResolvableType elementType = type.hasGenerics() ? type.getGeneric(0) : type;
Decoder<?> decoder = resolveDecoder(elementType, mediaType);
@ -73,10 +74,10 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
}
if (this.conversionService.canConvert(Publisher.class, type.getRawClass())) {
return Publishers.just(this.conversionService.convert(elementStream, type.getRawClass()));
return Mono.just(this.conversionService.convert(elementStream, type.getRawClass()));
}
return Publishers.map(elementStream, element -> element);
return (Mono<Object>)Mono.from(elementStream);
}
private Decoder<?> resolveDecoder(ResolvableType type, MediaType mediaType, Object... hints) {

View File

@ -25,8 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.Mono;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.codec.Decoder;
@ -106,7 +106,7 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin
}
@Override
public Publisher<HandlerResult> handle(ServerHttpRequest request,
public Mono<HandlerResult> handle(ServerHttpRequest request,
ServerHttpResponse response, Object handler) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
@ -114,20 +114,18 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin
InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerMethod);
invocable.setHandlerMethodArgumentResolvers(this.argumentResolvers);
Publisher<HandlerResult> publisher = invocable.invokeForRequest(request);
Flux<HandlerResult> publisher = invocable.invokeForRequest(request).flux();
publisher = publisher.onErrorResumeWith(ex -> Mono.just(new HandlerResult(handler, ex)));
publisher = Publishers.onErrorResumeNext(publisher, ex -> {
return Publishers.just(new HandlerResult(handler, ex));
});
publisher = Publishers.map(publisher,
publisher = publisher.map(
result -> result.setExceptionMapper(
ex -> mapException((Exception) ex, handlerMethod, request, response)));
ex -> mapException(ex, handlerMethod, request, response)));
return publisher;
return publisher.next();
}
private Publisher<HandlerResult> mapException(Throwable ex, HandlerMethod handlerMethod,
private Mono<HandlerResult> mapException(Throwable ex, HandlerMethod handlerMethod,
ServerHttpRequest request, ServerHttpResponse response) {
if (ex instanceof Exception) {
@ -147,7 +145,7 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin
}
}
}
return Publishers.error(ex);
return Mono.error(ex);
}
protected InvocableHandlerMethod findExceptionHandler(HandlerMethod handlerMethod, Exception exception) {

View File

@ -27,8 +27,8 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.PublisherFactory;
import reactor.Flux;
import reactor.Mono;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
@ -94,8 +94,8 @@ public class RequestMappingHandlerMapping implements HandlerMapping,
}
@Override
public Publisher<Object> getHandler(ServerHttpRequest request) {
return PublisherFactory.create(subscriber -> {
public Mono<Object> getHandler(ServerHttpRequest request) {
return Flux.create(subscriber -> {
for (Map.Entry<RequestMappingInfo, HandlerMethod> entry : this.methodMap.entrySet()) {
RequestMappingInfo info = entry.getKey();
if (info.matchesRequest(request)) {
@ -109,7 +109,7 @@ public class RequestMappingHandlerMapping implements HandlerMapping,
}
}
subscriber.onComplete();
});
}).next();
}

View File

@ -16,11 +16,7 @@
package org.springframework.web.reactive.method.annotation;
import java.util.Optional;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import org.springframework.core.MethodParameter;
import org.springframework.http.server.reactive.ServerHttpRequest;
@ -45,12 +41,12 @@ public class RequestParamArgumentResolver implements HandlerMethodArgumentResolv
@Override
public Publisher<Object> resolveArgument(MethodParameter param, ServerHttpRequest request) {
public Mono<Object> resolveArgument(MethodParameter param, ServerHttpRequest request) {
RequestParam annotation = param.getParameterAnnotation(RequestParam.class);
String name = (annotation.value().length() != 0 ? annotation.value() : param.getParameterName());
UriComponents uriComponents = UriComponentsBuilder.fromUri(request.getURI()).build();
String value = uriComponents.getQueryParams().getFirst(name);
return (value != null ? Publishers.just(value) : Publishers.empty());
return (value != null ? Mono.just(value) : Mono.empty());
}
}

View File

@ -28,7 +28,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import org.springframework.core.Ordered;
import org.springframework.core.ResolvableType;
@ -127,12 +127,12 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> handleResult(ServerHttpRequest request,
public Mono<Void> handleResult(ServerHttpRequest request,
ServerHttpResponse response, HandlerResult result) {
Object value = result.getResult();
if (value == null) {
return Publishers.empty();
return Mono.empty();
}
Publisher<?> publisher;
@ -143,7 +143,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
elementType = returnType.getGeneric(0);
}
else {
publisher = Publishers.just(value);
publisher = Mono.just(value);
elementType = returnType;
}
@ -163,7 +163,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
}
}
if (compatibleMediaTypes.isEmpty()) {
return Publishers.error(new HttpMediaTypeNotAcceptableException(producibleMediaTypes));
return Mono.error(new HttpMediaTypeNotAcceptableException(producibleMediaTypes));
}
List<MediaType> mediaTypes = new ArrayList<>(compatibleMediaTypes);
@ -189,7 +189,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
}
}
return Publishers.error(new HttpMediaTypeNotAcceptableException(this.allMediaTypes));
return Mono.error(new HttpMediaTypeNotAcceptableException(this.allMediaTypes));
}
private List<MediaType> getAcceptableMediaTypes(ServerHttpRequest request) {

View File

@ -16,11 +16,7 @@
package org.springframework.http.server.reactive;
import org.reactivestreams.Publisher;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.Mono;
/**
* @author Arjen Poutsma
@ -28,7 +24,7 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
public class EchoHandler implements HttpHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.setBody(request.getBody());
}
}

View File

@ -17,12 +17,11 @@ package org.springframework.http.server.reactive;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import reactor.rx.Streams;
import reactor.rx.stream.Signal;
@ -56,8 +55,7 @@ public class ErrorHandlingHttpHandlerTests {
HttpHandler targetHandler = new TestHttpHandler(new IllegalStateException("boo"));
HttpHandler handler = new ErrorHandlingHttpHandler(targetHandler, exceptionHandler);
Publisher<Void> publisher = handler.handle(this.request, this.response);
Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
handler.handle(this.request, this.response).get();
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus());
}
@ -73,8 +71,7 @@ public class ErrorHandlingHttpHandlerTests {
HttpHandler targetHandler = new TestHttpHandler(new IllegalStateException("boo"));
HttpHandler httpHandler = new ErrorHandlingHttpHandler(targetHandler, exceptionHandlers);
Publisher<Void> publisher = httpHandler.handle(this.request, this.response);
Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
httpHandler.handle(this.request, this.response).get();
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus());
}
@ -98,15 +95,14 @@ public class ErrorHandlingHttpHandlerTests {
HttpHandler targetHandler = new TestHttpHandler(new IllegalStateException("boo"), true);
HttpHandler handler = new ErrorHandlingHttpHandler(targetHandler, exceptionHandler);
Publisher<Void> publisher = handler.handle(this.request, this.response);
Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
handler.handle(this.request, this.response).get();
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus());
}
private Throwable awaitErrorSignal(Publisher<?> publisher) throws Exception {
Signal<?> signal = Streams.wrap(publisher).materialize().toList().await(5, TimeUnit.SECONDS).get(0);
Signal<?> signal = Streams.from(publisher).materialize().toList().get().get(0);
assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType());
return signal.getThrowable();
}
@ -129,11 +125,11 @@ public class ErrorHandlingHttpHandlerTests {
}
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.raise) {
throw this.exception;
}
return Publishers.error(this.exception);
return Mono.error(this.exception);
}
}
@ -142,8 +138,8 @@ public class ErrorHandlingHttpHandlerTests {
private static class UnresolvedExceptionHandler implements HttpExceptionHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) {
return Publishers.error(ex);
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) {
return Mono.error(ex);
}
}

View File

@ -16,15 +16,11 @@
package org.springframework.http.server.reactive;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.rx.Streams;
import reactor.Mono;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -57,8 +53,7 @@ public class FilterChainHttpHandlerTests {
TestFilter filter3 = new TestFilter();
FilterChainHttpHandler filterHandler = new FilterChainHttpHandler(handler, filter1, filter2, filter3);
Publisher<Void> voidPublisher = filterHandler.handle(this.request, this.response);
Streams.wrap(voidPublisher).toList().await(10, TimeUnit.SECONDS);
filterHandler.handle(this.request, this.response).get();
assertTrue(filter1.invoked());
assertTrue(filter2.invoked());
@ -71,8 +66,7 @@ public class FilterChainHttpHandlerTests {
StubHandler handler = new StubHandler();
FilterChainHttpHandler filterHandler = new FilterChainHttpHandler(handler);
Publisher<Void> voidPublisher = filterHandler.handle(this.request, this.response);
Streams.wrap(voidPublisher).toList().await(10, TimeUnit.SECONDS);
filterHandler.handle(this.request, this.response).get();
assertTrue(handler.invoked());
}
@ -85,8 +79,7 @@ public class FilterChainHttpHandlerTests {
TestFilter filter3 = new TestFilter();
FilterChainHttpHandler filterHandler = new FilterChainHttpHandler(handler, filter1, filter2, filter3);
Publisher<Void> voidPublisher = filterHandler.handle(this.request, this.response);
Streams.wrap(voidPublisher).toList().await(10, TimeUnit.SECONDS);
filterHandler.handle(this.request, this.response).get();
assertTrue(filter1.invoked());
assertTrue(filter2.invoked());
@ -100,8 +93,7 @@ public class FilterChainHttpHandlerTests {
AsyncFilter filter = new AsyncFilter();
FilterChainHttpHandler filterHandler = new FilterChainHttpHandler(handler, filter);
Publisher<Void> voidPublisher = filterHandler.handle(this.request, this.response);
Streams.wrap(voidPublisher).toList().await(10, TimeUnit.SECONDS);
filterHandler.handle(this.request, this.response).get();
assertTrue(filter.invoked());
assertTrue(handler.invoked());
@ -119,14 +111,14 @@ public class FilterChainHttpHandlerTests {
}
@Override
public Publisher<Void> filter(ServerHttpRequest req, ServerHttpResponse res,
public Mono<Void> filter(ServerHttpRequest req, ServerHttpResponse res,
HttpFilterChain chain) {
this.invoked = true;
return doFilter(req, res, chain);
}
public Publisher<Void> doFilter(ServerHttpRequest req, ServerHttpResponse res,
public Mono<Void> doFilter(ServerHttpRequest req, ServerHttpResponse res,
HttpFilterChain chain) {
return chain.filter(req, res);
@ -136,25 +128,25 @@ public class FilterChainHttpHandlerTests {
private static class ShortcircuitingFilter extends TestFilter {
@Override
public Publisher<Void> doFilter(ServerHttpRequest req, ServerHttpResponse res,
public Mono<Void> doFilter(ServerHttpRequest req, ServerHttpResponse res,
HttpFilterChain chain) {
return Publishers.empty();
return Mono.empty();
}
}
private static class AsyncFilter extends TestFilter {
@Override
public Publisher<Void> doFilter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) {
return Publishers.concatMap(doAsyncWork(), asyncResult -> {
public Mono<Void> doFilter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) {
return doAsyncWork().then(asyncResult -> {
logger.debug("Async result: " + asyncResult);
return chain.filter(req, res);
});
}
private Publisher<String> doAsyncWork() {
return Publishers.just("123");
private Mono<String> doAsyncWork() {
return Mono.just("123");
}
}
@ -168,10 +160,10 @@ public class FilterChainHttpHandlerTests {
}
@Override
public Publisher<Void> handle(ServerHttpRequest req, ServerHttpResponse res) {
public Mono<Void> handle(ServerHttpRequest req, ServerHttpResponse res) {
logger.trace("StubHandler invoked.");
this.invoked = true;
return Publishers.empty();
return Mono.empty();
}
}

View File

@ -19,6 +19,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.Flux;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -34,7 +35,7 @@ public class MockServerHttpRequest implements ServerHttpRequest {
private HttpHeaders headers = new HttpHeaders();
private Publisher<ByteBuffer> body;
private Flux<ByteBuffer> body;
public MockServerHttpRequest(HttpMethod httpMethod, URI uri) {
@ -43,7 +44,7 @@ public class MockServerHttpRequest implements ServerHttpRequest {
}
public MockServerHttpRequest(Publisher<ByteBuffer> body, HttpMethod httpMethod, URI uri) {
this.body = body;
this.body = Flux.from(body);
this.httpMethod = httpMethod;
this.uri = uri;
}
@ -77,11 +78,11 @@ public class MockServerHttpRequest implements ServerHttpRequest {
}
@Override
public Publisher<ByteBuffer> getBody() {
public Flux<ByteBuffer> getBody() {
return this.body;
}
public void setBody(Publisher<ByteBuffer> body) {
this.body = body;
this.body = Flux.from(body);
}
}

View File

@ -18,7 +18,8 @@ package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.Mono;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
@ -50,9 +51,9 @@ public class MockServerHttpResponse implements ServerHttpResponse {
}
@Override
public Publisher<Void> setBody(Publisher<ByteBuffer> body) {
public Mono<Void> setBody(Publisher<ByteBuffer> body) {
this.body = body;
return Publishers.concatMap(body, b -> Publishers.empty());
return Flux.from(body).after();
}
public Publisher<ByteBuffer> getBody() {

View File

@ -21,11 +21,10 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Mono;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
import static org.junit.Assert.assertEquals;
@ -41,7 +40,7 @@ public class RandomHandler implements HttpHandler {
private final Random rnd = new Random();
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
request.getBody().subscribe(new Subscriber<ByteBuffer>() {
private Subscription s;
@ -73,7 +72,7 @@ public class RandomHandler implements HttpHandler {
});
response.getHeaders().setContentLength(RESPONSE_SIZE);
return response.setBody(Streams.just(ByteBuffer.wrap(randomBytes())));
return response.setBody(Mono.just(ByteBuffer.wrap(randomBytes())));
}
private byte[] randomBytes() {

View File

@ -27,14 +27,12 @@ import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Publishers;
import reactor.core.publisher.PublisherFactory;
import reactor.Flux;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.rx.Streams;
import reactor.rx.stream.Signal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@ -58,8 +56,8 @@ public class WriteWithOperatorTests {
@Test
public void errorBeforeFirstItem() throws Exception {
IllegalStateException error = new IllegalStateException("boo");
Publisher<Void> completion = Publishers.lift(Publishers.error(error), this.operator);
List<Signal<Void>> signals = Streams.wrap(completion).materialize().toList().await(5, TimeUnit.SECONDS);
Publisher<Void> completion = Flux.<String>error(error).lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());
@ -67,8 +65,8 @@ public class WriteWithOperatorTests {
@Test
public void completionBeforeFirstItem() throws Exception {
Publisher<Void> completion = Publishers.lift(Publishers.empty(), this.operator);
List<Signal<Void>> signals = Streams.wrap(completion).materialize().toList().await(5, TimeUnit.SECONDS);
Publisher<Void> completion = Flux.<String>empty().lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -79,8 +77,8 @@ public class WriteWithOperatorTests {
@Test
public void writeOneItem() throws Exception {
Publisher<Void> completion = Publishers.lift(Publishers.just("one"), this.operator);
List<Signal<Void>> signals = Streams.wrap(completion).materialize().toList().await(5, TimeUnit.SECONDS);
Publisher<Void> completion = Flux.just("one").lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -94,8 +92,8 @@ public class WriteWithOperatorTests {
@Test
public void writeMultipleItems() throws Exception {
List<String> items = Arrays.asList("one", "two", "three");
Publisher<Void> completion = Publishers.lift(Publishers.from(items), this.operator);
List<Signal<Void>> signals = Streams.wrap(completion).materialize().consumeAsList().await(5, TimeUnit.SECONDS);
Publisher<Void> completion = Flux.fromIterable(items).lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -110,15 +108,15 @@ public class WriteWithOperatorTests {
@Test
public void errorAfterMultipleItems() throws Exception {
IllegalStateException error = new IllegalStateException("boo");
Publisher<String> publisher = PublisherFactory.create(subscriber -> {
Flux<String> publisher = Flux.create(subscriber -> {
int i = subscriber.context().incrementAndGet();
subscriber.onNext(String.valueOf(i));
if (i == 3) {
subscriber.onError(error);
}
}, subscriber -> new AtomicInteger());
Publisher<Void> completion = Publishers.lift(publisher, this.operator);
List<Signal<Void>> signals = Streams.wrap(completion).materialize().toList().await(5, TimeUnit.SECONDS);
Publisher<Void> completion = publisher.lift(this.operator);
List<Signal<Void>> signals = Streams.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());

View File

@ -22,7 +22,7 @@ import javax.xml.bind.Unmarshaller;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.Mono;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
@ -40,7 +40,7 @@ public class XmlHandler implements HttpHandler {
private static final Log logger = LogFactory.getLog(XmlHandler.class);
@Override
public Publisher<Void> handle(ServerHttpRequest request,
public Mono<Void> handle(ServerHttpRequest request,
ServerHttpResponse response) {
try {
JAXBContext jaxbContext = JAXBContext.newInstance(XmlHandlerIntegrationTests.Person.class);

View File

@ -49,8 +49,8 @@ public class ByteBufferDecoderTests {
ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer();
ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer();
Stream<ByteBuffer> source = Streams.just(fooBuffer, barBuffer);
List<ByteBuffer> results = Streams.wrap(decoder.decode(source,
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().await();
List<ByteBuffer> results = Streams.from(decoder.decode(source,
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().get();
assertEquals(2, results.size());
assertEquals(fooBuffer, results.get(0));
assertEquals(barBuffer, results.get(1));

View File

@ -48,8 +48,8 @@ public class JacksonJsonDecoderTests {
@Test
public void decode() throws InterruptedException {
Stream<ByteBuffer> source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer());
List<Object> results = Streams.wrap(decoder.decode(source, ResolvableType.forClass(Pojo.class), null))
.toList().await();
List<Object> results = Streams.from(decoder.decode(source, ResolvableType.forClass(Pojo.class), null))
.toList().get();
assertEquals(1, results.size());
assertEquals("foofoo", ((Pojo) results.get(0)).getFoo());
}

View File

@ -49,8 +49,8 @@ public class Jaxb2DecoderTests {
@Test
public void decode() throws InterruptedException {
Stream<ByteBuffer> source = Streams.just(Buffer.wrap("<?xml version=\"1.0\" encoding=\"UTF-8\"?><pojo><bar>barbar</bar><foo>foofoo</foo></pojo>").byteBuffer());
List<Object> results = Streams.wrap(decoder.decode(source, ResolvableType.forClass(Pojo.class), null))
.toList().await();
List<Object> results = Streams.from(decoder.decode(source, ResolvableType.forClass(Pojo.class), null))
.toList().get();
assertEquals(1, results.size());
assertEquals("foofoo", ((Pojo) results.get(0)).getFoo());
}

View File

@ -37,11 +37,11 @@ public class JsonObjectDecoderTests {
public void decodeSingleChunkToJsonObject() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Stream<ByteBuffer> source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer());
List<String> results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> {
List<String> results = Streams.from(decoder.decode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
assertEquals(1, results.size());
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0));
}
@ -50,11 +50,11 @@ public class JsonObjectDecoderTests {
public void decodeMultipleChunksToJsonObject() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Stream<ByteBuffer> source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\"").byteBuffer(), Buffer.wrap(", \"bar\": \"barbar\"}").byteBuffer());
List<String> results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> {
List<String> results = Streams.from(decoder.decode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
assertEquals(1, results.size());
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0));
}
@ -63,11 +63,11 @@ public class JsonObjectDecoderTests {
public void decodeSingleChunkToArray() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Stream<ByteBuffer> source = Streams.just(Buffer.wrap("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]").byteBuffer());
List<String> results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> {
List<String> results = Streams.from(decoder.decode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
assertEquals(2, results.size());
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0));
assertEquals("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}", results.get(1));
@ -77,11 +77,11 @@ public class JsonObjectDecoderTests {
public void decodeMultipleChunksToArray() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Stream<ByteBuffer> source = Streams.just(Buffer.wrap("[{\"foo\": \"foofoo\", \"bar\"").byteBuffer(), Buffer.wrap(": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]").byteBuffer());
List<String> results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> {
List<String> results = Streams.from(decoder.decode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
assertEquals(2, results.size());
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0));
assertEquals("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}", results.get(1));

View File

@ -49,8 +49,8 @@ public class StringDecoderTests {
@Test
public void decode() throws InterruptedException {
Stream<ByteBuffer> source = Streams.just(Buffer.wrap("foo").byteBuffer(), Buffer.wrap("bar").byteBuffer());
List<String> results = Streams.wrap(decoder.decode(source,
ResolvableType.forClassWithGenerics(Publisher.class, String.class), null)).toList().await();
List<String> results = Streams.from(decoder.decode(source,
ResolvableType.forClassWithGenerics(Publisher.class, String.class), null)).toList().get();
assertEquals(2, results.size());
assertEquals("foo", results.get(0));
assertEquals("bar", results.get(1));

View File

@ -49,8 +49,8 @@ public class ByteBufferEncoderTests {
ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer();
ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer();
Stream<ByteBuffer> source = Streams.just(fooBuffer, barBuffer);
List<ByteBuffer> results = Streams.wrap(encoder.encode(source,
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().await();
List<ByteBuffer> results = Streams.from(encoder.encode(source,
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().get();
assertEquals(2, results.size());
assertEquals(fooBuffer, results.get(0));
assertEquals(barBuffer, results.get(1));

View File

@ -46,11 +46,11 @@ public class JacksonJsonEncoderTests {
@Test
public void write() throws InterruptedException {
Stream<Pojo> source = Streams.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
List<String> results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> {
List<String> results = Streams.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
assertEquals(2, results.size());
assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", results.get(0));
assertEquals("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}", results.get(1));

View File

@ -47,11 +47,11 @@ public class Jaxb2EncoderTests {
@Test
public void encode() throws InterruptedException {
Stream<Pojo> source = Streams.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
List<String> results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> {
List<String> results = Streams.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
assertEquals(2, results.size());
assertEquals("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?><pojo><bar>barbar</bar><foo>foofoo</foo></pojo>", results.get(0));
assertEquals("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?><pojo><bar>barbarbar</bar><foo>foofoofoo</foo></pojo>", results.get(1));

View File

@ -37,11 +37,11 @@ public class JsonObjectEncoderTests {
public void encodeSingleElement() throws InterruptedException {
JsonObjectEncoder encoder = new JsonObjectEncoder();
Stream<ByteBuffer> source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer());
List<String> results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> {
List<String> results = Streams.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
String result = String.join("", results);
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", result);
}
@ -52,11 +52,11 @@ public class JsonObjectEncoderTests {
Stream<ByteBuffer> source = Streams.just(
Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(),
Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer());
List<String> results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> {
List<String> results = Streams.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
String result = String.join("", results);
assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]", result);
}
@ -69,11 +69,11 @@ public class JsonObjectEncoderTests {
Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer(),
Buffer.wrap("{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}").byteBuffer()
);
List<String> results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> {
List<String> results = Streams.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
String result = String.join("", results);
assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"},{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}]", result);
}

View File

@ -45,12 +45,12 @@ public class StringEncoderTests {
@Test
public void write() throws InterruptedException {
List<String> results = Streams.wrap(encoder.encode(Streams.just("foo"), null, null))
List<String> results = Streams.from(encoder.encode(Streams.just("foo"), null, null))
.map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toList().await();
}).toList().get();
assertEquals(1, results.size());
assertEquals("foo", results.get(0));
}

View File

@ -19,12 +19,11 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import reactor.rx.Streams;
import reactor.rx.stream.Signal;
@ -154,7 +153,7 @@ public class DispatcherHandlerErrorTests {
public void notAcceptable() throws Exception {
this.request.setUri(new URI("/request-body"));
this.request.getHeaders().setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
this.request.setBody(Publishers.just(ByteBuffer.wrap("body".getBytes("UTF-8"))));
this.request.setBody(Mono.just(ByteBuffer.wrap("body".getBytes("UTF-8"))));
Publisher<Void> publisher = this.dispatcherHandler.handle(this.request, this.response);
Throwable ex = awaitErrorSignal(publisher);
@ -167,12 +166,14 @@ public class DispatcherHandlerErrorTests {
@Test
public void requestBodyError() throws Exception {
this.request.setUri(new URI("/request-body"));
this.request.setBody(Publishers.error(EXCEPTION));
this.request.setBody(Mono.error(EXCEPTION));
Publisher<Void> publisher = this.dispatcherHandler.handle(this.request, this.response);
Throwable ex = awaitErrorSignal(publisher);
ex.printStackTrace();
assertSame(EXCEPTION, ex);
}
@Test
@ -183,7 +184,7 @@ public class DispatcherHandlerErrorTests {
HttpHandler httpHandler = new ErrorHandlingHttpHandler(this.dispatcherHandler, exceptionHandler);
Publisher<Void> publisher = httpHandler.handle(this.request, this.response);
Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
Streams.from(publisher).toList().get();
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus());
}
@ -196,13 +197,13 @@ public class DispatcherHandlerErrorTests {
httpHandler = new ErrorHandlingHttpHandler(httpHandler, new ServerError500ExceptionHandler());
Publisher<Void> publisher = httpHandler.handle(this.request, this.response);
Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
Streams.from(publisher).toList().get();
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus());
}
private Throwable awaitErrorSignal(Publisher<?> publisher) throws Exception {
Signal<?> signal = Streams.wrap(publisher).materialize().toList().await(5, TimeUnit.SECONDS).get(0);
Signal<?> signal = Streams.from(publisher).materialize().toList().get().get(0);
assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType());
return signal.getThrowable();
}
@ -245,7 +246,7 @@ public class DispatcherHandlerErrorTests {
@RequestMapping("/error-signal")
@ResponseBody
public Publisher<String> errorSignal() {
return Publishers.error(EXCEPTION);
return Mono.error(EXCEPTION);
}
@RequestMapping("/raise-exception")
@ -261,7 +262,7 @@ public class DispatcherHandlerErrorTests {
@RequestMapping("/request-body")
@ResponseBody
public Publisher<String> requestBody(@RequestBody Publisher<String> body) {
return Publishers.map(body, s -> "hello " + s);
return Mono.from(body).map(s -> "hello " + s);
}
}
@ -271,16 +272,16 @@ public class DispatcherHandlerErrorTests {
private static class ServerError500ExceptionHandler implements HttpExceptionHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
return Publishers.empty();
return Mono.empty();
}
}
private static class TestHttpFilter implements HttpFilter {
@Override
public Publisher<Void> filter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) {
public Mono<Void> filter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) {
return chain.filter(req, res);
}
}

View File

@ -60,7 +60,7 @@ public class ResponseStatusExceptionHandlerTests {
Throwable ex = new ResponseStatusException(HttpStatus.BAD_REQUEST);
Publisher<Void> publisher = this.handler.handle(this.request, this.response, ex);
Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
Streams.from(publisher).toList().get();
assertEquals(HttpStatus.BAD_REQUEST, this.response.getStatus());
}
@ -69,7 +69,7 @@ public class ResponseStatusExceptionHandlerTests {
Throwable ex = new IllegalStateException();
Publisher<Void> publisher = this.handler.handle(this.request, this.response, ex);
List<Signal<Void>> signals = Streams.wrap(publisher).materialize().toList().await(5, TimeUnit.SECONDS);
List<Signal<Void>> signals = Streams.from(publisher).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue(signals.get(0).hasError());
assertSame(ex, signals.get(0).getThrowable());

View File

@ -28,7 +28,7 @@ import rx.Observable;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToReactorConverter;
import org.springframework.core.convert.support.ReactiveStreamsToReactorStreamConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerResult;
@ -75,7 +75,7 @@ public class SimpleHandlerResultHandlerTests {
GenericConversionService conversionService = new GenericConversionService();
conversionService.addConverter(new ReactiveStreamsToCompletableFutureConverter());
conversionService.addConverter(new ReactiveStreamsToReactorConverter());
conversionService.addConverter(new ReactiveStreamsToReactorStreamConverter());
conversionService.addConverter(new ReactiveStreamsToRxJava1Converter());
SimpleHandlerResultHandler resultHandler = new SimpleHandlerResultHandler(conversionService);
TestController controller = new TestController();

View File

@ -22,8 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
@ -136,7 +135,7 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler
private static class FooHandler implements HttpHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.setBody(Streams.just(Buffer.wrap("foo").byteBuffer()));
}
}
@ -144,7 +143,7 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler
private static class BarHandler implements HttpHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.setBody(Streams.just(Buffer.wrap("bar").byteBuffer()));
}
}
@ -152,9 +151,9 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler
private static class HeaderSettingHandler implements HttpHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
response.getHeaders().add("foo", "bar");
return Publishers.empty();
return Mono.empty();
}
}

View File

@ -21,12 +21,12 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Flux;
import reactor.Mono;
import reactor.rx.Streams;
import reactor.rx.stream.Signal;
@ -61,7 +61,7 @@ public class InvocableHandlerMethodTests {
InvocableHandlerMethod hm = createHandlerMethod("noArgs");
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.request);
List<HandlerResult> values = Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
List<HandlerResult> values = Streams.from(publisher).toList().get();
assertEquals(1, values.size());
assertEquals("success", values.get(0).getResult());
@ -74,7 +74,7 @@ public class InvocableHandlerMethodTests {
hm.setHandlerMethodArgumentResolvers(Collections.singletonList(new RequestParamArgumentResolver()));
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.request);
List<HandlerResult> values = Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
List<HandlerResult> values = Streams.from(publisher).toList().get();
assertEquals(1, values.size());
assertEquals("success:null", values.get(0).getResult());
@ -83,10 +83,10 @@ public class InvocableHandlerMethodTests {
@Test
public void resolveArgToOneValue() throws Exception {
InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class);
addResolver(hm, Publishers.just("value1"));
addResolver(hm, Mono.just("value1"));
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.request);
List<HandlerResult> values = Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
List<HandlerResult> values = Streams.from(publisher).toList().get();
assertEquals(1, values.size());
assertEquals("success:value1", values.get(0).getResult());
@ -95,10 +95,10 @@ public class InvocableHandlerMethodTests {
@Test
public void resolveArgToMultipleValues() throws Exception {
InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class);
addResolver(hm, Publishers.from(Arrays.asList("value1", "value2", "value3")));
addResolver(hm, Flux.fromIterable(Arrays.asList("value1", "value2", "value3")));
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.request);
List<HandlerResult> values = Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS);
List<HandlerResult> values = Streams.from(publisher).toList().get();
assertEquals(1, values.size());
assertEquals("success:value1", values.get(0).getResult());
@ -137,7 +137,7 @@ public class InvocableHandlerMethodTests {
@Test
public void resolveArgumentWithErrorSignal() throws Exception {
InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class);
addResolver(hm, Publishers.error(new IllegalStateException("boo")));
addResolver(hm, Mono.error(new IllegalStateException("boo")));
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.request);
Throwable ex = awaitErrorSignal(publisher);
@ -151,7 +151,7 @@ public class InvocableHandlerMethodTests {
@Test
public void illegalArgumentExceptionIsWrappedWithHelpfulDetails() throws Exception {
InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class);
addResolver(hm, Publishers.just(1));
addResolver(hm, Mono.just(1));
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.request);
Throwable ex = awaitErrorSignal(publisher);
@ -183,12 +183,12 @@ public class InvocableHandlerMethodTests {
private void addResolver(InvocableHandlerMethod handlerMethod, Publisher<Object> resolvedValue) {
HandlerMethodArgumentResolver resolver = mock(HandlerMethodArgumentResolver.class);
when(resolver.supportsParameter(any())).thenReturn(true);
when(resolver.resolveArgument(any(), any())).thenReturn(resolvedValue);
when(resolver.resolveArgument(any(), any())).thenReturn(Mono.from(resolvedValue));
handlerMethod.setHandlerMethodArgumentResolvers(Collections.singletonList(resolver));
}
private Throwable awaitErrorSignal(Publisher<?> publisher) throws Exception {
Signal<?> signal = Streams.wrap(publisher).materialize().toList().await(5, TimeUnit.SECONDS).get(0);
Signal<?> signal = Streams.from(publisher).materialize().toList().get().get(0);
assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType());
return signal.getThrowable();
}

View File

@ -18,7 +18,6 @@ package org.springframework.web.reactive.method.annotation;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
@ -78,7 +77,7 @@ public class RequestMappingHandlerMappingTests {
private HandlerMethod toHandlerMethod(Publisher<?> handlerPublisher) throws InterruptedException {
assertNotNull(handlerPublisher);
List<?> handlerList = Streams.wrap(handlerPublisher).toList().await(5, TimeUnit.SECONDS);
List<?> handlerList = Streams.from(handlerPublisher).toList().get();
assertEquals(1, handlerList.size());
return (HandlerMethod) handlerList.get(0);
}

View File

@ -25,10 +25,9 @@ import java.util.concurrent.CompletableFuture;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.Mono;
import reactor.io.buffer.Buffer;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
@ -46,7 +45,7 @@ import org.springframework.core.codec.support.StringEncoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToReactorConverter;
import org.springframework.core.convert.support.ReactiveStreamsToReactorStreamConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@ -321,7 +320,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
// TODO: test failures with DefaultConversionService
GenericConversionService service = new GenericConversionService();
service.addConverter(new ReactiveStreamsToCompletableFutureConverter());
service.addConverter(new ReactiveStreamsToReactorConverter());
service.addConverter(new ReactiveStreamsToReactorStreamConverter());
service.addConverter(new ReactiveStreamsToRxJava1Converter());
return service;
}
@ -398,7 +397,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/promise")
@ResponseBody
public Promise<Person> promiseResponseBody() {
return Promises.success(new Person("Robert"));
return Promise.success(new Person("Robert"));
}
@RequestMapping("/list")
@ -428,7 +427,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/publisher-capitalize")
@ResponseBody
public Publisher<Person> publisherCapitalize(@RequestBody Publisher<Person> persons) {
return Streams.wrap(persons).map(person -> {
return Streams.from(persons).map(person -> {
person.setName(person.getName().toUpperCase());
return person;
});
@ -482,20 +481,20 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/promise-capitalize")
@ResponseBody
public Promise<Person> promiseCapitalize(@RequestBody Promise<Person> personFuture) {
return personFuture.map(person -> {
return Streams.from(personFuture.map(person -> {
person.setName(person.getName().toUpperCase());
return person;
});
})).promise();
}
@RequestMapping("/publisher-create")
public Publisher<Void> publisherCreate(@RequestBody Publisher<Person> personStream) {
return Streams.wrap(personStream).toList().onSuccess(persons::addAll).after();
return Streams.from(personStream).toList().doOnSuccess(persons::addAll).after();
}
@RequestMapping("/stream-create")
public Promise<Void> streamCreate(@RequestBody Stream<Person> personStream) {
return personStream.toList().onSuccess(persons::addAll).after();
public Publisher<Void> streamCreate(@RequestBody Stream<Person> personStream) {
return Streams.from(personStream.toList().doOnSuccess(persons::addAll).after()).promise();
}
@RequestMapping("/observable-create")
@ -512,7 +511,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/error-signal")
@ResponseBody
public Publisher<String> handleWithError() {
return Publishers.error(new IllegalStateException("Boo"));
return Mono.error(new IllegalStateException("Boo"));
}
@ExceptionHandler