From 1653a32a04fa68832fb7c1c991d99e9bf9ced229 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 6 Feb 2018 11:01:19 -0500 Subject: [PATCH] Stabilize Flux.interval emissions in integration tests After this commit the use of interval in tests is combined with take(n).onBackpressureBuffer(n) to ensure emissions don't fail if the fixed rate is exceeded (e.g. on slow CI server). Tests that verify N number of items followed by verifyOnComplete() should set the number of emissions to N. Tests that verify N number of items followed by thenCancel() should set the number of buffered to an arbitrary number greater than N. --- .../server/samples/ResponseEntityTests.java | 17 ++++---- .../AbstractHttpHandlerIntegrationTests.java | 13 +++++- .../reactive/FlushingIntegrationTests.java | 20 ++++----- .../SseHandlerFunctionIntegrationTests.java | 41 +++++++++---------- .../JacksonStreamingIntegrationTests.java | 14 +++---- .../RequestMappingIntegrationTests.java | 11 ++--- .../annotation/SseIntegrationTests.java | 28 ++++++++----- 7 files changed, 77 insertions(+), 67 deletions(-) diff --git a/spring-test/src/test/java/org/springframework/test/web/reactive/server/samples/ResponseEntityTests.java b/spring-test/src/test/java/org/springframework/test/web/reactive/server/samples/ResponseEntityTests.java index 7d39d9817a..1177be557e 100644 --- a/spring-test/src/test/java/org/springframework/test/web/reactive/server/samples/ResponseEntityTests.java +++ b/spring-test/src/test/java/org/springframework/test/web/reactive/server/samples/ResponseEntityTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,7 +59,7 @@ public class ResponseEntityTests { @Test - public void entity() throws Exception { + public void entity() { this.client.get().uri("/John") .exchange() .expectStatus().isOk() @@ -68,7 +68,7 @@ public class ResponseEntityTests { } @Test - public void entityWithConsumer() throws Exception { + public void entityWithConsumer() { this.client.get().uri("/John") .exchange() .expectStatus().isOk() @@ -78,7 +78,7 @@ public class ResponseEntityTests { } @Test - public void entityList() throws Exception { + public void entityList() { List expected = Arrays.asList( new Person("Jane"), new Person("Jason"), new Person("John")); @@ -91,7 +91,7 @@ public class ResponseEntityTests { } @Test - public void entityMap() throws Exception { + public void entityMap() { Map map = new LinkedHashMap<>(); map.put("Jane", new Person("Jane")); @@ -105,7 +105,7 @@ public class ResponseEntityTests { } @Test - public void entityStream() throws Exception { + public void entityStream() { FluxExchangeResult result = this.client.get() .accept(TEXT_EVENT_STREAM) @@ -123,7 +123,7 @@ public class ResponseEntityTests { } @Test - public void postEntity() throws Exception { + public void postEntity() { this.client.post() .syncBody(new Person("John")) .exchange() @@ -158,7 +158,8 @@ public class ResponseEntityTests { @GetMapping(produces = "text/event-stream") Flux getPersonStream() { - return Flux.interval(ofMillis(100)).onBackpressureBuffer(10).map(index -> new Person("N" + index)); + return Flux.interval(ofMillis(100)).take(50).onBackpressureBuffer(50) + .map(index -> new Person("N" + index)); } @PostMapping diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java index 9ebf31f777..2ad80fb772 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.http.server.reactive; import java.io.File; +import java.time.Duration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -24,6 +25,7 @@ import org.junit.After; import org.junit.Before; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import reactor.core.publisher.Flux; import org.springframework.http.server.reactive.bootstrap.HttpServer; import org.springframework.http.server.reactive.bootstrap.JettyHttpServer; @@ -73,4 +75,13 @@ public abstract class AbstractHttpHandlerIntegrationTests { protected abstract HttpHandler createHttpHandler(); + + /** + * Return an interval stream of with n number of ticks and buffer the + * emissions to avoid back pressure failures (e.g. on slow CI server). + */ + public static Flux interval(Duration period, int count) { + return Flux.interval(period).take(count).onBackpressureBuffer(2); + } + } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java index 8d30429729..9f73c560fd 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java @@ -53,7 +53,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest @Test - public void writeAndFlushWith() throws Exception { + public void writeAndFlushWith() { Mono result = this.webClient.get() .uri("/write-and-flush") .retrieve() @@ -64,7 +64,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest StepVerifier.create(result) .expectNext("data0data1") .expectComplete() - .verify(Duration.ofSeconds(5L)); + .verify(Duration.ofSeconds(10L)); } @Test // SPR-14991 @@ -79,7 +79,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest StepVerifier.create(result) .consumeNextWith(value -> assertTrue(value.length() == 20000 * "0123456789".length())) .expectComplete() - .verify(Duration.ofSeconds(5L)); + .verify(Duration.ofSeconds(10L)); } catch (AssertionError err) { String os = System.getProperty("os.name").toLowerCase(); @@ -103,7 +103,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest StepVerifier.create(result) .expectNextMatches(s -> s.startsWith("0123456789")) .expectComplete() - .verify(Duration.ofSeconds(5L)); + .verify(Duration.ofSeconds(10L)); } @@ -119,13 +119,10 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { String path = request.getURI().getPath(); if (path.endsWith("write-and-flush")) { - Flux> responseBody = Flux - .interval(Duration.ofMillis(50)) + Flux> responseBody = interval(Duration.ofMillis(50), 2) .map(l -> toDataBuffer("data" + l + "\n", response.bufferFactory())) - .take(2) .map(Flux::just); - responseBody = responseBody.concatWith(Flux.never()); - return response.writeAndFlushWith(responseBody); + return response.writeAndFlushWith(responseBody.concatWith(Flux.never())); } else if (path.endsWith("write-and-complete")) { Flux responseBody = Flux @@ -138,9 +135,8 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest Flux responseBody = Flux .just("0123456789") .repeat(20000) - .map(value -> toDataBuffer(value + "\n", response.bufferFactory())) - .mergeWith(Flux.never()); - return response.writeWith(responseBody); + .map(value -> toDataBuffer(value + "\n", response.bufferFactory())); + return response.writeWith(responseBody.mergeWith(Flux.never())); } return response.writeWith(Flux.empty()); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java index 356ac87c65..24f5a5abf7 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,10 +30,10 @@ import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; import static org.junit.Assert.*; -import static org.springframework.http.MediaType.TEXT_EVENT_STREAM; -import static org.springframework.web.reactive.function.BodyExtractors.toFlux; -import static org.springframework.web.reactive.function.BodyInserters.fromServerSentEvents; -import static org.springframework.web.reactive.function.server.RouterFunctions.route; +import static org.springframework.http.MediaType.*; +import static org.springframework.web.reactive.function.BodyExtractors.*; +import static org.springframework.web.reactive.function.BodyInserters.*; +import static org.springframework.web.reactive.function.server.RouterFunctions.*; /** * @author Arjen Poutsma @@ -59,7 +59,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn @Test - public void sseAsString() throws Exception { + public void sseAsString() { Flux result = this.webClient.get() .uri("/string") .accept(TEXT_EVENT_STREAM) @@ -74,7 +74,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn } @Test - public void sseAsPerson() throws Exception { + public void sseAsPerson() { Flux result = this.webClient.get() .uri("/person") .accept(TEXT_EVENT_STREAM) @@ -89,7 +89,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn } @Test - public void sseAsEvent() throws Exception { + public void sseAsEvent() { Flux> result = this.webClient.get() .uri("/event") .accept(TEXT_EVENT_STREAM) @@ -119,28 +119,25 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn private static class SseHandler { - public Mono string(ServerRequest request) { - Flux flux = Flux.interval(Duration.ofMillis(100)).map(l -> "foo " + l).take(2); + private static final Flux INTERVAL = interval(Duration.ofMillis(100), 2); + + + Mono string(ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.TEXT_EVENT_STREAM) - .body(flux, String.class); + .body(INTERVAL.map(aLong -> "foo " + aLong), String.class); } - public Mono person(ServerRequest request) { - Flux flux = Flux.interval(Duration.ofMillis(100)) - .map(l -> new Person("foo " + l)).take(2); + Mono person(ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.TEXT_EVENT_STREAM) - .body(flux, Person.class); + .body(INTERVAL.map(aLong -> new Person("foo " + aLong)), Person.class); } - public Mono sse(ServerRequest request) { - Flux> flux = Flux.interval(Duration.ofMillis(100)) - .map(l -> ServerSentEvent.builder().data("foo") - .id(Long.toString(l)) - .comment("bar") - .build()).take(2); - return ServerResponse.ok().body(fromServerSentEvents(flux)); + Mono sse(ServerRequest request) { + Flux> body = INTERVAL + .map(aLong -> ServerSentEvent.builder("foo").id("" + aLong).comment("bar").build()); + return ServerResponse.ok().body(fromServerSentEvents(body)); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java index 8a8c0ca410..f729b8145e 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java @@ -29,15 +29,14 @@ import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; import org.springframework.http.server.reactive.HttpHandler; -import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; -import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON; -import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON_VALUE; +import static org.springframework.http.MediaType.*; /** * @author Sebastien Deleuze @@ -67,7 +66,7 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra } @Test - public void jsonStreaming() throws Exception { + public void jsonStreaming() { Flux result = this.webClient.get() .uri("/stream") .accept(APPLICATION_STREAM_JSON) @@ -82,7 +81,7 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra } @Test - public void smileStreaming() throws Exception { + public void smileStreaming() { Flux result = this.webClient.get() .uri("/stream") .accept(new MediaType("application", "stream+x-jackson-smile")) @@ -100,9 +99,10 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra @SuppressWarnings("unused") static class JacksonStreamingController { - @RequestMapping(value = "/stream", produces = { APPLICATION_STREAM_JSON_VALUE, "application/stream+x-jackson-smile" }) + @GetMapping(value = "/stream", + produces = { APPLICATION_STREAM_JSON_VALUE, "application/stream+x-jackson-smile" }) Flux person() { - return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l)); + return interval(Duration.ofMillis(100), 50).map(l -> new Person("foo " + l)); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java index 5eee687983..b126b32d11 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ import java.time.Duration; import org.junit.Test; import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -30,9 +29,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.config.EnableWebFlux; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; /** * Integration tests with {@code @RequestMapping} handler methods. @@ -56,7 +53,7 @@ public class RequestMappingIntegrationTests extends AbstractRequestMappingIntegr @Test - public void httpHead() throws Exception { + public void httpHead() { String url = "http://localhost:" + this.port + "/text"; HttpHeaders headers = getRestTemplate().headForHeaders(url); String contentType = headers.getFirst("Content-Type"); @@ -89,7 +86,7 @@ public class RequestMappingIntegrationTests extends AbstractRequestMappingIntegr @GetMapping("/stream") public Publisher stream() { - return Flux.interval(Duration.ofMillis(50)).take(5); + return interval(Duration.ofMillis(50), 5); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 06c17b3408..287609df3f 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,7 +71,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } @Test - public void sseAsString() throws Exception { + public void sseAsString() { Flux result = this.webClient.get() .uri("/string") .accept(TEXT_EVENT_STREAM) @@ -86,7 +86,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } @Test - public void sseAsPerson() throws Exception { + public void sseAsPerson() { Flux result = this.webClient.get() .uri("/person") .accept(TEXT_EVENT_STREAM) @@ -101,13 +101,14 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } @Test - public void sseAsEvent() throws Exception { + public void sseAsEvent() { ResolvableType type = forClassWithGenerics(ServerSentEvent.class, String.class); Flux> result = this.webClient.get() .uri("/event") .accept(TEXT_EVENT_STREAM) .exchange() - .flatMapMany(response -> response.body(toFlux(new ParameterizedTypeReference>() {}))); + .flatMapMany(response -> response.body( + toFlux(new ParameterizedTypeReference>() {}))); StepVerifier.create(result) .consumeNextWith( event -> { @@ -129,12 +130,13 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } @Test - public void sseAsEventWithoutAcceptHeader() throws Exception { + public void sseAsEventWithoutAcceptHeader() { Flux> result = this.webClient.get() .uri("/event") .accept(TEXT_EVENT_STREAM) .exchange() - .flatMapMany(response -> response.body(toFlux(new ParameterizedTypeReference>() {}))); + .flatMapMany(response -> response.body( + toFlux(new ParameterizedTypeReference>() {}))); StepVerifier.create(result) .consumeNextWith( event -> { @@ -155,23 +157,27 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { .verify(Duration.ofSeconds(5L)); } + @RestController @SuppressWarnings("unused") static class SseController { + private static final Flux INTERVAL = interval(Duration.ofMillis(100), 50); + + @RequestMapping("/sse/string") Flux string() { - return Flux.interval(Duration.ofMillis(100)).map(l -> "foo " + l); + return INTERVAL.map(l -> "foo " + l); } @RequestMapping("/sse/person") Flux person() { - return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l)); + return INTERVAL.map(l -> new Person("foo " + l)); } @RequestMapping("/sse/event") Flux> sse() { - return Flux.interval(Duration.ofMillis(100)).map(l -> ServerSentEvent.builder("foo") + return INTERVAL.map(l -> ServerSentEvent.builder("foo") .id(Long.toString(l)) .comment("bar") .build()); @@ -179,6 +185,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } + @Configuration @EnableWebFlux @SuppressWarnings("unused") @@ -190,6 +197,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } } + @SuppressWarnings("unused") private static class Person {