Stabilize Flux.interval emissions in integration tests
After this commit the use of interval in tests is combined with take(n).onBackpressureBuffer(n) to ensure emissions don't fail if the fixed rate is exceeded (e.g. on slow CI server). Tests that verify N number of items followed by verifyOnComplete() should set the number of emissions to N. Tests that verify N number of items followed by thenCancel() should set the number of buffered to an arbitrary number greater than N.
This commit is contained in:
parent
41a4bdea55
commit
1653a32a04
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -59,7 +59,7 @@ public class ResponseEntityTests {
|
|||
|
||||
|
||||
@Test
|
||||
public void entity() throws Exception {
|
||||
public void entity() {
|
||||
this.client.get().uri("/John")
|
||||
.exchange()
|
||||
.expectStatus().isOk()
|
||||
|
@ -68,7 +68,7 @@ public class ResponseEntityTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void entityWithConsumer() throws Exception {
|
||||
public void entityWithConsumer() {
|
||||
this.client.get().uri("/John")
|
||||
.exchange()
|
||||
.expectStatus().isOk()
|
||||
|
@ -78,7 +78,7 @@ public class ResponseEntityTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void entityList() throws Exception {
|
||||
public void entityList() {
|
||||
|
||||
List<Person> expected = Arrays.asList(
|
||||
new Person("Jane"), new Person("Jason"), new Person("John"));
|
||||
|
@ -91,7 +91,7 @@ public class ResponseEntityTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void entityMap() throws Exception {
|
||||
public void entityMap() {
|
||||
|
||||
Map<String, Person> map = new LinkedHashMap<>();
|
||||
map.put("Jane", new Person("Jane"));
|
||||
|
@ -105,7 +105,7 @@ public class ResponseEntityTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void entityStream() throws Exception {
|
||||
public void entityStream() {
|
||||
|
||||
FluxExchangeResult<Person> result = this.client.get()
|
||||
.accept(TEXT_EVENT_STREAM)
|
||||
|
@ -123,7 +123,7 @@ public class ResponseEntityTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void postEntity() throws Exception {
|
||||
public void postEntity() {
|
||||
this.client.post()
|
||||
.syncBody(new Person("John"))
|
||||
.exchange()
|
||||
|
@ -158,7 +158,8 @@ public class ResponseEntityTests {
|
|||
|
||||
@GetMapping(produces = "text/event-stream")
|
||||
Flux<Person> getPersonStream() {
|
||||
return Flux.interval(ofMillis(100)).onBackpressureBuffer(10).map(index -> new Person("N" + index));
|
||||
return Flux.interval(ofMillis(100)).take(50).onBackpressureBuffer(50)
|
||||
.map(index -> new Person("N" + index));
|
||||
}
|
||||
|
||||
@PostMapping
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.http.server.reactive;
|
||||
|
||||
import java.io.File;
|
||||
import java.time.Duration;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -24,6 +25,7 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.http.server.reactive.bootstrap.HttpServer;
|
||||
import org.springframework.http.server.reactive.bootstrap.JettyHttpServer;
|
||||
|
@ -73,4 +75,13 @@ public abstract class AbstractHttpHandlerIntegrationTests {
|
|||
|
||||
protected abstract HttpHandler createHttpHandler();
|
||||
|
||||
|
||||
/**
|
||||
* Return an interval stream of with n number of ticks and buffer the
|
||||
* emissions to avoid back pressure failures (e.g. on slow CI server).
|
||||
*/
|
||||
public static Flux<Long> interval(Duration period, int count) {
|
||||
return Flux.interval(period).take(count).onBackpressureBuffer(2);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
|||
|
||||
|
||||
@Test
|
||||
public void writeAndFlushWith() throws Exception {
|
||||
public void writeAndFlushWith() {
|
||||
Mono<String> result = this.webClient.get()
|
||||
.uri("/write-and-flush")
|
||||
.retrieve()
|
||||
|
@ -64,7 +64,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
|||
StepVerifier.create(result)
|
||||
.expectNext("data0data1")
|
||||
.expectComplete()
|
||||
.verify(Duration.ofSeconds(5L));
|
||||
.verify(Duration.ofSeconds(10L));
|
||||
}
|
||||
|
||||
@Test // SPR-14991
|
||||
|
@ -79,7 +79,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
|||
StepVerifier.create(result)
|
||||
.consumeNextWith(value -> assertTrue(value.length() == 20000 * "0123456789".length()))
|
||||
.expectComplete()
|
||||
.verify(Duration.ofSeconds(5L));
|
||||
.verify(Duration.ofSeconds(10L));
|
||||
}
|
||||
catch (AssertionError err) {
|
||||
String os = System.getProperty("os.name").toLowerCase();
|
||||
|
@ -103,7 +103,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
|||
StepVerifier.create(result)
|
||||
.expectNextMatches(s -> s.startsWith("0123456789"))
|
||||
.expectComplete()
|
||||
.verify(Duration.ofSeconds(5L));
|
||||
.verify(Duration.ofSeconds(10L));
|
||||
}
|
||||
|
||||
|
||||
|
@ -119,13 +119,10 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
|||
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
|
||||
String path = request.getURI().getPath();
|
||||
if (path.endsWith("write-and-flush")) {
|
||||
Flux<Publisher<DataBuffer>> responseBody = Flux
|
||||
.interval(Duration.ofMillis(50))
|
||||
Flux<Publisher<DataBuffer>> responseBody = interval(Duration.ofMillis(50), 2)
|
||||
.map(l -> toDataBuffer("data" + l + "\n", response.bufferFactory()))
|
||||
.take(2)
|
||||
.map(Flux::just);
|
||||
responseBody = responseBody.concatWith(Flux.never());
|
||||
return response.writeAndFlushWith(responseBody);
|
||||
return response.writeAndFlushWith(responseBody.concatWith(Flux.never()));
|
||||
}
|
||||
else if (path.endsWith("write-and-complete")) {
|
||||
Flux<DataBuffer> responseBody = Flux
|
||||
|
@ -138,9 +135,8 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
|||
Flux<DataBuffer> responseBody = Flux
|
||||
.just("0123456789")
|
||||
.repeat(20000)
|
||||
.map(value -> toDataBuffer(value + "\n", response.bufferFactory()))
|
||||
.mergeWith(Flux.never());
|
||||
return response.writeWith(responseBody);
|
||||
.map(value -> toDataBuffer(value + "\n", response.bufferFactory()));
|
||||
return response.writeWith(responseBody.mergeWith(Flux.never()));
|
||||
}
|
||||
return response.writeWith(Flux.empty());
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -30,10 +30,10 @@ import org.springframework.http.codec.ServerSentEvent;
|
|||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
|
||||
import static org.springframework.web.reactive.function.BodyExtractors.toFlux;
|
||||
import static org.springframework.web.reactive.function.BodyInserters.fromServerSentEvents;
|
||||
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
|
||||
import static org.springframework.http.MediaType.*;
|
||||
import static org.springframework.web.reactive.function.BodyExtractors.*;
|
||||
import static org.springframework.web.reactive.function.BodyInserters.*;
|
||||
import static org.springframework.web.reactive.function.server.RouterFunctions.*;
|
||||
|
||||
/**
|
||||
* @author Arjen Poutsma
|
||||
|
@ -59,7 +59,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn
|
|||
|
||||
|
||||
@Test
|
||||
public void sseAsString() throws Exception {
|
||||
public void sseAsString() {
|
||||
Flux<String> result = this.webClient.get()
|
||||
.uri("/string")
|
||||
.accept(TEXT_EVENT_STREAM)
|
||||
|
@ -74,7 +74,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn
|
|||
}
|
||||
|
||||
@Test
|
||||
public void sseAsPerson() throws Exception {
|
||||
public void sseAsPerson() {
|
||||
Flux<Person> result = this.webClient.get()
|
||||
.uri("/person")
|
||||
.accept(TEXT_EVENT_STREAM)
|
||||
|
@ -89,7 +89,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn
|
|||
}
|
||||
|
||||
@Test
|
||||
public void sseAsEvent() throws Exception {
|
||||
public void sseAsEvent() {
|
||||
Flux<ServerSentEvent<String>> result = this.webClient.get()
|
||||
.uri("/event")
|
||||
.accept(TEXT_EVENT_STREAM)
|
||||
|
@ -119,28 +119,25 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn
|
|||
|
||||
private static class SseHandler {
|
||||
|
||||
public Mono<ServerResponse> string(ServerRequest request) {
|
||||
Flux<String> flux = Flux.interval(Duration.ofMillis(100)).map(l -> "foo " + l).take(2);
|
||||
private static final Flux<Long> INTERVAL = interval(Duration.ofMillis(100), 2);
|
||||
|
||||
|
||||
Mono<ServerResponse> string(ServerRequest request) {
|
||||
return ServerResponse.ok()
|
||||
.contentType(MediaType.TEXT_EVENT_STREAM)
|
||||
.body(flux, String.class);
|
||||
.body(INTERVAL.map(aLong -> "foo " + aLong), String.class);
|
||||
}
|
||||
|
||||
public Mono<ServerResponse> person(ServerRequest request) {
|
||||
Flux<Person> flux = Flux.interval(Duration.ofMillis(100))
|
||||
.map(l -> new Person("foo " + l)).take(2);
|
||||
Mono<ServerResponse> person(ServerRequest request) {
|
||||
return ServerResponse.ok()
|
||||
.contentType(MediaType.TEXT_EVENT_STREAM)
|
||||
.body(flux, Person.class);
|
||||
.body(INTERVAL.map(aLong -> new Person("foo " + aLong)), Person.class);
|
||||
}
|
||||
|
||||
public Mono<ServerResponse> sse(ServerRequest request) {
|
||||
Flux<ServerSentEvent<String>> flux = Flux.interval(Duration.ofMillis(100))
|
||||
.map(l -> ServerSentEvent.<String>builder().data("foo")
|
||||
.id(Long.toString(l))
|
||||
.comment("bar")
|
||||
.build()).take(2);
|
||||
return ServerResponse.ok().body(fromServerSentEvents(flux));
|
||||
Mono<ServerResponse> sse(ServerRequest request) {
|
||||
Flux<ServerSentEvent<String>> body = INTERVAL
|
||||
.map(aLong -> ServerSentEvent.builder("foo").id("" + aLong).comment("bar").build());
|
||||
return ServerResponse.ok().body(fromServerSentEvents(body));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,15 +29,14 @@ import org.springframework.context.annotation.Configuration;
|
|||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
|
||||
import org.springframework.http.server.reactive.HttpHandler;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.reactive.DispatcherHandler;
|
||||
import org.springframework.web.reactive.config.EnableWebFlux;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
|
||||
|
||||
import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON;
|
||||
import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON_VALUE;
|
||||
import static org.springframework.http.MediaType.*;
|
||||
|
||||
/**
|
||||
* @author Sebastien Deleuze
|
||||
|
@ -67,7 +66,7 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra
|
|||
}
|
||||
|
||||
@Test
|
||||
public void jsonStreaming() throws Exception {
|
||||
public void jsonStreaming() {
|
||||
Flux<Person> result = this.webClient.get()
|
||||
.uri("/stream")
|
||||
.accept(APPLICATION_STREAM_JSON)
|
||||
|
@ -82,7 +81,7 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra
|
|||
}
|
||||
|
||||
@Test
|
||||
public void smileStreaming() throws Exception {
|
||||
public void smileStreaming() {
|
||||
Flux<Person> result = this.webClient.get()
|
||||
.uri("/stream")
|
||||
.accept(new MediaType("application", "stream+x-jackson-smile"))
|
||||
|
@ -100,9 +99,10 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra
|
|||
@SuppressWarnings("unused")
|
||||
static class JacksonStreamingController {
|
||||
|
||||
@RequestMapping(value = "/stream", produces = { APPLICATION_STREAM_JSON_VALUE, "application/stream+x-jackson-smile" })
|
||||
@GetMapping(value = "/stream",
|
||||
produces = { APPLICATION_STREAM_JSON_VALUE, "application/stream+x-jackson-smile" })
|
||||
Flux<Person> person() {
|
||||
return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l));
|
||||
return interval(Duration.ofMillis(100), 50).map(l -> new Person("foo " + l));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -20,7 +20,6 @@ import java.time.Duration;
|
|||
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
|
@ -30,9 +29,7 @@ import org.springframework.web.bind.annotation.GetMapping;
|
|||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.reactive.config.EnableWebFlux;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Integration tests with {@code @RequestMapping} handler methods.
|
||||
|
@ -56,7 +53,7 @@ public class RequestMappingIntegrationTests extends AbstractRequestMappingIntegr
|
|||
|
||||
|
||||
@Test
|
||||
public void httpHead() throws Exception {
|
||||
public void httpHead() {
|
||||
String url = "http://localhost:" + this.port + "/text";
|
||||
HttpHeaders headers = getRestTemplate().headForHeaders(url);
|
||||
String contentType = headers.getFirst("Content-Type");
|
||||
|
@ -89,7 +86,7 @@ public class RequestMappingIntegrationTests extends AbstractRequestMappingIntegr
|
|||
|
||||
@GetMapping("/stream")
|
||||
public Publisher<Long> stream() {
|
||||
return Flux.interval(Duration.ofMillis(50)).take(5);
|
||||
return interval(Duration.ofMillis(50), 5);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -71,7 +71,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void sseAsString() throws Exception {
|
||||
public void sseAsString() {
|
||||
Flux<String> result = this.webClient.get()
|
||||
.uri("/string")
|
||||
.accept(TEXT_EVENT_STREAM)
|
||||
|
@ -86,7 +86,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void sseAsPerson() throws Exception {
|
||||
public void sseAsPerson() {
|
||||
Flux<Person> result = this.webClient.get()
|
||||
.uri("/person")
|
||||
.accept(TEXT_EVENT_STREAM)
|
||||
|
@ -101,13 +101,14 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void sseAsEvent() throws Exception {
|
||||
public void sseAsEvent() {
|
||||
ResolvableType type = forClassWithGenerics(ServerSentEvent.class, String.class);
|
||||
Flux<ServerSentEvent<String>> result = this.webClient.get()
|
||||
.uri("/event")
|
||||
.accept(TEXT_EVENT_STREAM)
|
||||
.exchange()
|
||||
.flatMapMany(response -> response.body(toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})));
|
||||
.flatMapMany(response -> response.body(
|
||||
toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})));
|
||||
|
||||
StepVerifier.create(result)
|
||||
.consumeNextWith( event -> {
|
||||
|
@ -129,12 +130,13 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void sseAsEventWithoutAcceptHeader() throws Exception {
|
||||
public void sseAsEventWithoutAcceptHeader() {
|
||||
Flux<ServerSentEvent<String>> result = this.webClient.get()
|
||||
.uri("/event")
|
||||
.accept(TEXT_EVENT_STREAM)
|
||||
.exchange()
|
||||
.flatMapMany(response -> response.body(toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})));
|
||||
.flatMapMany(response -> response.body(
|
||||
toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})));
|
||||
|
||||
StepVerifier.create(result)
|
||||
.consumeNextWith( event -> {
|
||||
|
@ -155,23 +157,27 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
.verify(Duration.ofSeconds(5L));
|
||||
}
|
||||
|
||||
|
||||
@RestController
|
||||
@SuppressWarnings("unused")
|
||||
static class SseController {
|
||||
|
||||
private static final Flux<Long> INTERVAL = interval(Duration.ofMillis(100), 50);
|
||||
|
||||
|
||||
@RequestMapping("/sse/string")
|
||||
Flux<String> string() {
|
||||
return Flux.interval(Duration.ofMillis(100)).map(l -> "foo " + l);
|
||||
return INTERVAL.map(l -> "foo " + l);
|
||||
}
|
||||
|
||||
@RequestMapping("/sse/person")
|
||||
Flux<Person> person() {
|
||||
return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l));
|
||||
return INTERVAL.map(l -> new Person("foo " + l));
|
||||
}
|
||||
|
||||
@RequestMapping("/sse/event")
|
||||
Flux<ServerSentEvent<String>> sse() {
|
||||
return Flux.interval(Duration.ofMillis(100)).map(l -> ServerSentEvent.builder("foo")
|
||||
return INTERVAL.map(l -> ServerSentEvent.builder("foo")
|
||||
.id(Long.toString(l))
|
||||
.comment("bar")
|
||||
.build());
|
||||
|
@ -179,6 +185,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
|
||||
}
|
||||
|
||||
|
||||
@Configuration
|
||||
@EnableWebFlux
|
||||
@SuppressWarnings("unused")
|
||||
|
@ -190,6 +197,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private static class Person {
|
||||
|
||||
|
|
Loading…
Reference in New Issue