diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml index a6c5d92a31..5622ec9ac3 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml @@ -26,7 +26,7 @@ - 0.11.0 + [0.12.0-SNAPSHOT,) 5.9.3 3.24.2 1.2.12 @@ -115,4 +115,15 @@ + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + true + false + + + + diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index 4d28511400..d4315a1311 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -16,9 +16,8 @@ package com.rabbitmq.stream; +import static com.rabbitmq.stream.TestUtils.*; import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok; -import static com.rabbitmq.stream.TestUtils.waitAtMost; -import static com.rabbitmq.stream.TestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -164,7 +163,7 @@ public class FailureTest { new Client.ClientParameters() .port(TestUtils.streamPortNode1()) .messageListener( - (subscriptionId, offset, chunkTimestamp, committedChunkId, msg) -> { + (subscriptionId, offset, chunkTimestamp, committedChunkId, context, msg) -> { bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8)); consumeLatch.countDown(); })); @@ -341,11 +340,14 @@ public class FailureTest { cf.get( new Client.ClientParameters() .port(m.getReplicas().get(0).getPort()) - .chunkListener( - (client1, subscriptionId, offset, messageCount, dataSize) -> - client1.credit(subscriptionId, 1)) + .chunkListener(credit()) .messageListener( - (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + (subscriptionId, + offset, + chunkTimestamp, + committedChunkId, + context, + message) -> { consumed.add(message); generations.add((Long) message.getApplicationProperties().get("generation")); if (consumed.size() == confirmed.size()) { @@ -447,7 +449,7 @@ public class FailureTest { Set generations = ConcurrentHashMap.newKeySet(); Set consumedIds = ConcurrentHashMap.newKeySet(); Client.MessageListener messageListener = - (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + (subscriptionId, offset, chunkTimestamp, committedChunkId, context, message) -> { consumed.add(message); generations.add((Long) message.getApplicationProperties().get("generation")); consumedIds.add(message.getProperties().getMessageIdAsLong()); @@ -471,9 +473,7 @@ public class FailureTest { new Client.ClientParameters() .port(newReplicaPort) .shutdownListener(shutdownListenerReference.get()) - .chunkListener( - (client1, subscriptionId, offset, messageCount, dataSize) -> - client1.credit(subscriptionId, 1)) + .chunkListener(credit()) .messageListener(messageListener)); newConsumer.subscribe( @@ -494,9 +494,7 @@ public class FailureTest { new Client.ClientParameters() .port(replica.getPort()) .shutdownListener(shutdownListener) - .chunkListener( - (client1, subscriptionId, offset, messageCount, dataSize) -> - client1.credit(subscriptionId, 1)) + .chunkListener(credit()) .messageListener(messageListener)); Client.Response response = diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java index cc1464382b..7a4d214c39 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -16,6 +16,7 @@ package com.rabbitmq.stream; +import static com.rabbitmq.stream.TestUtils.credit; import static org.assertj.core.api.Assertions.assertThat; import com.rabbitmq.stream.impl.Client; @@ -115,11 +116,14 @@ public class StreamTest { cf.get( new Client.ClientParameters() .port(consumerBroker.apply(streamMetadata).getPort()) - .chunkListener( - (client1, subscriptionId, offset, messageCount1, dataSize) -> - client1.credit(subscriptionId, 10)) + .chunkListener(credit()) .messageListener( - (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + (subscriptionId, + offset, + chunkTimestamp, + committedChunkId, + context, + message) -> { bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); consumingLatch.countDown(); })); @@ -128,7 +132,8 @@ public class StreamTest { assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(bodies).hasSize(messageCount); - IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i))); + IntStream.range(0, messageCount) + .forEach(i -> assertThat(bodies.contains("hello " + i)).isTrue()); } @Test diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index 3ddb695f4b..3b54147a0b 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -218,4 +218,11 @@ public class TestUtils { expectedResponse); } } + + static Client.ChunkListener credit() { + return (client, subscriptionId, offset, messageCount, dataSize) -> { + client.credit(subscriptionId, 1); + return null; + }; + } } diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml index 879a1986fc..4af736f9a5 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml @@ -26,7 +26,7 @@ - 0.11.0 + [0.12.0-SNAPSHOT,) 5.9.3 3.24.2 1.2.12 @@ -131,4 +131,15 @@ + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + true + false + + + + diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java index f4448e6e23..442c223263 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java @@ -140,11 +140,6 @@ public class HttpTest { .collect(Collectors.toList()); } - static List> entities( - List> entities, Predicate> filter) { - return entities.stream().filter(filter).collect(Collectors.toList()); - } - static Map entity( List> entities, Predicate> filter) { return entities.stream().filter(filter).findFirst().orElse(Collections.emptyMap()); @@ -558,9 +553,7 @@ public class HttpTest { cf.get( new ClientParameters() .clientProperty("connection_name", connectionProvidedName) - .chunkListener( - (client1, subscriptionId, offset, messageCount, dataSize) -> - client1.credit(subscriptionId, 1)) + .chunkListener(TestUtils.credit()) .shutdownListener(shutdownContext -> closed.set(true))); client.subscribe((byte) 0, stream, OffsetSpecification.first(), 10, subscriptionProperties); diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index 0e3ab66a5d..1f70845383 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -257,4 +257,11 @@ public class TestUtils { static Condition isNull() { return new Condition<>(Objects::isNull, "null"); } + + static Client.ChunkListener credit() { + return (client, subscriptionId, offset, messageCount, dataSize) -> { + client.credit(subscriptionId, 1); + return null; + }; + } }