Improve RSocketClientToServer*IntegrationTests reliability
Those new delays seems to make tests more reliable. Closes gh-30844
This commit is contained in:
parent
a17cf742b2
commit
c91041b675
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2021 the original author or authors.
|
* Copyright 2002-2023 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -56,6 +56,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||||
* Server-side handling of RSocket requests.
|
* Server-side handling of RSocket requests.
|
||||||
*
|
*
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
|
* @author Sebastien Deleuze
|
||||||
*/
|
*/
|
||||||
public class RSocketClientToServerIntegrationTests {
|
public class RSocketClientToServerIntegrationTests {
|
||||||
|
|
||||||
|
@ -104,6 +105,7 @@ public class RSocketClientToServerIntegrationTests {
|
||||||
@Test
|
@Test
|
||||||
public void fireAndForget() {
|
public void fireAndForget() {
|
||||||
Flux.range(1, 3)
|
Flux.range(1, 3)
|
||||||
|
.delayElements(Duration.ofMillis(10))
|
||||||
.concatMap(i -> requester.route("receive").data("Hello " + i).send())
|
.concatMap(i -> requester.route("receive").data("Hello " + i).send())
|
||||||
.blockLast();
|
.blockLast();
|
||||||
|
|
||||||
|
@ -111,7 +113,7 @@ public class RSocketClientToServerIntegrationTests {
|
||||||
.expectNext("Hello 1")
|
.expectNext("Hello 1")
|
||||||
.expectNext("Hello 2")
|
.expectNext("Hello 2")
|
||||||
.expectNext("Hello 3")
|
.expectNext("Hello 3")
|
||||||
.thenAwait(Duration.ofMillis(50))
|
.thenAwait(Duration.ofMillis(10))
|
||||||
.thenCancel()
|
.thenCancel()
|
||||||
.verify(Duration.ofSeconds(5));
|
.verify(Duration.ofSeconds(5));
|
||||||
|
|
||||||
|
@ -173,13 +175,14 @@ public class RSocketClientToServerIntegrationTests {
|
||||||
@Test
|
@Test
|
||||||
public void metadataPush() {
|
public void metadataPush() {
|
||||||
Flux.just("bar", "baz")
|
Flux.just("bar", "baz")
|
||||||
|
.delayElements(Duration.ofMillis(10))
|
||||||
.concatMap(s -> requester.route("foo-updates").metadata(s, FOO_MIME_TYPE).sendMetadata())
|
.concatMap(s -> requester.route("foo-updates").metadata(s, FOO_MIME_TYPE).sendMetadata())
|
||||||
.blockLast();
|
.blockLast();
|
||||||
|
|
||||||
StepVerifier.create(context.getBean(ServerController.class).metadataPushPayloads.asFlux())
|
StepVerifier.create(context.getBean(ServerController.class).metadataPushPayloads.asFlux())
|
||||||
.expectNext("bar")
|
.expectNext("bar")
|
||||||
.expectNext("baz")
|
.expectNext("baz")
|
||||||
.thenAwait(Duration.ofMillis(50))
|
.thenAwait(Duration.ofMillis(10))
|
||||||
.thenCancel()
|
.thenCancel()
|
||||||
.verify(Duration.ofSeconds(5));
|
.verify(Duration.ofSeconds(5));
|
||||||
|
|
||||||
|
|
|
@ -54,13 +54,14 @@ class RSocketClientToServerCoroutinesIntegrationTests {
|
||||||
@Test
|
@Test
|
||||||
fun fireAndForget() {
|
fun fireAndForget() {
|
||||||
Flux.range(1, 3)
|
Flux.range(1, 3)
|
||||||
|
.delayElements(Duration.ofMillis(10))
|
||||||
.concatMap { requester.route("receive").data("Hello $it").send() }
|
.concatMap { requester.route("receive").data("Hello $it").send() }
|
||||||
.blockLast()
|
.blockLast()
|
||||||
StepVerifier.create(context.getBean(ServerController::class.java).fireForgetPayloads.asFlux())
|
StepVerifier.create(context.getBean(ServerController::class.java).fireForgetPayloads.asFlux())
|
||||||
.expectNext("Hello 1")
|
.expectNext("Hello 1")
|
||||||
.expectNext("Hello 2")
|
.expectNext("Hello 2")
|
||||||
.expectNext("Hello 3")
|
.expectNext("Hello 3")
|
||||||
.thenAwait(Duration.ofMillis(50))
|
.thenAwait(Duration.ofMillis(10))
|
||||||
.thenCancel()
|
.thenCancel()
|
||||||
.verify(Duration.ofSeconds(5))
|
.verify(Duration.ofSeconds(5))
|
||||||
}
|
}
|
||||||
|
@ -68,13 +69,14 @@ class RSocketClientToServerCoroutinesIntegrationTests {
|
||||||
@Test
|
@Test
|
||||||
fun fireAndForgetAsync() {
|
fun fireAndForgetAsync() {
|
||||||
Flux.range(1, 3)
|
Flux.range(1, 3)
|
||||||
|
.delayElements(Duration.ofMillis(10))
|
||||||
.concatMap { i: Int -> requester.route("receive-async").data("Hello $i").send() }
|
.concatMap { i: Int -> requester.route("receive-async").data("Hello $i").send() }
|
||||||
.blockLast()
|
.blockLast()
|
||||||
StepVerifier.create(context.getBean(ServerController::class.java).fireForgetPayloads.asFlux())
|
StepVerifier.create(context.getBean(ServerController::class.java).fireForgetPayloads.asFlux())
|
||||||
.expectNext("Hello 1")
|
.expectNext("Hello 1")
|
||||||
.expectNext("Hello 2")
|
.expectNext("Hello 2")
|
||||||
.expectNext("Hello 3")
|
.expectNext("Hello 3")
|
||||||
.thenAwait(Duration.ofMillis(50))
|
.thenAwait(Duration.ofMillis(10))
|
||||||
.thenCancel()
|
.thenCancel()
|
||||||
.verify(Duration.ofSeconds(5))
|
.verify(Duration.ofSeconds(5))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue