diff --git a/build.gradle b/build.gradle index 5f9b8411948..494c902c96d 100644 --- a/build.gradle +++ b/build.gradle @@ -77,7 +77,7 @@ configure(allprojects) { project -> ext.poiVersion = "3.15" ext.protobufVersion = "3.2.0" ext.quartzVersion = "2.2.3" - ext.reactorVersion = "Aluminium-SR1" + ext.reactorVersion = "Aluminium-BUILD-SNAPSHOT" ext.romeVersion = "1.7.1" ext.rxjavaVersion = '1.2.7' ext.rxjavaAdapterVersion = '1.2.1' @@ -178,6 +178,7 @@ configure(allprojects) { project -> repositories { maven { url "https://repo.spring.io/libs-release" } + maven { url "https://repo.spring.io/libs-snapshot" } maven { url "https://repo.spring.io/milestone" } } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 5e12bed9e6c..69ebec00616 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -113,7 +113,7 @@ public abstract class DataBufferUtils { ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize); return Flux.create(emitter -> { - emitter.setCancellation(() -> closeChannel(channel)); + emitter.onDispose(() -> closeChannel(channel)); AsynchronousFileChannelCompletionHandler completionHandler = new AsynchronousFileChannelCompletionHandler(emitter, position, dataBufferFactory, byteBuffer); diff --git a/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java b/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java index 1271c343926..50fc78ea981 100644 --- a/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java +++ b/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java @@ -15,6 +15,7 @@ */ package org.springframework.core.convert.support; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -109,7 +110,7 @@ public class ReactiveAdapterRegistryTests { Publisher source = Flowable.fromIterable(sequence); Object target = getAdapter(Flux.class).fromPublisher(source); assertTrue(target instanceof Flux); - assertEquals(sequence, ((Flux) target).collectList().blockMillis(1000)); + assertEquals(sequence, ((Flux) target).collectList().block(Duration.ofMillis(1000))); } // TODO: publisherToMono/CompletableFuture vs Single (ISE on multiple elements)? @@ -119,7 +120,7 @@ public class ReactiveAdapterRegistryTests { Publisher source = Flowable.fromArray(1, 2, 3); Object target = getAdapter(Mono.class).fromPublisher(source); assertTrue(target instanceof Mono); - assertEquals(new Integer(1), ((Mono) target).blockMillis(1000)); + assertEquals(new Integer(1), ((Mono) target).block(Duration.ofMillis(1000))); } @Test @@ -195,7 +196,7 @@ public class ReactiveAdapterRegistryTests { Object source = rx.Observable.from(sequence); Object target = getAdapter(rx.Observable.class).toPublisher(source); assertTrue("Expected Flux Publisher: " + target.getClass().getName(), target instanceof Flux); - assertEquals(sequence, ((Flux) target).collectList().blockMillis(1000)); + assertEquals(sequence, ((Flux) target).collectList().block(Duration.ofMillis(1000))); } @Test @@ -203,7 +204,7 @@ public class ReactiveAdapterRegistryTests { Object source = rx.Single.just(1); Object target = getAdapter(rx.Single.class).toPublisher(source); assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono); - assertEquals(new Integer(1), ((Mono) target).blockMillis(1000)); + assertEquals(new Integer(1), ((Mono) target).block(Duration.ofMillis(1000))); } @Test @@ -211,7 +212,7 @@ public class ReactiveAdapterRegistryTests { Object source = rx.Completable.complete(); Object target = getAdapter(rx.Completable.class).toPublisher(source); assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono); - ((Mono) target).blockMillis(1000); + ((Mono) target).block(Duration.ofMillis(1000)); } @Test @@ -220,7 +221,7 @@ public class ReactiveAdapterRegistryTests { Object source = io.reactivex.Flowable.fromIterable(sequence); Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source); assertTrue("Expected Flux Publisher: " + target.getClass().getName(), target instanceof Flux); - assertEquals(sequence, ((Flux) target).collectList().blockMillis(1000)); + assertEquals(sequence, ((Flux) target).collectList().block(Duration.ofMillis(1000))); } @Test @@ -229,7 +230,7 @@ public class ReactiveAdapterRegistryTests { Object source = io.reactivex.Observable.fromIterable(sequence); Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source); assertTrue("Expected Flux Publisher: " + target.getClass().getName(), target instanceof Flux); - assertEquals(sequence, ((Flux) target).collectList().blockMillis(1000)); + assertEquals(sequence, ((Flux) target).collectList().block(Duration.ofMillis(1000))); } @Test @@ -237,7 +238,7 @@ public class ReactiveAdapterRegistryTests { Object source = io.reactivex.Single.just(1); Object target = getAdapter(io.reactivex.Single.class).toPublisher(source); assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono); - assertEquals(new Integer(1), ((Mono) target).blockMillis(1000)); + assertEquals(new Integer(1), ((Mono) target).block(Duration.ofMillis(1000))); } @Test @@ -245,7 +246,7 @@ public class ReactiveAdapterRegistryTests { Object source = io.reactivex.Completable.complete(); Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source); assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono); - ((Mono) target).blockMillis(1000); + ((Mono) target).block(Duration.ofMillis(1000)); } @Test @@ -254,7 +255,7 @@ public class ReactiveAdapterRegistryTests { future.complete(1); Object target = getAdapter(CompletableFuture.class).toPublisher(future); assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono); - assertEquals(new Integer(1), ((Mono) target).blockMillis(1000)); + assertEquals(new Integer(1), ((Mono) target).block(Duration.ofMillis(1000))); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index f5c19a57c48..ff139b87610 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -17,6 +17,7 @@ package org.springframework.messaging.tcp.reactor; import java.lang.reflect.Method; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.function.BiFunction; @@ -175,7 +176,8 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ private Function, Publisher> reconnectFunction(ReconnectStrategy reconnectStrategy) { return flux -> flux .scan(1, (count, element) -> count++) - .flatMap(attempt -> Mono.delayMillis(reconnectStrategy.getTimeToNextAttempt(attempt))); + .flatMap(attempt -> Mono.delay( + Duration.ofMillis(reconnectStrategy.getTimeToNextAttempt(attempt)))); } @Override diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java index 0f13ff3b943..978cf541290 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java @@ -187,7 +187,7 @@ public class ExchangeResult { private String formatBody(MediaType contentType, MonoProcessor body) { if (body.isSuccess()) { - byte[] bytes = body.blockMillis(0); + byte[] bytes = body.block(Duration.ZERO); if (bytes.length == 0) { return "No content"; } diff --git a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java index 691b481ccfe..d7ca424ce5e 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java @@ -67,7 +67,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll Mono> source = Mono.just(event); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class), - new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).blockMillis(5000); + new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); StepVerifier.create(outputMessage.getBodyAsString()) .expectNext("id:c42\n" + @@ -86,7 +86,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll Flux source = Flux.just("foo", "bar"); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); messageWriter.write(source, ResolvableType.forClass(String.class), - new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).blockMillis(5000); + new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); StepVerifier.create(outputMessage.getBodyAsString()) .expectNext("data:foo\n\ndata:bar\n\n") @@ -99,7 +99,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll Flux source = Flux.just("foo\nbar", "foo\nbaz"); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); messageWriter.write(source, ResolvableType.forClass(String.class), - new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).blockMillis(5000); + new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); StepVerifier.create(outputMessage.getBodyAsString()) .expectNext("data:foo\n" + @@ -116,7 +116,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll new Pojo("foofoofoo", "barbarbar")); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); messageWriter.write(source, ResolvableType.forClass(Pojo.class), - MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).blockMillis(5000); + MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); StepVerifier.create(outputMessage.getBodyAsString()) .expectNext("data:{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n\n" + @@ -135,7 +135,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll new Pojo("foofoofoo", "barbarbar")); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); messageWriter.write(source, ResolvableType.forClass(Pojo.class), - MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).blockMillis(5000); + MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000)); StepVerifier.create(outputMessage.getBodyAsString()) .expectNext("data:{\n" + diff --git a/spring-web/src/test/java/org/springframework/web/bind/support/WebExchangeDataBinderTests.java b/spring-web/src/test/java/org/springframework/web/bind/support/WebExchangeDataBinderTests.java index 7aeb8330f73..693057466c3 100644 --- a/spring-web/src/test/java/org/springframework/web/bind/support/WebExchangeDataBinderTests.java +++ b/spring-web/src/test/java/org/springframework/web/bind/support/WebExchangeDataBinderTests.java @@ -19,6 +19,7 @@ package org.springframework.web.bind.support; import java.beans.PropertyEditorSupport; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.time.Duration; import java.util.Iterator; import org.junit.Before; @@ -64,7 +65,7 @@ public class WebExchangeDataBinderTests { MultiValueMap formData = new LinkedMultiValueMap<>(); formData.add("spouse", "someValue"); formData.add("spouse.name", "test"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertNotNull(this.testBean.getSpouse()); assertEquals("test", testBean.getSpouse().getName()); @@ -75,11 +76,11 @@ public class WebExchangeDataBinderTests { MultiValueMap formData = new LinkedMultiValueMap<>(); formData.add("_postProcessed", "visible"); formData.add("postProcessed", "on"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertTrue(this.testBean.isPostProcessed()); formData.remove("postProcessed"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertFalse(this.testBean.isPostProcessed()); } @@ -90,11 +91,11 @@ public class WebExchangeDataBinderTests { MultiValueMap formData = new LinkedMultiValueMap<>(); formData.add("_postProcessed", "visible"); formData.add("postProcessed", "on"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertTrue(this.testBean.isPostProcessed()); formData.remove("postProcessed"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertFalse(this.testBean.isPostProcessed()); } @@ -103,11 +104,11 @@ public class WebExchangeDataBinderTests { MultiValueMap formData = new LinkedMultiValueMap<>(); formData.add("!postProcessed", "off"); formData.add("postProcessed", "on"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertTrue(this.testBean.isPostProcessed()); formData.remove("postProcessed"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertFalse(this.testBean.isPostProcessed()); } @@ -117,15 +118,15 @@ public class WebExchangeDataBinderTests { formData.add("!postProcessed", "on"); formData.add("_postProcessed", "visible"); formData.add("postProcessed", "on"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertTrue(this.testBean.isPostProcessed()); formData.remove("postProcessed"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertTrue(this.testBean.isPostProcessed()); formData.remove("!postProcessed"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertFalse(this.testBean.isPostProcessed()); } @@ -134,11 +135,11 @@ public class WebExchangeDataBinderTests { MultiValueMap formData = new LinkedMultiValueMap<>(); formData.add("!name", "anonymous"); formData.add("name", "Scott"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertEquals("Scott", this.testBean.getName()); formData.remove("name"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertEquals("anonymous", this.testBean.getName()); } @@ -148,12 +149,12 @@ public class WebExchangeDataBinderTests { formData.add("stringArray", "bar"); formData.add("stringArray", "abc"); formData.add("stringArray", "123,def"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertEquals("Expected all three items to be bound", 3, this.testBean.getStringArray().length); formData.remove("stringArray"); formData.add("stringArray", "123,def"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertEquals("Expected only 1 item to be bound", 1, this.testBean.getStringArray().length); } @@ -162,7 +163,7 @@ public class WebExchangeDataBinderTests { MultiValueMap formData = new LinkedMultiValueMap<>(); formData.add("spouse.name", "test"); formData.add("spouse", "someValue"); - this.binder.bind(exchange(formData)).blockMillis(5000); + this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000)); assertNotNull(this.testBean.getSpouse()); assertEquals("test", this.testBean.getSpouse().getName()); @@ -173,7 +174,7 @@ public class WebExchangeDataBinderTests { String url = "/path?spouse=someValue&spouse.name=test"; MockServerHttpRequest request = MockServerHttpRequest.post(url).build(); ServerWebExchange exchange = new DefaultServerWebExchange(request, new MockServerHttpResponse()); - this.binder.bind(exchange).blockMillis(5000); + this.binder.bind(exchange).block(Duration.ofMillis(5000)); assertNotNull(this.testBean.getSpouse()); assertEquals("test", this.testBean.getSpouse().getName()); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/resource/AppCacheManifestTransformerTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/resource/AppCacheManifestTransformerTests.java index a75140a5af5..3a06af80977 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/resource/AppCacheManifestTransformerTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/resource/AppCacheManifestTransformerTests.java @@ -16,6 +16,7 @@ package org.springframework.web.reactive.resource; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -87,7 +88,7 @@ public class AppCacheManifestTransformerTests { given(resource.getFilename()).willReturn("foobar.file"); given(this.chain.transform(exchange, resource)).willReturn(Mono.just(resource)); - Resource result = this.transformer.transform(exchange, resource, this.chain).blockMillis(5000); + Resource result = this.transformer.transform(exchange, resource, this.chain).block(Duration.ofMillis(5000)); assertEquals(resource, result); } @@ -98,7 +99,7 @@ public class AppCacheManifestTransformerTests { Resource resource = new ClassPathResource("test/error.appcache", getClass()); given(this.chain.transform(exchange, resource)).willReturn(Mono.just(resource)); - Resource result = this.transformer.transform(exchange, resource, this.chain).blockMillis(5000); + Resource result = this.transformer.transform(exchange, resource, this.chain).block(Duration.ofMillis(5000)); assertEquals(resource, result); } @@ -119,7 +120,7 @@ public class AppCacheManifestTransformerTests { this.chain = new DefaultResourceTransformerChain(resolverChain, transformers); Resource resource = new ClassPathResource("test/test.appcache", getClass()); - Resource result = this.transformer.transform(exchange, resource, this.chain).blockMillis(5000); + Resource result = this.transformer.transform(exchange, resource, this.chain).block(Duration.ofMillis(5000)); byte[] bytes = FileCopyUtils.copyToByteArray(result.getInputStream()); String content = new String(bytes, "UTF-8"); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandlerTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandlerTests.java index ee5a4c7f76d..c19ea4589c3 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandlerTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandlerTests.java @@ -195,17 +195,17 @@ public class ViewResolutionResultHandlerTests { this.request = MockServerHttpRequest.get("/account").build(); ServerWebExchange exchange = createExchange(); - handler.handleResult(exchange, result).blockMillis(5000); + handler.handleResult(exchange, result).block(Duration.ofMillis(5000)); assertResponseBody(exchange, "account: {id=123}"); this.request = MockServerHttpRequest.get("/account/").build(); exchange = createExchange(); - handler.handleResult(exchange, result).blockMillis(5000); + handler.handleResult(exchange, result).block(Duration.ofMillis(5000)); assertResponseBody(exchange, "account: {id=123}"); this.request = MockServerHttpRequest.get("/account.123").build(); exchange = createExchange(); - handler.handleResult(exchange, result).blockMillis(5000); + handler.handleResult(exchange, result).block(Duration.ofMillis(5000)); assertResponseBody(exchange, "account: {id=123}"); } @@ -281,7 +281,7 @@ public class ViewResolutionResultHandlerTests { this.request = MockServerHttpRequest.get("/account").build(); ServerWebExchange exchange = createExchange(); - handler.handleResult(exchange, result).blockMillis(5000); + handler.handleResult(exchange, result).block(Duration.ofMillis(5000)); assertResponseBody(exchange, "account: {" + "attr1=TestBean[name=Bean1], " + "attr2=[TestBean[name=Bean1], TestBean[name=Bean2]], " +