diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index 99c780e6d66..117cab0e2a2 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -20,9 +20,9 @@ group = 'org.springframework.reactive' repositories { mavenCentral() - mavenLocal() maven { url 'https://oss.jfrog.org/libs-snapshot' } // RxNetty 0.5.x snapshots maven { url 'http://repo.spring.io/snapshot' } // Reactor snapshot + mavenLocal() } configurations.all { @@ -32,7 +32,7 @@ configurations.all { ext { springVersion = '4.2.3.RELEASE' - reactorVersion = '2.1.0.BUILD-SNAPSHOT' + reactorVersion = '2.5.0.BUILD-SNAPSHOT' tomcatVersion = '8.0.28' jettyVersion = '9.3.5.v20151012' } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/Decoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/Decoder.java index b786547efca..f78397dba88 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/Decoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/Decoder.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.List; import org.reactivestreams.Publisher; +import reactor.Flux; import org.springframework.core.ResolvableType; import org.springframework.util.MimeType; @@ -49,7 +50,7 @@ public interface Decoder { * @param hints Additional information about how to do decode, optional. * @return the output stream */ - Publisher decode(Publisher inputStream, ResolvableType type, + Flux decode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints); /** diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/Encoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/Encoder.java index 72e9a38f5de..bee2f78ed65 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/Encoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/Encoder.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.List; import org.reactivestreams.Publisher; +import reactor.Flux; import org.springframework.core.ResolvableType; import org.springframework.util.MimeType; @@ -49,7 +50,7 @@ public interface Encoder { * @param hints Additional information about how to do decode, optional. * @return the output stream */ - Publisher encode(Publisher inputStream, ResolvableType type, + Flux encode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints); /** diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ByteBufferDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ByteBufferDecoder.java index 54b78143eec..e24b722fe4b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ByteBufferDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ByteBufferDecoder.java @@ -19,6 +19,7 @@ package org.springframework.core.codec.support; import java.nio.ByteBuffer; import org.reactivestreams.Publisher; +import reactor.Flux; import org.springframework.core.ResolvableType; import org.springframework.util.MimeType; @@ -42,10 +43,10 @@ public class ByteBufferDecoder extends AbstractDecoder { } @Override - public Publisher decode(Publisher inputStream, ResolvableType type, + public Flux decode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { - return inputStream; + return Flux.from(inputStream); } } \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ByteBufferEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ByteBufferEncoder.java index fad1fc8d28a..71a29865812 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ByteBufferEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ByteBufferEncoder.java @@ -19,6 +19,7 @@ package org.springframework.core.codec.support; import java.nio.ByteBuffer; import org.reactivestreams.Publisher; +import reactor.Flux; import org.springframework.core.ResolvableType; import org.springframework.util.MimeType; @@ -42,11 +43,11 @@ public class ByteBufferEncoder extends AbstractEncoder { } @Override - public Publisher encode(Publisher inputStream, ResolvableType type, + public Flux encode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { //noinspection unchecked - return (Publisher) inputStream; + return Flux.from((Publisher)inputStream); } } \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonDecoder.java index 3280831ee1d..5042480450d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonDecoder.java @@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; import org.springframework.core.ResolvableType; import org.springframework.core.codec.CodecException; @@ -62,16 +62,17 @@ public class JacksonJsonDecoder extends AbstractDecoder { @Override - public Publisher decode(Publisher inputStream, ResolvableType type, + public Flux decode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { ObjectReader reader = this.mapper.readerFor(type.getRawClass()); + Flux stream = Flux.from(inputStream); if (this.preProcessor != null) { - inputStream = this.preProcessor.decode(inputStream, type, mimeType, hints); + stream = this.preProcessor.decode(inputStream, type, mimeType, hints); } - return Publishers.map(inputStream, content -> { + return stream.map(content -> { try { return reader.readValue(new ByteBufferInputStream(content)); } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java index 5b656935f50..d1f78ee5d52 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java @@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets; import com.fasterxml.jackson.databind.ObjectMapper; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; import reactor.io.buffer.Buffer; import org.springframework.core.ResolvableType; @@ -61,10 +61,10 @@ public class JacksonJsonEncoder extends AbstractEncoder { } @Override - public Publisher encode(Publisher inputStream, + public Flux encode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { - Publisher stream = Publishers.map(inputStream, value -> { + Flux stream = Flux.from(inputStream).map(value -> { Buffer buffer = new Buffer(); BufferOutputStream outputStream = new BufferOutputStream(buffer); try { diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/Jaxb2Decoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/Jaxb2Decoder.java index 12c2973dd8f..337cb245125 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/Jaxb2Decoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/Jaxb2Decoder.java @@ -34,7 +34,7 @@ import org.xml.sax.InputSource; import org.xml.sax.SAXException; import org.xml.sax.XMLReader; import org.xml.sax.helpers.XMLReaderFactory; -import reactor.Publishers; +import reactor.Flux; import org.springframework.core.ResolvableType; import org.springframework.core.codec.CodecException; @@ -60,7 +60,7 @@ public class Jaxb2Decoder extends AbstractDecoder { @Override - public Publisher decode(Publisher inputStream, ResolvableType type, + public Flux decode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { Class outputClass = type.getRawClass(); @@ -68,19 +68,19 @@ public class Jaxb2Decoder extends AbstractDecoder { Source source = processSource(new StreamSource(new ByteBufferPublisherInputStream(inputStream))); Unmarshaller unmarshaller = createUnmarshaller(outputClass); if (outputClass.isAnnotationPresent(XmlRootElement.class)) { - return Publishers.just(unmarshaller.unmarshal(source)); + return Flux.just(unmarshaller.unmarshal(source)); } else { JAXBElement jaxbElement = unmarshaller.unmarshal(source, outputClass); - return Publishers.just(jaxbElement.getValue()); + return Flux.just(jaxbElement.getValue()); } } catch (UnmarshalException ex) { - return Publishers.error( + return Flux.error( new CodecException("Could not unmarshal to [" + outputClass + "]: " + ex.getMessage(), ex)); } catch (JAXBException ex) { - return Publishers.error(new CodecException("Could not instantiate JAXBContext: " + + return Flux.error(new CodecException("Could not instantiate JAXBContext: " + ex.getMessage(), ex)); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/Jaxb2Encoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/Jaxb2Encoder.java index a3a64e56aed..f518d9f5e26 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/Jaxb2Encoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/Jaxb2Encoder.java @@ -26,7 +26,7 @@ import javax.xml.bind.MarshalException; import javax.xml.bind.Marshaller; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; import reactor.io.buffer.Buffer; import org.springframework.core.ResolvableType; @@ -54,10 +54,10 @@ public class Jaxb2Encoder extends AbstractEncoder { @Override - public Publisher encode(Publisher messageStream, ResolvableType type, + public Flux encode(Publisher messageStream, ResolvableType type, MimeType mimeType, Object... hints) { - return Publishers.map(messageStream, value -> { + return Flux.from(messageStream).map(value -> { try { Buffer buffer = new Buffer(); BufferOutputStream outputStream = new BufferOutputStream(buffer); diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectDecoder.java index 95889640b49..59598d519eb 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectDecoder.java @@ -25,7 +25,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; import reactor.fn.Function; import org.springframework.core.ResolvableType; @@ -95,10 +95,10 @@ public class JsonObjectDecoder extends AbstractDecoder { } @Override - public Publisher decode(Publisher inputStream, ResolvableType type, + public Flux decode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { - return Publishers.flatMap(inputStream, new Function>() { + return Flux.from(inputStream).flatMap(new Function>() { int openBraces; int index; @@ -120,13 +120,13 @@ public class JsonObjectDecoder extends AbstractDecoder { } if (this.state == ST_CORRUPTED) { this.input.skipBytes(this.input.readableBytes()); - return Publishers.error(new IllegalStateException("Corrupted stream")); + return Flux.error(new IllegalStateException("Corrupted stream")); } if (this.writerIndex > maxObjectLength) { // buffer size exceeded maxObjectLength; discarding the complete buffer. this.input.skipBytes(this.input.readableBytes()); reset(); - return Publishers.error(new IllegalStateException("object length exceeds " + + return Flux.error(new IllegalStateException("object length exceeds " + maxObjectLength + ": " + this.writerIndex + " bytes discarded")); } for (/* use current index */; this.index < this.writerIndex; this.index++) { @@ -199,7 +199,7 @@ public class JsonObjectDecoder extends AbstractDecoder { } else { this.state = ST_CORRUPTED; - return Publishers.error(new IllegalStateException( + return Flux.error(new IllegalStateException( "invalid JSON received at byte position " + this.index + ": " + ByteBufUtil.hexDump(this.input))); } @@ -208,7 +208,7 @@ public class JsonObjectDecoder extends AbstractDecoder { if (this.input.readableBytes() == 0) { this.index = 0; } - return Publishers.from(chunks); + return Flux.fromIterable(chunks); } /** diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java index ffe5a85d8cd..d00ef799a9c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import reactor.Flux; import reactor.core.subscriber.SubscriberBarrier; import reactor.core.support.BackpressureUtils; import reactor.io.buffer.Buffer; @@ -30,8 +31,6 @@ import reactor.io.buffer.Buffer; import org.springframework.core.ResolvableType; import org.springframework.util.MimeType; -import static reactor.Publishers.lift; - /** * Encode a byte stream of individual JSON element to a byte stream representing * a single JSON array when if it contains more than one element. @@ -49,11 +48,11 @@ public class JsonObjectEncoder extends AbstractEncoder { } @Override - public Publisher encode(Publisher messageStream, + public Flux encode(Publisher messageStream, ResolvableType type, MimeType mimeType, Object... hints) { //noinspection Convert2MethodRef - return lift(messageStream, bbs -> new JsonEncoderBarrier(bbs)); + return Flux.from(messageStream).lift(bbs -> new JsonEncoderBarrier(bbs)); } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java index c2b20fd8b48..5205a1e0d04 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java @@ -21,7 +21,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; import reactor.io.buffer.Buffer; import org.springframework.core.ResolvableType; @@ -48,7 +48,7 @@ public class StringDecoder extends AbstractDecoder { } @Override - public Publisher decode(Publisher inputStream, ResolvableType type, + public Flux decode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { Charset charset; @@ -58,7 +58,7 @@ public class StringDecoder extends AbstractDecoder { else { charset = DEFAULT_CHARSET; } - return Publishers.map(inputStream, content -> new String(new Buffer(content).asBytes(), charset)); + return Flux.from(inputStream).map(content -> new String(new Buffer(content).asBytes(), charset)); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java index 756638198e8..415e8d9f9a0 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java @@ -21,7 +21,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; import org.springframework.core.ResolvableType; import org.springframework.util.MimeType; @@ -49,7 +49,7 @@ public class StringEncoder extends AbstractEncoder { } @Override - public Publisher encode(Publisher elementStream, + public Flux encode(Publisher elementStream, ResolvableType type, MimeType mimeType, Object... hints) { Charset charset; @@ -59,7 +59,7 @@ public class StringEncoder extends AbstractEncoder { else { charset = DEFAULT_CHARSET; } - return Publishers.map(elementStream, s -> ByteBuffer.wrap(s.getBytes(charset))); + return Flux.from(elementStream).map(s -> ByteBuffer.wrap(s.getBytes(charset))); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorConverter.java b/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorStreamConverter.java similarity index 91% rename from spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorConverter.java rename to spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorStreamConverter.java index 4a59deb976d..9ae1c8d19ce 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorConverter.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorStreamConverter.java @@ -31,7 +31,7 @@ import org.springframework.core.convert.converter.GenericConverter; * @author Stephane Maldini * @author Sebastien Deleuze */ -public final class ReactiveStreamsToReactorConverter implements GenericConverter { +public final class ReactiveStreamsToReactorStreamConverter implements GenericConverter { @Override public Set getConvertibleTypes() { @@ -52,13 +52,13 @@ public final class ReactiveStreamsToReactorConverter implements GenericConverter return source; } else if (Stream.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { - return Streams.wrap((Publisher)source); + return Streams.from((Publisher)source); } else if (Promise.class.isAssignableFrom(source.getClass())) { return source; } else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { - return Streams.wrap((Publisher)source).next(); + return Streams.from((Publisher)source).promise(); } return null; } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java index 3cff8d95530..65778e219d7 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java @@ -19,6 +19,7 @@ package org.springframework.http; import java.nio.ByteBuffer; import org.reactivestreams.Publisher; +import reactor.Flux; /** * An "reactive" HTTP input message that exposes the input as {@link Publisher}. @@ -34,6 +35,6 @@ public interface ReactiveHttpInputMessage extends HttpMessage { * Return the body of the message as a {@link Publisher}. * @return the body content publisher */ - Publisher getBody(); + Flux getBody(); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java index 83b3507df4b..1b76212bca4 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java @@ -19,6 +19,7 @@ package org.springframework.http; import java.nio.ByteBuffer; import org.reactivestreams.Publisher; +import reactor.Mono; /** * A "reactive" HTTP output message that accepts output as a {@link Publisher}. @@ -37,6 +38,6 @@ public interface ReactiveHttpOutputMessage extends HttpMessage { * @param body the body content publisher * @return a publisher that indicates completion or error. */ - Publisher setBody(Publisher body); + Mono setBody(Publisher body); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ErrorHandlingHttpHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ErrorHandlingHttpHandler.java index 0050a4c0109..92e2fa2c9df 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ErrorHandlingHttpHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ErrorHandlingHttpHandler.java @@ -18,8 +18,7 @@ package org.springframework.http.server.reactive; import java.util.Arrays; import java.util.List; -import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import org.springframework.util.Assert; @@ -44,13 +43,13 @@ public class ErrorHandlingHttpHandler extends HttpHandlerDecorator { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { - Publisher publisher; + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { + Mono publisher; try { publisher = getDelegate().handle(request, response); } catch (Throwable ex) { - publisher = Publishers.error(ex); + publisher = Mono.error(ex); } for (HttpExceptionHandler handler : this.exceptionHandlers) { publisher = applyExceptionHandler(publisher, handler, request, response); @@ -58,12 +57,10 @@ public class ErrorHandlingHttpHandler extends HttpHandlerDecorator { return publisher; } - private static Publisher applyExceptionHandler(Publisher publisher, + private static Mono applyExceptionHandler(Mono publisher, HttpExceptionHandler handler, ServerHttpRequest request, ServerHttpResponse response) { - return Publishers.onErrorResumeNext(publisher, ex -> { - return handler.handle(request, response, ex); - }); + return publisher.flux().onErrorResumeWith(ex -> handler.handle(request, response, ex)).after(); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/FilterChainHttpHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/FilterChainHttpHandler.java index 224e0fcf9d5..80ac46057b5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/FilterChainHttpHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/FilterChainHttpHandler.java @@ -19,8 +19,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.reactivestreams.Publisher; - +import reactor.Mono; /** * {@link HttpHandler} that delegates to a chain of {@link HttpFilter}s followed @@ -40,7 +39,7 @@ public class FilterChainHttpHandler extends HttpHandlerDecorator { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { return new DefaultHttpFilterChain().filter(request, response); } @@ -50,7 +49,7 @@ public class FilterChainHttpHandler extends HttpHandlerDecorator { private int index; @Override - public Publisher filter(ServerHttpRequest request, ServerHttpResponse response) { + public Mono filter(ServerHttpRequest request, ServerHttpResponse response) { if (this.index < filters.size()) { HttpFilter filter = filters.get(this.index++); return filter.filter(request, response, this); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpExceptionHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpExceptionHandler.java index 4fb95c20b60..5258d025646 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpExceptionHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpExceptionHandler.java @@ -15,7 +15,7 @@ */ package org.springframework.http.server.reactive; -import org.reactivestreams.Publisher; +import reactor.Mono; /** * A contract for resolving exceptions from HTTP request handling. @@ -38,6 +38,6 @@ public interface HttpExceptionHandler { * @param ex the exception to handle * @return Publisher to indicate when exception handling is complete. */ - Publisher handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex); + Mono handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilter.java index 525fb4d6251..b53fb8639b0 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilter.java @@ -17,6 +17,7 @@ package org.springframework.http.server.reactive; import org.reactivestreams.Publisher; +import reactor.Mono; /** * Contract for interception-style, chained processing of HTTP requests. @@ -40,7 +41,7 @@ public interface HttpFilter { * @param chain provides a way to delegate to the next HttpFilter. * @return Publisher to indicate when request processing is complete. */ - Publisher filter(ServerHttpRequest request, ServerHttpResponse response, + Mono filter(ServerHttpRequest request, ServerHttpResponse response, HttpFilterChain chain); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilterChain.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilterChain.java index a18a644f2db..7650014cadb 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilterChain.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilterChain.java @@ -15,8 +15,7 @@ */ package org.springframework.http.server.reactive; -import org.reactivestreams.Publisher; - +import reactor.Mono; /** * Represents a chain of {@link HttpFilter}s allowing each {@link HttpFilter} to @@ -32,6 +31,6 @@ public interface HttpFilterChain { * @param response current HTTP response. * @return Publisher to indicate when request handling is complete. */ - Publisher filter(ServerHttpRequest request, ServerHttpResponse response); + Mono filter(ServerHttpRequest request, ServerHttpResponse response); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandler.java index ab0e93197a6..3d90ca52894 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandler.java @@ -16,7 +16,7 @@ package org.springframework.http.server.reactive; -import org.reactivestreams.Publisher; +import reactor.Mono; /** * Contract for handling HTTP requests in a non-blocking way. @@ -35,6 +35,6 @@ public interface HttpHandler { * @param response current HTTP response. * @return Publisher to indicate when request handling is complete. */ - Publisher handle(ServerHttpRequest request, ServerHttpResponse response); + Mono handle(ServerHttpRequest request, ServerHttpResponse response); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandlerDecorator.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandlerDecorator.java index e8ba2baf773..49190be05d3 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandlerDecorator.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandlerDecorator.java @@ -15,7 +15,7 @@ */ package org.springframework.http.server.reactive; -import org.reactivestreams.Publisher; +import reactor.Mono; import org.springframework.util.Assert; @@ -40,7 +40,7 @@ public class HttpHandlerDecorator implements HttpHandler { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { return this.delegate.handle(request, response); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/InternalServerErrorExceptionHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/InternalServerErrorExceptionHandler.java index 7c88c8febe5..8d6cc4a22a8 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/InternalServerErrorExceptionHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/InternalServerErrorExceptionHandler.java @@ -15,8 +15,7 @@ */ package org.springframework.http.server.reactive; -import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import org.springframework.http.HttpStatus; @@ -29,9 +28,9 @@ public class InternalServerErrorExceptionHandler implements HttpExceptionHandler @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) { response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); - return Publishers.empty(); + return Mono.empty(); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java index b5005a31265..72ddbba8727 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java @@ -15,14 +15,11 @@ */ package org.springframework.http.server.reactive; -import org.reactivestreams.Publisher; +import reactor.Mono; import reactor.io.buffer.Buffer; import reactor.io.net.ReactiveChannelHandler; import reactor.io.net.http.HttpChannel; -import org.springframework.http.server.reactive.HttpHandler; -import org.springframework.http.server.reactive.ReactorServerHttpRequest; -import org.springframework.http.server.reactive.ReactorServerHttpResponse; import org.springframework.util.Assert; /** @@ -40,7 +37,7 @@ public class ReactorHttpHandlerAdapter } @Override - public Publisher apply(HttpChannel channel) { + public Mono apply(HttpChannel channel) { ReactorServerHttpRequest adaptedRequest = new ReactorServerHttpRequest(channel); ReactorServerHttpResponse adaptedResponse = new ReactorServerHttpResponse(channel); return this.httpHandler.handle(adaptedRequest, adaptedResponse); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java index 493e37df65d..6d569c93525 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java @@ -19,8 +19,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; -import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; import reactor.io.buffer.Buffer; import reactor.io.net.http.HttpChannel; @@ -82,8 +81,8 @@ public class ReactorServerHttpRequest implements ServerHttpRequest { } @Override - public Publisher getBody() { - return Publishers.map(this.channel.input(), Buffer::byteBuffer); + public Flux getBody() { + return Flux.from(this.channel.input()).map(Buffer::byteBuffer); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index d3fdf10a225..58a8947440e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -19,7 +19,8 @@ import java.nio.ByteBuffer; import java.util.List; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; +import reactor.Mono; import reactor.io.buffer.Buffer; import reactor.io.net.http.HttpChannel; import reactor.io.net.http.model.Status; @@ -64,12 +65,12 @@ public class ReactorServerHttpResponse implements ServerHttpResponse { } @Override - public Publisher setBody(Publisher publisher) { - return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal)); + public Mono setBody(Publisher publisher) { + return Flux.from(publisher).lift(new WriteWithOperator<>(this::setBodyInternal)).after(); } - protected Publisher setBodyInternal(Publisher publisher) { - return getReactorChannel().writeWith(Publishers.map(publisher, Buffer::new)); + protected Mono setBodyInternal(Publisher publisher) { + return Mono.from(getReactorChannel().writeWith(Flux.from(publisher).map(Buffer::new))); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java index 6afad378292..f4ee9f3dcf6 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf; import io.reactivex.netty.protocol.http.server.HttpServerRequest; import org.reactivestreams.Publisher; +import reactor.Flux; import reactor.core.publisher.convert.RxJava1Converter; import rx.Observable; @@ -85,7 +86,7 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest { } @Override - public Publisher getBody() { + public Flux getBody() { Observable content = this.getRxNettyRequest().getContent().map(ByteBuf::nioBuffer); content = content.concatWith(Observable.empty()); // See GH issue #58 return RxJava1Converter.from(content); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java index aec66802888..05ed55e8c3d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java @@ -22,7 +22,8 @@ import java.util.List; import io.netty.handler.codec.http.HttpResponseStatus; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; +import reactor.Mono; import reactor.core.publisher.convert.RxJava1Converter; import rx.Observable; @@ -66,14 +67,14 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { } @Override - public Publisher setBody(Publisher publisher) { - return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal)); + public Mono setBody(Publisher publisher) { + return Flux.from(publisher).lift(new WriteWithOperator<>(this::setBodyInternal)).after(); } - protected Publisher setBodyInternal(Publisher publisher) { + protected Mono setBodyInternal(Publisher publisher) { Observable content = RxJava1Converter.from(publisher).map(this::toBytes); Observable completion = getRxNettyResponse().writeBytes(content); - return RxJava1Converter.from(completion); + return RxJava1Converter.from(completion).after(); } private byte[] toBytes(ByteBuffer buffer) { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index 97bf0d7f4b1..41b229c6bc2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.Mono; import org.springframework.http.HttpStatus; import org.springframework.util.Assert; @@ -73,7 +74,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet { ResponseBodySubscriber responseBodySubscriber = new ResponseBodySubscriber(synchronizer); ServletServerHttpResponse response = new ServletServerHttpResponse(servletResponse, - publisher -> subscriber -> publisher.subscribe(responseBodySubscriber)); + publisher -> Mono.from(subscriber -> publisher.subscribe(responseBodySubscriber))); servletResponse.getOutputStream().setWriteListener(responseBodySubscriber); HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(synchronizer, response); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 3f1eb1c69ca..d5c14618608 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -25,6 +25,7 @@ import java.util.Map; import javax.servlet.http.HttpServletRequest; import org.reactivestreams.Publisher; +import reactor.Flux; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -46,14 +47,14 @@ public class ServletServerHttpRequest implements ServerHttpRequest { private HttpHeaders headers; - private final Publisher requestBodyPublisher; + private final Flux requestBodyPublisher; public ServletServerHttpRequest(HttpServletRequest request, Publisher body) { Assert.notNull(request, "'request' must not be null."); Assert.notNull(body, "'body' must not be null."); this.request = request; - this.requestBodyPublisher = body; + this.requestBodyPublisher = Flux.from(body); } @@ -126,7 +127,7 @@ public class ServletServerHttpRequest implements ServerHttpRequest { } @Override - public Publisher getBody() { + public Flux getBody() { return this.requestBodyPublisher; } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 19e10d31289..0fc838af418 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -22,7 +22,8 @@ import java.util.function.Function; import javax.servlet.http.HttpServletResponse; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; +import reactor.Mono; import org.springframework.http.ExtendedHttpHeaders; import org.springframework.http.HttpHeaders; @@ -38,13 +39,13 @@ public class ServletServerHttpResponse implements ServerHttpResponse { private final HttpServletResponse response; - private final Function, Publisher> responseBodyWriter; + private final Function, Mono> responseBodyWriter; private final HttpHeaders headers; public ServletServerHttpResponse(HttpServletResponse response, - Function, Publisher> responseBodyWriter) { + Function, Mono> responseBodyWriter) { Assert.notNull(response, "'response' must not be null"); Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null"); @@ -69,11 +70,11 @@ public class ServletServerHttpResponse implements ServerHttpResponse { } @Override - public Publisher setBody(final Publisher publisher) { - return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal)); + public Mono setBody(final Publisher publisher) { + return Flux.from(publisher).lift(new WriteWithOperator<>(this::setBodyInternal)).after(); } - protected Publisher setBodyInternal(Publisher publisher) { + protected Mono setBodyInternal(Publisher publisher) { return this.responseBodyWriter.apply(publisher); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index db61960ef66..0e4f86f60dd 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -35,7 +35,8 @@ import org.reactivestreams.Subscription; import org.xnio.ChannelListener; import org.xnio.channels.StreamSinkChannel; import org.xnio.channels.StreamSourceChannel; -import reactor.core.error.SpecificationExceptions; +import reactor.Mono; +import reactor.core.error.Exceptions; import reactor.core.subscriber.BaseSubscriber; import reactor.core.support.BackpressureUtils; @@ -72,7 +73,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle ResponseBodySubscriber responseBodySubscriber = new ResponseBodySubscriber(exchange); ServerHttpResponse response = new UndertowServerHttpResponse(exchange, - publisher -> subscriber -> publisher.subscribe(responseBodySubscriber)); + publisher -> Mono.from(subscriber -> publisher.subscribe(responseBodySubscriber))); exchange.dispatch(); @@ -126,7 +127,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle @Override public void subscribe(Subscriber subscriber) { if (subscriber == null) { - throw SpecificationExceptions.spec_2_13_exception(); + throw Exceptions.spec_2_13_exception(); } if (this.subscriber != null) { subscriber.onError(new IllegalStateException("Only one subscriber allowed")); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 9b710945fd3..98c766e5898 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import io.undertow.server.HttpServerExchange; import io.undertow.util.HeaderValues; import org.reactivestreams.Publisher; +import reactor.Flux; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -42,14 +43,14 @@ public class UndertowServerHttpRequest implements ServerHttpRequest { private HttpHeaders headers; - private final Publisher body; + private final Flux body; public UndertowServerHttpRequest(HttpServerExchange exchange, Publisher body) { Assert.notNull(exchange, "'exchange' is required."); Assert.notNull(exchange, "'body' is required."); this.exchange = exchange; - this.body = body; + this.body = Flux.from(body); } @@ -91,7 +92,7 @@ public class UndertowServerHttpRequest implements ServerHttpRequest { } @Override - public Publisher getBody() { + public Flux getBody() { return this.body; } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index d13b80eda2a..2d303c4cd02 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -23,7 +23,8 @@ import java.util.function.Function; import io.undertow.server.HttpServerExchange; import io.undertow.util.HttpString; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; +import reactor.Mono; import org.springframework.http.ExtendedHttpHeaders; import org.springframework.http.HttpHeaders; @@ -40,13 +41,13 @@ public class UndertowServerHttpResponse implements ServerHttpResponse { private final HttpServerExchange exchange; - private final Function, Publisher> responseBodyWriter; + private final Function, Mono> responseBodyWriter; private final HttpHeaders headers; public UndertowServerHttpResponse(HttpServerExchange exchange, - Function, Publisher> responseBodyWriter) { + Function, Mono> responseBodyWriter) { Assert.notNull(exchange, "'exchange' is required."); Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null"); @@ -72,11 +73,11 @@ public class UndertowServerHttpResponse implements ServerHttpResponse { } @Override - public Publisher setBody(Publisher publisher) { - return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal)); + public Mono setBody(Publisher publisher) { + return Flux.from(publisher).lift(new WriteWithOperator<>(this::setBodyInternal)).after(); } - protected Publisher setBodyInternal(Publisher publisher) { + protected Mono setBodyInternal(Publisher publisher) { return this.responseBodyWriter.apply(publisher); } diff --git a/spring-web-reactive/src/main/java/org/springframework/util/ByteBufferPublisherInputStream.java b/spring-web-reactive/src/main/java/org/springframework/util/ByteBufferPublisherInputStream.java index 26d2e5ee41e..10aa4c23d27 100644 --- a/spring-web-reactive/src/main/java/org/springframework/util/ByteBufferPublisherInputStream.java +++ b/spring-web-reactive/src/main/java/org/springframework/util/ByteBufferPublisherInputStream.java @@ -23,7 +23,7 @@ import java.util.concurrent.BlockingQueue; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; -import reactor.Publishers; +import reactor.rx.Streams; /** * {@code InputStream} implementation based on a byte array {@link Publisher}. @@ -60,7 +60,7 @@ public class ByteBufferPublisherInputStream extends InputStream { public ByteBufferPublisherInputStream(Publisher publisher, int requestSize) { Assert.notNull(publisher, "'publisher' must not be null"); - this.queue = Publishers.toReadQueue(publisher, requestSize); + this.queue = Streams.from(publisher).toBlockingQueue(requestSize); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/DispatcherHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/DispatcherHandler.java index ee3bfe70e3e..93e1aa30317 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/DispatcherHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/DispatcherHandler.java @@ -23,10 +23,8 @@ import java.util.function.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import reactor.Publishers; -import reactor.fn.BiConsumer; +import reactor.Flux; +import reactor.Mono; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactoryUtils; @@ -114,42 +112,25 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { if (logger.isDebugEnabled()) { logger.debug("Processing " + request.getMethod() + " request for [" + request.getURI() + "]"); } - - Publisher mappings = Publishers.from(this.handlerMappings); - Publisher handlerPublisher = Publishers.concatMap(mappings, m -> m.getHandler(request)); - handlerPublisher = first(handlerPublisher); - - Publisher resultPublisher = Publishers.concatMap(handlerPublisher, handler -> { - HandlerAdapter handlerAdapter = getHandlerAdapter(handler); - return handlerAdapter.handle(request, response, handler); - }); - - Publisher completionPublisher = Publishers.concatMap(resultPublisher, result -> { - Publisher publisher; - if (result.hasError()) { - publisher = Publishers.error(result.getError()); - } - else { - HandlerResultHandler handler = getResultHandler(result); - publisher = handler.handleResult(request, response, result); - } - if (result.hasExceptionMapper()) { - return Publishers.onErrorResumeNext(publisher, ex -> { - return Publishers.concatMap(result.getExceptionMapper().apply(ex), - errorResult -> { - HandlerResultHandler handler = getResultHandler(errorResult); - return handler.handleResult(request, response, errorResult); - }); - }); - } - return publisher; - }); - - return mapError(completionPublisher, this.errorMapper); + return Flux.fromIterable(this.handlerMappings) + .concatMap(m -> m.getHandler(request)) + .next() + .then(handler -> getHandlerAdapter(handler).handle(request, response, handler)) + .then(result -> { + Mono publisher = (result.hasError() ? Mono.error(result.getError()) : + getResultHandler(result).handleResult(request, response, result)); + if (result.hasExceptionMapper()) { + return publisher + .otherwise(ex -> result.getExceptionMapper().apply(ex) + .then(errorResult -> getResultHandler(errorResult).handleResult(request, response, errorResult))); + } + return publisher; + }) + .otherwise(ex -> Mono.error(this.errorMapper.apply(ex))); } protected HandlerAdapter getHandlerAdapter(Object handler) { @@ -171,22 +152,6 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware { } - private static Publisher first(Publisher source) { - return Publishers.lift(source, (e, subscriber) -> { - subscriber.onNext(e); - subscriber.onComplete(); - }); - } - - private static Publisher mapError(Publisher source, Function function) { - return Publishers.lift(source, null, new BiConsumer>() { - @Override - public void accept(Throwable throwable, Subscriber subscriber) { - subscriber.onError(function.apply(throwable)); - } - }, null); - } - private static class NotFoundHandlerMapping implements HandlerMapping { @SuppressWarnings("ThrowableInstanceNeverThrown") @@ -194,8 +159,8 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware { @Override - public Publisher getHandler(ServerHttpRequest request) { - return Publishers.error(HANDLER_NOT_FOUND_EXCEPTION); + public Mono getHandler(ServerHttpRequest request) { + return Mono.error(HANDLER_NOT_FOUND_EXCEPTION); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerAdapter.java index 40f0c7251f2..c6f5c3541c3 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerAdapter.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive; import org.reactivestreams.Publisher; +import reactor.Mono; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; @@ -54,7 +55,7 @@ public interface HandlerAdapter { * returned {@code true}. * @return A {@link Publisher} object that produces a single {@link HandlerResult} element */ - Publisher handle(ServerHttpRequest request, ServerHttpResponse response, + Mono handle(ServerHttpRequest request, ServerHttpResponse response, Object handler); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerMapping.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerMapping.java index 1f399035a1f..ae276d62300 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerMapping.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerMapping.java @@ -16,7 +16,7 @@ package org.springframework.web.reactive; -import org.reactivestreams.Publisher; +import reactor.Mono; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -32,8 +32,8 @@ public interface HandlerMapping { /** * Return a handler for this request. * @param request current HTTP request - * @return A {@link Publisher} object that produces a single handler element + * @return A {@link Mono} object that produces a single handler element */ - Publisher getHandler(ServerHttpRequest request); + Mono getHandler(ServerHttpRequest request); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResult.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResult.java index 4a6bae92abe..ff84c5caa57 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResult.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResult.java @@ -17,10 +17,8 @@ package org.springframework.web.reactive; import java.util.function.Function; -import java.util.logging.Handler; -import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import org.springframework.core.ResolvableType; import org.springframework.util.Assert; @@ -40,7 +38,7 @@ public class HandlerResult { private final Throwable error; - private Function> exceptionMapper; + private Function> exceptionMapper; public HandlerResult(Object handler, Object result, ResolvableType resultType) { @@ -95,12 +93,12 @@ public class HandlerResult { * that results in an error. * @param function the exception resolving function */ - public HandlerResult setExceptionMapper(Function> function) { + public HandlerResult setExceptionMapper(Function> function) { this.exceptionMapper = function; return this; } - public Function> getExceptionMapper() { + public Function> getExceptionMapper() { return this.exceptionMapper; } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResultHandler.java index 0fc158c1209..e121bdff985 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResultHandler.java @@ -16,7 +16,7 @@ package org.springframework.web.reactive; -import org.reactivestreams.Publisher; +import reactor.Mono; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; @@ -42,13 +42,13 @@ public interface HandlerResultHandler { * Process the given result in an asynchronous non blocking way, by eventually modifying * response headers, or writing some data stream into the response. * Implementations should not throw exceptions but signal them via the returned - * {@code Publisher}. + * {@code Mono}. * - * @return A {@code Publisher} used to signal the demand, and receive a notification + * @return A {@code Mono} used to signal the demand, and receive a notification * when the handling is complete (success or error) including the flush of the data on the * network. */ - Publisher handleResult(ServerHttpRequest request, ServerHttpResponse response, + Mono handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result); } \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/ResponseStatusExceptionHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/ResponseStatusExceptionHandler.java index f9e3135532b..713a6928710 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/ResponseStatusExceptionHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/ResponseStatusExceptionHandler.java @@ -15,8 +15,7 @@ */ package org.springframework.web.reactive; -import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import org.springframework.http.server.reactive.HttpExceptionHandler; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -32,12 +31,12 @@ public class ResponseStatusExceptionHandler implements HttpExceptionHandler { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) { if (ex instanceof ResponseStatusException) { response.setStatusCode(((ResponseStatusException) ex).getHttpStatus()); - return Publishers.empty(); + return Mono.empty(); } - return Publishers.error(ex); + return Mono.error(ex); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/HttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/HttpHandlerAdapter.java index 34ae1fcf986..853f522f754 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/HttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/HttpHandlerAdapter.java @@ -17,7 +17,7 @@ package org.springframework.web.reactive.handler; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import org.springframework.core.ResolvableType; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -50,12 +50,12 @@ public class HttpHandlerAdapter implements HandlerAdapter { } @Override - public Publisher handle(ServerHttpRequest request, + public Mono handle(ServerHttpRequest request, ServerHttpResponse response, Object handler) { HttpHandler httpHandler = (HttpHandler)handler; - Publisher completion = httpHandler.handle(request, response); - return Publishers.just(new HandlerResult(httpHandler, completion, PUBLISHER_VOID)); + Mono completion = httpHandler.handle(request, response); + return Mono.just(new HandlerResult(httpHandler, completion, PUBLISHER_VOID)); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandler.java index efa8d7e8319..0e27adc704b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandler.java @@ -17,7 +17,7 @@ package org.springframework.web.reactive.handler; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import org.springframework.core.Ordered; import org.springframework.core.ResolvableType; @@ -74,17 +74,17 @@ public class SimpleHandlerResultHandler implements Ordered, HandlerResultHandler @SuppressWarnings("unchecked") @Override - public Publisher handleResult(ServerHttpRequest request, + public Mono handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result) { Object value = result.getResult(); if (Void.TYPE.equals(result.getResultType().getRawClass())) { - return Publishers.empty(); + return Mono.empty(); } - return (value instanceof Publisher ? (Publisher)value : - this.conversionService.convert(value, Publisher.class)); + return (value instanceof Mono ? (Mono)value : + Mono.from(this.conversionService.convert(value, Publisher.class))); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMapping.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMapping.java index b57ede729a1..0ef5e9c286a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMapping.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMapping.java @@ -19,8 +19,8 @@ package org.springframework.web.reactive.handler; import java.util.HashMap; import java.util.Map; -import org.reactivestreams.Publisher; -import reactor.core.publisher.PublisherFactory; +import reactor.Flux; +import reactor.Mono; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.web.reactive.HandlerMapping; @@ -42,15 +42,15 @@ public class SimpleUrlHandlerMapping implements HandlerMapping { @Override - public Publisher getHandler(ServerHttpRequest request) { - return PublisherFactory.create(subscriber -> { + public Mono getHandler(ServerHttpRequest request) { + return Flux.create(subscriber -> { String path = request.getURI().getPath(); Object handler = this.handlerMap.get(path); if (handler != null) { subscriber.onNext(handler); } subscriber.onComplete(); - }); + }).next(); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/HandlerMethodArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/HandlerMethodArgumentResolver.java index 1df3064b1fd..f7c9c6e1902 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/HandlerMethodArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/HandlerMethodArgumentResolver.java @@ -16,7 +16,7 @@ package org.springframework.web.reactive.method; -import org.reactivestreams.Publisher; +import reactor.Mono; import org.springframework.core.MethodParameter; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -39,6 +39,6 @@ public interface HandlerMethodArgumentResolver { * resolve to any value which will result in passing {@code null} as the * argument value. */ - Publisher resolveArgument(MethodParameter parameter, ServerHttpRequest request); + Mono resolveArgument(MethodParameter parameter, ServerHttpRequest request); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/InvocableHandlerMethod.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/InvocableHandlerMethod.java index 5cebab6252f..a1c3babd4a3 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/InvocableHandlerMethod.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/InvocableHandlerMethod.java @@ -21,15 +21,13 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; +import reactor.Mono; import reactor.fn.tuple.Tuple; -import reactor.rx.Streams; import org.springframework.core.DefaultParameterNameDiscoverer; import org.springframework.core.GenericTypeResolver; @@ -48,7 +46,7 @@ import org.springframework.web.reactive.HandlerResult; */ public class InvocableHandlerMethod extends HandlerMethod { - public static final Publisher NO_ARGS = Publishers.just(new Object[0]); + public static final Mono NO_ARGS = Mono.just(new Object[0]); private final static Object NO_VALUE = new Object(); @@ -86,38 +84,37 @@ public class InvocableHandlerMethod extends HandlerMethod { * @return Publisher that produces a single HandlerResult or an error signal; * never throws an exception. */ - public Publisher invokeForRequest(ServerHttpRequest request, Object... providedArgs) { + public Mono invokeForRequest(ServerHttpRequest request, Object... providedArgs) { - Publisher argsPublisher = NO_ARGS; + Mono argsPublisher = NO_ARGS; try { if (!ObjectUtils.isEmpty(getMethodParameters())) { - List> publishers = resolveArguments(request, providedArgs); - argsPublisher = Publishers.zip(publishers, this::initArgs); - argsPublisher = first(argsPublisher); + List> publishers = resolveArguments(request, providedArgs); + argsPublisher = Flux.zip(publishers, this::initArgs).next(); } } catch (Throwable ex) { - return Publishers.error(ex); + return Mono.error(ex); } - return Publishers.concatMap(argsPublisher, args -> { + return Flux.from(argsPublisher).concatMap(args -> { try { Object value = doInvoke(args); ResolvableType type = ResolvableType.forMethodParameter(getReturnType()); HandlerResult handlerResult = new HandlerResult(this, value, type); - return Publishers.just(handlerResult); + return Mono.just(handlerResult); } catch (InvocationTargetException ex) { - return Publishers.error(ex.getTargetException()); + return Mono.error(ex.getTargetException()); } catch (Throwable ex) { String s = getInvocationErrorMessage(args); - return Publishers.error(new IllegalStateException(s)); + return Mono.error(new IllegalStateException(s)); } - }); + }).next(); } - private List> resolveArguments(ServerHttpRequest request, Object... providedArgs) { + private List> resolveArguments(ServerHttpRequest request, Object... providedArgs) { return Stream.of(getMethodParameters()) .map(parameter -> { parameter.initParameterNameDiscovery(this.parameterNameDiscoverer); @@ -125,7 +122,7 @@ public class InvocableHandlerMethod extends HandlerMethod { if (!ObjectUtils.isEmpty(providedArgs)) { for (Object providedArg : providedArgs) { if (parameter.getParameterType().isInstance(providedArg)) { - return Publishers.just(providedArg); + return Mono.just(providedArg); } } } @@ -134,9 +131,10 @@ public class InvocableHandlerMethod extends HandlerMethod { .findFirst() .orElseThrow(() -> getArgError("No resolver for ", parameter, null)); try { - Publisher publisher = resolver.resolveArgument(parameter, request); - publisher = mapError(publisher, ex -> getArgError("Error resolving ", parameter, ex)); - return Streams.wrap(publisher).defaultIfEmpty(NO_VALUE); + return resolver.resolveArgument(parameter, request) + // TODO Add a defaultIfEmpty alias to Mono to avoid conversion to Flux + .flux().defaultIfEmpty(NO_VALUE).next() + .otherwise(ex -> Mono.error(getArgError("Error resolving ", parameter, ex))); } catch (Exception ex) { throw getArgError("Error resolving ", parameter, ex); @@ -180,17 +178,4 @@ public class InvocableHandlerMethod extends HandlerMethod { return Stream.of(tuple.toArray()).map(o -> o != NO_VALUE ? o : null).toArray(); } - - private static Publisher first(Publisher source) { - return Publishers.lift(source, (e, subscriber) -> { - subscriber.onNext(e); - subscriber.onComplete(); - }); - } - - private static Publisher mapError(Publisher source, Function function) { - return Publishers.lift(source, null, - (throwable, subscriber) -> subscriber.onError(function.apply(throwable)), null); - } - } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java index ec44088220d..2bb4cdafa17 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java @@ -20,7 +20,8 @@ import java.nio.ByteBuffer; import java.util.List; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; +import reactor.Mono; import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; @@ -57,14 +58,14 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve } @Override - public Publisher resolveArgument(MethodParameter parameter, ServerHttpRequest request) { + public Mono resolveArgument(MethodParameter parameter, ServerHttpRequest request) { MediaType mediaType = request.getHeaders().getContentType(); if (mediaType == null) { mediaType = MediaType.APPLICATION_OCTET_STREAM; } ResolvableType type = ResolvableType.forMethodParameter(parameter); - Publisher body = request.getBody(); - Publisher elementStream = body; + Flux body = request.getBody(); + Flux elementStream = body; ResolvableType elementType = type.hasGenerics() ? type.getGeneric(0) : type; Decoder decoder = resolveDecoder(elementType, mediaType); @@ -73,10 +74,10 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve } if (this.conversionService.canConvert(Publisher.class, type.getRawClass())) { - return Publishers.just(this.conversionService.convert(elementStream, type.getRawClass())); + return Mono.just(this.conversionService.convert(elementStream, type.getRawClass())); } - return Publishers.map(elementStream, element -> element); + return (Mono)Mono.from(elementStream); } private Decoder resolveDecoder(ResolvableType type, MediaType mediaType, Object... hints) { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java index de6106177dd..d1409e4e939 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java @@ -25,8 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; +import reactor.Mono; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.codec.Decoder; @@ -106,7 +106,7 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin } @Override - public Publisher handle(ServerHttpRequest request, + public Mono handle(ServerHttpRequest request, ServerHttpResponse response, Object handler) { HandlerMethod handlerMethod = (HandlerMethod) handler; @@ -114,20 +114,18 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerMethod); invocable.setHandlerMethodArgumentResolvers(this.argumentResolvers); - Publisher publisher = invocable.invokeForRequest(request); + Flux publisher = invocable.invokeForRequest(request).flux(); + publisher = publisher.onErrorResumeWith(ex -> Mono.just(new HandlerResult(handler, ex))); - publisher = Publishers.onErrorResumeNext(publisher, ex -> { - return Publishers.just(new HandlerResult(handler, ex)); - }); - publisher = Publishers.map(publisher, + publisher = publisher.map( result -> result.setExceptionMapper( - ex -> mapException((Exception) ex, handlerMethod, request, response))); + ex -> mapException(ex, handlerMethod, request, response))); - return publisher; + return publisher.next(); } - private Publisher mapException(Throwable ex, HandlerMethod handlerMethod, + private Mono mapException(Throwable ex, HandlerMethod handlerMethod, ServerHttpRequest request, ServerHttpResponse response) { if (ex instanceof Exception) { @@ -147,7 +145,7 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin } } } - return Publishers.error(ex); + return Mono.error(ex); } protected InvocableHandlerMethod findExceptionHandler(HandlerMethod handlerMethod, Exception exception) { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMapping.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMapping.java index f19f7f96cf1..476b0bc8f97 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMapping.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMapping.java @@ -27,8 +27,8 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; -import reactor.core.publisher.PublisherFactory; +import reactor.Flux; +import reactor.Mono; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; @@ -94,8 +94,8 @@ public class RequestMappingHandlerMapping implements HandlerMapping, } @Override - public Publisher getHandler(ServerHttpRequest request) { - return PublisherFactory.create(subscriber -> { + public Mono getHandler(ServerHttpRequest request) { + return Flux.create(subscriber -> { for (Map.Entry entry : this.methodMap.entrySet()) { RequestMappingInfo info = entry.getKey(); if (info.matchesRequest(request)) { @@ -109,7 +109,7 @@ public class RequestMappingHandlerMapping implements HandlerMapping, } } subscriber.onComplete(); - }); + }).next(); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestParamArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestParamArgumentResolver.java index b50c5ebe8c2..8ba175976bd 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestParamArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestParamArgumentResolver.java @@ -16,11 +16,7 @@ package org.springframework.web.reactive.method.annotation; - -import java.util.Optional; - -import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import org.springframework.core.MethodParameter; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -45,12 +41,12 @@ public class RequestParamArgumentResolver implements HandlerMethodArgumentResolv @Override - public Publisher resolveArgument(MethodParameter param, ServerHttpRequest request) { + public Mono resolveArgument(MethodParameter param, ServerHttpRequest request) { RequestParam annotation = param.getParameterAnnotation(RequestParam.class); String name = (annotation.value().length() != 0 ? annotation.value() : param.getParameterName()); UriComponents uriComponents = UriComponentsBuilder.fromUri(request.getURI()).build(); String value = uriComponents.getQueryParams().getFirst(name); - return (value != null ? Publishers.just(value) : Publishers.empty()); + return (value != null ? Mono.just(value) : Mono.empty()); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java index e52099f27dd..05e3a2adc06 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java @@ -28,7 +28,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import org.springframework.core.Ordered; import org.springframework.core.ResolvableType; @@ -127,12 +127,12 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered @Override @SuppressWarnings("unchecked") - public Publisher handleResult(ServerHttpRequest request, + public Mono handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result) { Object value = result.getResult(); if (value == null) { - return Publishers.empty(); + return Mono.empty(); } Publisher publisher; @@ -143,7 +143,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered elementType = returnType.getGeneric(0); } else { - publisher = Publishers.just(value); + publisher = Mono.just(value); elementType = returnType; } @@ -163,7 +163,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered } } if (compatibleMediaTypes.isEmpty()) { - return Publishers.error(new HttpMediaTypeNotAcceptableException(producibleMediaTypes)); + return Mono.error(new HttpMediaTypeNotAcceptableException(producibleMediaTypes)); } List mediaTypes = new ArrayList<>(compatibleMediaTypes); @@ -189,7 +189,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered } } - return Publishers.error(new HttpMediaTypeNotAcceptableException(this.allMediaTypes)); + return Mono.error(new HttpMediaTypeNotAcceptableException(this.allMediaTypes)); } private List getAcceptableMediaTypes(ServerHttpRequest request) { diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/EchoHandler.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/EchoHandler.java index 684f1f52be6..c6f7a7b28ad 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/EchoHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/EchoHandler.java @@ -16,11 +16,7 @@ package org.springframework.http.server.reactive; -import org.reactivestreams.Publisher; - -import org.springframework.http.server.reactive.HttpHandler; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; +import reactor.Mono; /** * @author Arjen Poutsma @@ -28,7 +24,7 @@ import org.springframework.http.server.reactive.ServerHttpResponse; public class EchoHandler implements HttpHandler { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { return response.setBody(request.getBody()); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ErrorHandlingHttpHandlerTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ErrorHandlingHttpHandlerTests.java index ce13fa5694a..925eeacc7c2 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ErrorHandlingHttpHandlerTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ErrorHandlingHttpHandlerTests.java @@ -17,12 +17,11 @@ package org.springframework.http.server.reactive; import java.net.URI; -import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import reactor.rx.Streams; import reactor.rx.stream.Signal; @@ -56,8 +55,7 @@ public class ErrorHandlingHttpHandlerTests { HttpHandler targetHandler = new TestHttpHandler(new IllegalStateException("boo")); HttpHandler handler = new ErrorHandlingHttpHandler(targetHandler, exceptionHandler); - Publisher publisher = handler.handle(this.request, this.response); - Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + handler.handle(this.request, this.response).get(); assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus()); } @@ -73,8 +71,7 @@ public class ErrorHandlingHttpHandlerTests { HttpHandler targetHandler = new TestHttpHandler(new IllegalStateException("boo")); HttpHandler httpHandler = new ErrorHandlingHttpHandler(targetHandler, exceptionHandlers); - Publisher publisher = httpHandler.handle(this.request, this.response); - Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + httpHandler.handle(this.request, this.response).get(); assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus()); } @@ -98,15 +95,14 @@ public class ErrorHandlingHttpHandlerTests { HttpHandler targetHandler = new TestHttpHandler(new IllegalStateException("boo"), true); HttpHandler handler = new ErrorHandlingHttpHandler(targetHandler, exceptionHandler); - Publisher publisher = handler.handle(this.request, this.response); - Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + handler.handle(this.request, this.response).get(); assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus()); } private Throwable awaitErrorSignal(Publisher publisher) throws Exception { - Signal signal = Streams.wrap(publisher).materialize().toList().await(5, TimeUnit.SECONDS).get(0); + Signal signal = Streams.from(publisher).materialize().toList().get().get(0); assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType()); return signal.getThrowable(); } @@ -129,11 +125,11 @@ public class ErrorHandlingHttpHandlerTests { } @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { if (this.raise) { throw this.exception; } - return Publishers.error(this.exception); + return Mono.error(this.exception); } } @@ -142,8 +138,8 @@ public class ErrorHandlingHttpHandlerTests { private static class UnresolvedExceptionHandler implements HttpExceptionHandler { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) { - return Publishers.error(ex); + public Mono handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) { + return Mono.error(ex); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FilterChainHttpHandlerTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FilterChainHttpHandlerTests.java index dcf1fff42db..3f88b4b97a1 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FilterChainHttpHandlerTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FilterChainHttpHandlerTests.java @@ -16,15 +16,11 @@ package org.springframework.http.server.reactive; -import java.util.concurrent.TimeUnit; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Before; import org.junit.Test; -import org.reactivestreams.Publisher; -import reactor.Publishers; -import reactor.rx.Streams; +import reactor.Mono; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -57,8 +53,7 @@ public class FilterChainHttpHandlerTests { TestFilter filter3 = new TestFilter(); FilterChainHttpHandler filterHandler = new FilterChainHttpHandler(handler, filter1, filter2, filter3); - Publisher voidPublisher = filterHandler.handle(this.request, this.response); - Streams.wrap(voidPublisher).toList().await(10, TimeUnit.SECONDS); + filterHandler.handle(this.request, this.response).get(); assertTrue(filter1.invoked()); assertTrue(filter2.invoked()); @@ -71,8 +66,7 @@ public class FilterChainHttpHandlerTests { StubHandler handler = new StubHandler(); FilterChainHttpHandler filterHandler = new FilterChainHttpHandler(handler); - Publisher voidPublisher = filterHandler.handle(this.request, this.response); - Streams.wrap(voidPublisher).toList().await(10, TimeUnit.SECONDS); + filterHandler.handle(this.request, this.response).get(); assertTrue(handler.invoked()); } @@ -85,8 +79,7 @@ public class FilterChainHttpHandlerTests { TestFilter filter3 = new TestFilter(); FilterChainHttpHandler filterHandler = new FilterChainHttpHandler(handler, filter1, filter2, filter3); - Publisher voidPublisher = filterHandler.handle(this.request, this.response); - Streams.wrap(voidPublisher).toList().await(10, TimeUnit.SECONDS); + filterHandler.handle(this.request, this.response).get(); assertTrue(filter1.invoked()); assertTrue(filter2.invoked()); @@ -100,8 +93,7 @@ public class FilterChainHttpHandlerTests { AsyncFilter filter = new AsyncFilter(); FilterChainHttpHandler filterHandler = new FilterChainHttpHandler(handler, filter); - Publisher voidPublisher = filterHandler.handle(this.request, this.response); - Streams.wrap(voidPublisher).toList().await(10, TimeUnit.SECONDS); + filterHandler.handle(this.request, this.response).get(); assertTrue(filter.invoked()); assertTrue(handler.invoked()); @@ -119,14 +111,14 @@ public class FilterChainHttpHandlerTests { } @Override - public Publisher filter(ServerHttpRequest req, ServerHttpResponse res, + public Mono filter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) { this.invoked = true; return doFilter(req, res, chain); } - public Publisher doFilter(ServerHttpRequest req, ServerHttpResponse res, + public Mono doFilter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) { return chain.filter(req, res); @@ -136,25 +128,25 @@ public class FilterChainHttpHandlerTests { private static class ShortcircuitingFilter extends TestFilter { @Override - public Publisher doFilter(ServerHttpRequest req, ServerHttpResponse res, + public Mono doFilter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) { - return Publishers.empty(); + return Mono.empty(); } } private static class AsyncFilter extends TestFilter { @Override - public Publisher doFilter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) { - return Publishers.concatMap(doAsyncWork(), asyncResult -> { + public Mono doFilter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) { + return doAsyncWork().then(asyncResult -> { logger.debug("Async result: " + asyncResult); return chain.filter(req, res); }); } - private Publisher doAsyncWork() { - return Publishers.just("123"); + private Mono doAsyncWork() { + return Mono.just("123"); } } @@ -168,10 +160,10 @@ public class FilterChainHttpHandlerTests { } @Override - public Publisher handle(ServerHttpRequest req, ServerHttpResponse res) { + public Mono handle(ServerHttpRequest req, ServerHttpResponse res) { logger.trace("StubHandler invoked."); this.invoked = true; - return Publishers.empty(); + return Mono.empty(); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/MockServerHttpRequest.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/MockServerHttpRequest.java index c5ffbc83fa9..db4412c6f5d 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/MockServerHttpRequest.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/MockServerHttpRequest.java @@ -19,6 +19,7 @@ import java.net.URI; import java.nio.ByteBuffer; import org.reactivestreams.Publisher; +import reactor.Flux; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -34,7 +35,7 @@ public class MockServerHttpRequest implements ServerHttpRequest { private HttpHeaders headers = new HttpHeaders(); - private Publisher body; + private Flux body; public MockServerHttpRequest(HttpMethod httpMethod, URI uri) { @@ -43,7 +44,7 @@ public class MockServerHttpRequest implements ServerHttpRequest { } public MockServerHttpRequest(Publisher body, HttpMethod httpMethod, URI uri) { - this.body = body; + this.body = Flux.from(body); this.httpMethod = httpMethod; this.uri = uri; } @@ -77,11 +78,11 @@ public class MockServerHttpRequest implements ServerHttpRequest { } @Override - public Publisher getBody() { + public Flux getBody() { return this.body; } public void setBody(Publisher body) { - this.body = body; + this.body = Flux.from(body); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/MockServerHttpResponse.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/MockServerHttpResponse.java index d478d567029..0a282a95b34 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/MockServerHttpResponse.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/MockServerHttpResponse.java @@ -18,7 +18,8 @@ package org.springframework.http.server.reactive; import java.nio.ByteBuffer; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; +import reactor.Mono; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; @@ -50,9 +51,9 @@ public class MockServerHttpResponse implements ServerHttpResponse { } @Override - public Publisher setBody(Publisher body) { + public Mono setBody(Publisher body) { this.body = body; - return Publishers.concatMap(body, b -> Publishers.empty()); + return Flux.from(body).after(); } public Publisher getBody() { diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/RandomHandler.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/RandomHandler.java index 97af86dc70d..477114c8921 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/RandomHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/RandomHandler.java @@ -21,11 +21,10 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.Mono; import reactor.io.buffer.Buffer; -import reactor.rx.Streams; import static org.junit.Assert.assertEquals; @@ -41,7 +40,7 @@ public class RandomHandler implements HttpHandler { private final Random rnd = new Random(); @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { request.getBody().subscribe(new Subscriber() { private Subscription s; @@ -73,7 +72,7 @@ public class RandomHandler implements HttpHandler { }); response.getHeaders().setContentLength(RESPONSE_SIZE); - return response.setBody(Streams.just(ByteBuffer.wrap(randomBytes()))); + return response.setBody(Mono.just(ByteBuffer.wrap(randomBytes()))); } private byte[] randomBytes() { diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java index 2ad1ddd3d47..12b06a3d2e4 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java @@ -27,14 +27,12 @@ import org.junit.Test; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import reactor.Publishers; -import reactor.core.publisher.PublisherFactory; +import reactor.Flux; import reactor.core.subscriber.SubscriberBarrier; import reactor.rx.Streams; import reactor.rx.stream.Signal; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -58,8 +56,8 @@ public class WriteWithOperatorTests { @Test public void errorBeforeFirstItem() throws Exception { IllegalStateException error = new IllegalStateException("boo"); - Publisher completion = Publishers.lift(Publishers.error(error), this.operator); - List> signals = Streams.wrap(completion).materialize().toList().await(5, TimeUnit.SECONDS); + Publisher completion = Flux.error(error).lift(this.operator); + List> signals = Streams.from(completion).materialize().toList().get(); assertEquals(1, signals.size()); assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable()); @@ -67,8 +65,8 @@ public class WriteWithOperatorTests { @Test public void completionBeforeFirstItem() throws Exception { - Publisher completion = Publishers.lift(Publishers.empty(), this.operator); - List> signals = Streams.wrap(completion).materialize().toList().await(5, TimeUnit.SECONDS); + Publisher completion = Flux.empty().lift(this.operator); + List> signals = Streams.from(completion).materialize().toList().get(); assertEquals(1, signals.size()); assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete()); @@ -79,8 +77,8 @@ public class WriteWithOperatorTests { @Test public void writeOneItem() throws Exception { - Publisher completion = Publishers.lift(Publishers.just("one"), this.operator); - List> signals = Streams.wrap(completion).materialize().toList().await(5, TimeUnit.SECONDS); + Publisher completion = Flux.just("one").lift(this.operator); + List> signals = Streams.from(completion).materialize().toList().get(); assertEquals(1, signals.size()); assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete()); @@ -94,8 +92,8 @@ public class WriteWithOperatorTests { @Test public void writeMultipleItems() throws Exception { List items = Arrays.asList("one", "two", "three"); - Publisher completion = Publishers.lift(Publishers.from(items), this.operator); - List> signals = Streams.wrap(completion).materialize().consumeAsList().await(5, TimeUnit.SECONDS); + Publisher completion = Flux.fromIterable(items).lift(this.operator); + List> signals = Streams.from(completion).materialize().toList().get(); assertEquals(1, signals.size()); assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete()); @@ -110,15 +108,15 @@ public class WriteWithOperatorTests { @Test public void errorAfterMultipleItems() throws Exception { IllegalStateException error = new IllegalStateException("boo"); - Publisher publisher = PublisherFactory.create(subscriber -> { + Flux publisher = Flux.create(subscriber -> { int i = subscriber.context().incrementAndGet(); subscriber.onNext(String.valueOf(i)); if (i == 3) { subscriber.onError(error); } }, subscriber -> new AtomicInteger()); - Publisher completion = Publishers.lift(publisher, this.operator); - List> signals = Streams.wrap(completion).materialize().toList().await(5, TimeUnit.SECONDS); + Publisher completion = publisher.lift(this.operator); + List> signals = Streams.from(completion).materialize().toList().get(); assertEquals(1, signals.size()); assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable()); diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/XmlHandler.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/XmlHandler.java index 838d1ad8776..dd1a4278de9 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/XmlHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/XmlHandler.java @@ -22,7 +22,7 @@ import javax.xml.bind.Unmarshaller; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; +import reactor.Mono; import reactor.io.buffer.Buffer; import reactor.rx.Streams; @@ -40,7 +40,7 @@ public class XmlHandler implements HttpHandler { private static final Log logger = LogFactory.getLog(XmlHandler.class); @Override - public Publisher handle(ServerHttpRequest request, + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { try { JAXBContext jaxbContext = JAXBContext.newInstance(XmlHandlerIntegrationTests.Person.class); diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/ByteBufferDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/ByteBufferDecoderTests.java index 543fd60f684..a934afd7498 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/ByteBufferDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/ByteBufferDecoderTests.java @@ -49,8 +49,8 @@ public class ByteBufferDecoderTests { ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer(); ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer(); Stream source = Streams.just(fooBuffer, barBuffer); - List results = Streams.wrap(decoder.decode(source, - ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().await(); + List results = Streams.from(decoder.decode(source, + ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().get(); assertEquals(2, results.size()); assertEquals(fooBuffer, results.get(0)); assertEquals(barBuffer, results.get(1)); diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JacksonJsonDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JacksonJsonDecoderTests.java index adc55f2e457..8059a681aef 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JacksonJsonDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JacksonJsonDecoderTests.java @@ -48,8 +48,8 @@ public class JacksonJsonDecoderTests { @Test public void decode() throws InterruptedException { Stream source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer()); - List results = Streams.wrap(decoder.decode(source, ResolvableType.forClass(Pojo.class), null)) - .toList().await(); + List results = Streams.from(decoder.decode(source, ResolvableType.forClass(Pojo.class), null)) + .toList().get(); assertEquals(1, results.size()); assertEquals("foofoo", ((Pojo) results.get(0)).getFoo()); } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/Jaxb2DecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/Jaxb2DecoderTests.java index 110db957994..65b23e4ac3f 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/Jaxb2DecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/Jaxb2DecoderTests.java @@ -49,8 +49,8 @@ public class Jaxb2DecoderTests { @Test public void decode() throws InterruptedException { Stream source = Streams.just(Buffer.wrap("barbarfoofoo").byteBuffer()); - List results = Streams.wrap(decoder.decode(source, ResolvableType.forClass(Pojo.class), null)) - .toList().await(); + List results = Streams.from(decoder.decode(source, ResolvableType.forClass(Pojo.class), null)) + .toList().get(); assertEquals(1, results.size()); assertEquals("foofoo", ((Pojo) results.get(0)).getFoo()); } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JsonObjectDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JsonObjectDecoderTests.java index 915d9677daf..ac3eea8399b 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JsonObjectDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JsonObjectDecoderTests.java @@ -37,11 +37,11 @@ public class JsonObjectDecoderTests { public void decodeSingleChunkToJsonObject() throws InterruptedException { JsonObjectDecoder decoder = new JsonObjectDecoder(); Stream source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer()); - List results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> { + List results = Streams.from(decoder.decode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); assertEquals(1, results.size()); assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0)); } @@ -50,11 +50,11 @@ public class JsonObjectDecoderTests { public void decodeMultipleChunksToJsonObject() throws InterruptedException { JsonObjectDecoder decoder = new JsonObjectDecoder(); Stream source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\"").byteBuffer(), Buffer.wrap(", \"bar\": \"barbar\"}").byteBuffer()); - List results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> { + List results = Streams.from(decoder.decode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); assertEquals(1, results.size()); assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0)); } @@ -63,11 +63,11 @@ public class JsonObjectDecoderTests { public void decodeSingleChunkToArray() throws InterruptedException { JsonObjectDecoder decoder = new JsonObjectDecoder(); Stream source = Streams.just(Buffer.wrap("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]").byteBuffer()); - List results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> { + List results = Streams.from(decoder.decode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); assertEquals(2, results.size()); assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0)); assertEquals("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}", results.get(1)); @@ -77,11 +77,11 @@ public class JsonObjectDecoderTests { public void decodeMultipleChunksToArray() throws InterruptedException { JsonObjectDecoder decoder = new JsonObjectDecoder(); Stream source = Streams.just(Buffer.wrap("[{\"foo\": \"foofoo\", \"bar\"").byteBuffer(), Buffer.wrap(": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]").byteBuffer()); - List results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> { + List results = Streams.from(decoder.decode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); assertEquals(2, results.size()); assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0)); assertEquals("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}", results.get(1)); diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/StringDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/StringDecoderTests.java index 90ab1978ecb..4e22989b9b1 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/StringDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/StringDecoderTests.java @@ -49,8 +49,8 @@ public class StringDecoderTests { @Test public void decode() throws InterruptedException { Stream source = Streams.just(Buffer.wrap("foo").byteBuffer(), Buffer.wrap("bar").byteBuffer()); - List results = Streams.wrap(decoder.decode(source, - ResolvableType.forClassWithGenerics(Publisher.class, String.class), null)).toList().await(); + List results = Streams.from(decoder.decode(source, + ResolvableType.forClassWithGenerics(Publisher.class, String.class), null)).toList().get(); assertEquals(2, results.size()); assertEquals("foo", results.get(0)); assertEquals("bar", results.get(1)); diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/ByteBufferEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/ByteBufferEncoderTests.java index 6c79d56bb44..168d150de99 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/ByteBufferEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/ByteBufferEncoderTests.java @@ -49,8 +49,8 @@ public class ByteBufferEncoderTests { ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer(); ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer(); Stream source = Streams.just(fooBuffer, barBuffer); - List results = Streams.wrap(encoder.encode(source, - ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().await(); + List results = Streams.from(encoder.encode(source, + ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().get(); assertEquals(2, results.size()); assertEquals(fooBuffer, results.get(0)); assertEquals(barBuffer, results.get(1)); diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JacksonJsonEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JacksonJsonEncoderTests.java index 8193b938dcb..5350148d76d 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JacksonJsonEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JacksonJsonEncoderTests.java @@ -46,11 +46,11 @@ public class JacksonJsonEncoderTests { @Test public void write() throws InterruptedException { Stream source = Streams.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); - List results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> { + List results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); assertEquals(2, results.size()); assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", results.get(0)); assertEquals("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}", results.get(1)); diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/Jaxb2EncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/Jaxb2EncoderTests.java index 1b6a8b73606..2536b2c0271 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/Jaxb2EncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/Jaxb2EncoderTests.java @@ -47,11 +47,11 @@ public class Jaxb2EncoderTests { @Test public void encode() throws InterruptedException { Stream source = Streams.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); - List results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> { + List results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); assertEquals(2, results.size()); assertEquals("barbarfoofoo", results.get(0)); assertEquals("barbarbarfoofoofoo", results.get(1)); diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java index 5e56f044c5d..a41f62ad98b 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java @@ -37,11 +37,11 @@ public class JsonObjectEncoderTests { public void encodeSingleElement() throws InterruptedException { JsonObjectEncoder encoder = new JsonObjectEncoder(); Stream source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer()); - List results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> { + List results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); String result = String.join("", results); assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", result); } @@ -52,11 +52,11 @@ public class JsonObjectEncoderTests { Stream source = Streams.just( Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(), Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer()); - List results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> { + List results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); String result = String.join("", results); assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]", result); } @@ -69,11 +69,11 @@ public class JsonObjectEncoderTests { Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer(), Buffer.wrap("{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}").byteBuffer() ); - List results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> { + List results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); String result = String.join("", results); assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"},{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}]", result); } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/StringEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/StringEncoderTests.java index 2c371f0391f..3eef55cfe51 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/StringEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/StringEncoderTests.java @@ -45,12 +45,12 @@ public class StringEncoderTests { @Test public void write() throws InterruptedException { - List results = Streams.wrap(encoder.encode(Streams.just("foo"), null, null)) + List results = Streams.from(encoder.encode(Streams.just("foo"), null, null)) .map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().await(); + }).toList().get(); assertEquals(1, results.size()); assertEquals("foo", results.get(0)); } diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/DispatcherHandlerErrorTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/DispatcherHandlerErrorTests.java index 6ca079a6fc2..5a43444de45 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/DispatcherHandlerErrorTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/DispatcherHandlerErrorTests.java @@ -19,12 +19,11 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import reactor.rx.Streams; import reactor.rx.stream.Signal; @@ -154,7 +153,7 @@ public class DispatcherHandlerErrorTests { public void notAcceptable() throws Exception { this.request.setUri(new URI("/request-body")); this.request.getHeaders().setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); - this.request.setBody(Publishers.just(ByteBuffer.wrap("body".getBytes("UTF-8")))); + this.request.setBody(Mono.just(ByteBuffer.wrap("body".getBytes("UTF-8")))); Publisher publisher = this.dispatcherHandler.handle(this.request, this.response); Throwable ex = awaitErrorSignal(publisher); @@ -167,12 +166,14 @@ public class DispatcherHandlerErrorTests { @Test public void requestBodyError() throws Exception { this.request.setUri(new URI("/request-body")); - this.request.setBody(Publishers.error(EXCEPTION)); + this.request.setBody(Mono.error(EXCEPTION)); Publisher publisher = this.dispatcherHandler.handle(this.request, this.response); Throwable ex = awaitErrorSignal(publisher); + ex.printStackTrace(); assertSame(EXCEPTION, ex); + } @Test @@ -183,7 +184,7 @@ public class DispatcherHandlerErrorTests { HttpHandler httpHandler = new ErrorHandlingHttpHandler(this.dispatcherHandler, exceptionHandler); Publisher publisher = httpHandler.handle(this.request, this.response); - Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + Streams.from(publisher).toList().get(); assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus()); } @@ -196,13 +197,13 @@ public class DispatcherHandlerErrorTests { httpHandler = new ErrorHandlingHttpHandler(httpHandler, new ServerError500ExceptionHandler()); Publisher publisher = httpHandler.handle(this.request, this.response); - Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + Streams.from(publisher).toList().get(); assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus()); } private Throwable awaitErrorSignal(Publisher publisher) throws Exception { - Signal signal = Streams.wrap(publisher).materialize().toList().await(5, TimeUnit.SECONDS).get(0); + Signal signal = Streams.from(publisher).materialize().toList().get().get(0); assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType()); return signal.getThrowable(); } @@ -245,7 +246,7 @@ public class DispatcherHandlerErrorTests { @RequestMapping("/error-signal") @ResponseBody public Publisher errorSignal() { - return Publishers.error(EXCEPTION); + return Mono.error(EXCEPTION); } @RequestMapping("/raise-exception") @@ -261,7 +262,7 @@ public class DispatcherHandlerErrorTests { @RequestMapping("/request-body") @ResponseBody public Publisher requestBody(@RequestBody Publisher body) { - return Publishers.map(body, s -> "hello " + s); + return Mono.from(body).map(s -> "hello " + s); } } @@ -271,16 +272,16 @@ public class DispatcherHandlerErrorTests { private static class ServerError500ExceptionHandler implements HttpExceptionHandler { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex) { response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); - return Publishers.empty(); + return Mono.empty(); } } private static class TestHttpFilter implements HttpFilter { @Override - public Publisher filter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) { + public Mono filter(ServerHttpRequest req, ServerHttpResponse res, HttpFilterChain chain) { return chain.filter(req, res); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/ResponseStatusExceptionHandlerTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/ResponseStatusExceptionHandlerTests.java index 076a589230c..3744ecaa6ee 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/ResponseStatusExceptionHandlerTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/ResponseStatusExceptionHandlerTests.java @@ -60,7 +60,7 @@ public class ResponseStatusExceptionHandlerTests { Throwable ex = new ResponseStatusException(HttpStatus.BAD_REQUEST); Publisher publisher = this.handler.handle(this.request, this.response, ex); - Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + Streams.from(publisher).toList().get(); assertEquals(HttpStatus.BAD_REQUEST, this.response.getStatus()); } @@ -69,7 +69,7 @@ public class ResponseStatusExceptionHandlerTests { Throwable ex = new IllegalStateException(); Publisher publisher = this.handler.handle(this.request, this.response, ex); - List> signals = Streams.wrap(publisher).materialize().toList().await(5, TimeUnit.SECONDS); + List> signals = Streams.from(publisher).materialize().toList().get(); assertEquals(1, signals.size()); assertTrue(signals.get(0).hasError()); assertSame(ex, signals.get(0).getThrowable()); diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandlerTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandlerTests.java index 0a04a12ebaa..9e6381c6727 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandlerTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandlerTests.java @@ -28,7 +28,7 @@ import rx.Observable; import org.springframework.core.ResolvableType; import org.springframework.core.convert.support.GenericConversionService; import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter; -import org.springframework.core.convert.support.ReactiveStreamsToReactorConverter; +import org.springframework.core.convert.support.ReactiveStreamsToReactorStreamConverter; import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter; import org.springframework.web.method.HandlerMethod; import org.springframework.web.reactive.HandlerResult; @@ -75,7 +75,7 @@ public class SimpleHandlerResultHandlerTests { GenericConversionService conversionService = new GenericConversionService(); conversionService.addConverter(new ReactiveStreamsToCompletableFutureConverter()); - conversionService.addConverter(new ReactiveStreamsToReactorConverter()); + conversionService.addConverter(new ReactiveStreamsToReactorStreamConverter()); conversionService.addConverter(new ReactiveStreamsToRxJava1Converter()); SimpleHandlerResultHandler resultHandler = new SimpleHandlerResultHandler(conversionService); TestController controller = new TestController(); diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMappingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMappingIntegrationTests.java index da6cf3fc28f..82690ca99d2 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMappingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMappingIntegrationTests.java @@ -22,8 +22,7 @@ import java.util.HashMap; import java.util.Map; import org.junit.Test; -import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import reactor.io.buffer.Buffer; import reactor.rx.Streams; @@ -136,7 +135,7 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler private static class FooHandler implements HttpHandler { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { return response.setBody(Streams.just(Buffer.wrap("foo").byteBuffer())); } } @@ -144,7 +143,7 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler private static class BarHandler implements HttpHandler { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { return response.setBody(Streams.just(Buffer.wrap("bar").byteBuffer())); } } @@ -152,9 +151,9 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler private static class HeaderSettingHandler implements HttpHandler { @Override - public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { response.getHeaders().add("foo", "bar"); - return Publishers.empty(); + return Mono.empty(); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/InvocableHandlerMethodTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/InvocableHandlerMethodTests.java index c7b0bee5fda..65a31fbd3ab 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/InvocableHandlerMethodTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/InvocableHandlerMethodTests.java @@ -21,12 +21,12 @@ import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Flux; +import reactor.Mono; import reactor.rx.Streams; import reactor.rx.stream.Signal; @@ -61,7 +61,7 @@ public class InvocableHandlerMethodTests { InvocableHandlerMethod hm = createHandlerMethod("noArgs"); Publisher publisher = hm.invokeForRequest(this.request); - List values = Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + List values = Streams.from(publisher).toList().get(); assertEquals(1, values.size()); assertEquals("success", values.get(0).getResult()); @@ -74,7 +74,7 @@ public class InvocableHandlerMethodTests { hm.setHandlerMethodArgumentResolvers(Collections.singletonList(new RequestParamArgumentResolver())); Publisher publisher = hm.invokeForRequest(this.request); - List values = Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + List values = Streams.from(publisher).toList().get(); assertEquals(1, values.size()); assertEquals("success:null", values.get(0).getResult()); @@ -83,10 +83,10 @@ public class InvocableHandlerMethodTests { @Test public void resolveArgToOneValue() throws Exception { InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class); - addResolver(hm, Publishers.just("value1")); + addResolver(hm, Mono.just("value1")); Publisher publisher = hm.invokeForRequest(this.request); - List values = Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + List values = Streams.from(publisher).toList().get(); assertEquals(1, values.size()); assertEquals("success:value1", values.get(0).getResult()); @@ -95,10 +95,10 @@ public class InvocableHandlerMethodTests { @Test public void resolveArgToMultipleValues() throws Exception { InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class); - addResolver(hm, Publishers.from(Arrays.asList("value1", "value2", "value3"))); + addResolver(hm, Flux.fromIterable(Arrays.asList("value1", "value2", "value3"))); Publisher publisher = hm.invokeForRequest(this.request); - List values = Streams.wrap(publisher).toList().await(5, TimeUnit.SECONDS); + List values = Streams.from(publisher).toList().get(); assertEquals(1, values.size()); assertEquals("success:value1", values.get(0).getResult()); @@ -137,7 +137,7 @@ public class InvocableHandlerMethodTests { @Test public void resolveArgumentWithErrorSignal() throws Exception { InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class); - addResolver(hm, Publishers.error(new IllegalStateException("boo"))); + addResolver(hm, Mono.error(new IllegalStateException("boo"))); Publisher publisher = hm.invokeForRequest(this.request); Throwable ex = awaitErrorSignal(publisher); @@ -151,7 +151,7 @@ public class InvocableHandlerMethodTests { @Test public void illegalArgumentExceptionIsWrappedWithHelpfulDetails() throws Exception { InvocableHandlerMethod hm = createHandlerMethod("singleArg", String.class); - addResolver(hm, Publishers.just(1)); + addResolver(hm, Mono.just(1)); Publisher publisher = hm.invokeForRequest(this.request); Throwable ex = awaitErrorSignal(publisher); @@ -183,12 +183,12 @@ public class InvocableHandlerMethodTests { private void addResolver(InvocableHandlerMethod handlerMethod, Publisher resolvedValue) { HandlerMethodArgumentResolver resolver = mock(HandlerMethodArgumentResolver.class); when(resolver.supportsParameter(any())).thenReturn(true); - when(resolver.resolveArgument(any(), any())).thenReturn(resolvedValue); + when(resolver.resolveArgument(any(), any())).thenReturn(Mono.from(resolvedValue)); handlerMethod.setHandlerMethodArgumentResolvers(Collections.singletonList(resolver)); } private Throwable awaitErrorSignal(Publisher publisher) throws Exception { - Signal signal = Streams.wrap(publisher).materialize().toList().await(5, TimeUnit.SECONDS).get(0); + Signal signal = Streams.from(publisher).materialize().toList().get().get(0); assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType()); return signal.getThrowable(); } diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMappingTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMappingTests.java index 4bbe2cd443f..0af8d8eadce 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMappingTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMappingTests.java @@ -18,7 +18,6 @@ package org.springframework.web.reactive.method.annotation; import java.net.URI; import java.util.List; -import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; @@ -78,7 +77,7 @@ public class RequestMappingHandlerMappingTests { private HandlerMethod toHandlerMethod(Publisher handlerPublisher) throws InterruptedException { assertNotNull(handlerPublisher); - List handlerList = Streams.wrap(handlerPublisher).toList().await(5, TimeUnit.SECONDS); + List handlerList = Streams.from(handlerPublisher).toList().get(); assertEquals(1, handlerList.size()); return (HandlerMethod) handlerList.get(0); } diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java index 50ac1bd8736..ea6cc95f347 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java @@ -25,10 +25,9 @@ import java.util.concurrent.CompletableFuture; import org.junit.Test; import org.reactivestreams.Publisher; -import reactor.Publishers; +import reactor.Mono; import reactor.io.buffer.Buffer; import reactor.rx.Promise; -import reactor.rx.Promises; import reactor.rx.Stream; import reactor.rx.Streams; import rx.Observable; @@ -46,7 +45,7 @@ import org.springframework.core.codec.support.StringEncoder; import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.support.GenericConversionService; import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter; -import org.springframework.core.convert.support.ReactiveStreamsToReactorConverter; +import org.springframework.core.convert.support.ReactiveStreamsToReactorStreamConverter; import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -321,7 +320,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati // TODO: test failures with DefaultConversionService GenericConversionService service = new GenericConversionService(); service.addConverter(new ReactiveStreamsToCompletableFutureConverter()); - service.addConverter(new ReactiveStreamsToReactorConverter()); + service.addConverter(new ReactiveStreamsToReactorStreamConverter()); service.addConverter(new ReactiveStreamsToRxJava1Converter()); return service; } @@ -398,7 +397,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @RequestMapping("/promise") @ResponseBody public Promise promiseResponseBody() { - return Promises.success(new Person("Robert")); + return Promise.success(new Person("Robert")); } @RequestMapping("/list") @@ -428,7 +427,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @RequestMapping("/publisher-capitalize") @ResponseBody public Publisher publisherCapitalize(@RequestBody Publisher persons) { - return Streams.wrap(persons).map(person -> { + return Streams.from(persons).map(person -> { person.setName(person.getName().toUpperCase()); return person; }); @@ -482,20 +481,20 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @RequestMapping("/promise-capitalize") @ResponseBody public Promise promiseCapitalize(@RequestBody Promise personFuture) { - return personFuture.map(person -> { + return Streams.from(personFuture.map(person -> { person.setName(person.getName().toUpperCase()); return person; - }); + })).promise(); } @RequestMapping("/publisher-create") public Publisher publisherCreate(@RequestBody Publisher personStream) { - return Streams.wrap(personStream).toList().onSuccess(persons::addAll).after(); + return Streams.from(personStream).toList().doOnSuccess(persons::addAll).after(); } @RequestMapping("/stream-create") - public Promise streamCreate(@RequestBody Stream personStream) { - return personStream.toList().onSuccess(persons::addAll).after(); + public Publisher streamCreate(@RequestBody Stream personStream) { + return Streams.from(personStream.toList().doOnSuccess(persons::addAll).after()).promise(); } @RequestMapping("/observable-create") @@ -512,7 +511,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @RequestMapping("/error-signal") @ResponseBody public Publisher handleWithError() { - return Publishers.error(new IllegalStateException("Boo")); + return Mono.error(new IllegalStateException("Boo")); } @ExceptionHandler