diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml index 96bc8ee556..ba74393489 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml @@ -26,14 +26,15 @@ - [0.12.0-SNAPSHOT,) + [1.2.0-SNAPSHOT,) 5.13.3 3.27.3 - 1.2.13 + 2.0.17 + 1.5.18 3.14.0 3.5.3 2.44.5 - 1.17.0 + 1.27.0 UTF-8 @@ -45,6 +46,12 @@ ${stream-client.version} + + org.slf4j + slf4j-api + ${slf4j.version} + + org.junit.jupiter junit-jupiter-engine @@ -73,6 +80,14 @@ test + + + com.google.googlejavaformat + google-java-format + ${google-java-format.version} + test + + diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java index a51e512f6c..cec7caf29f 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom +// Inc. and/or its subsidiaries. All rights reserved. // package com.rabbitmq.stream; diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index cb6a80832f..e04fd2042d 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom +// Inc. and/or its subsidiaries. All rights reserved. // package com.rabbitmq.stream; @@ -33,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -220,16 +220,23 @@ public class FailureTest { executorService.submit( () -> { connected.set(false); - - try { Thread.sleep(2000); } catch (Exception e) {} - Client locator = - cf.get(new Client.ClientParameters().port(streamPortNode2())); - // wait until there's a new leader + AtomicReference locator = new AtomicReference<>(); try { waitAtMost( Duration.ofSeconds(5), () -> { - Client.StreamMetadata m = locator.metadata(stream).get(stream); + try { + locator.set( + cf.get(new Client.ClientParameters().port(streamPortNode2()))); + return true; + } catch (Exception e) { + return false; + } + }); + waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = locator.get().metadata(stream).get(stream); return m.getLeader() != null && m.getLeader().getPort() != streamPortNode1(); }); @@ -238,7 +245,8 @@ public class FailureTest { return; } - int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort(); + int newLeaderPort = + locator.get().metadata(stream).get(stream).getLeader().getPort(); Client newPublisher = cf.get( new Client.ClientParameters() @@ -468,14 +476,23 @@ public class FailureTest { // avoid long-running task in the IO thread executorService.submit( () -> { - try { Thread.sleep(2000); } catch (Exception e) {} - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - int newReplicaPort = m.getReplicas().get(0).getPort(); + AtomicInteger newReplicaPort = new AtomicInteger(-1); + waitAtMost( + Duration.ofSeconds(5), + () -> { + try { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + newReplicaPort.set(m.getReplicas().get(0).getPort()); + return true; + } catch (Exception e) { + return false; + } + }); Client newConsumer = cf.get( new Client.ClientParameters() - .port(newReplicaPort) + .port(newReplicaPort.get()) .shutdownListener(shutdownListenerReference.get()) .chunkListener(credit()) .messageListener(messageListener)); @@ -588,7 +605,8 @@ public class FailureTest { } @Test - void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSameConnection() throws Exception { + void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSameConnection() + throws Exception { Client metadataClient = cf.get(new Client.ClientParameters().port(streamPortNode1())); Map metadata = metadataClient.metadata(stream); Client.StreamMetadata streamMetadata = metadata.get(stream); @@ -602,8 +620,7 @@ public class FailureTest { assertThat(streamMetadata.getLeader().getPort()).isEqualTo(streamPortNode1()); Client.Broker broker = streamMetadata.getReplicas().stream() - .filter( - r -> r.getPort() == streamPortNode1() || r.getPort() == streamPortNode2()) + .filter(r -> r.getPort() == streamPortNode1() || r.getPort() == streamPortNode2()) .findFirst() .get(); @@ -612,8 +629,7 @@ public class FailureTest { cf.get( new ClientParameters() .port(broker.getPort()) - .metadataListener( - (stream, code) -> metadataNotifications.incrementAndGet())); + .metadataListener((stream, code) -> metadataNotifications.incrementAndGet())); client.declarePublisher((byte) 42, null, stream); client.subscribe((byte) 66, stream, OffsetSpecification.first(), 1); diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java index 893400e4c3..7254ad47e3 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom +// Inc. and/or its subsidiaries. All rights reserved. // package com.rabbitmq.stream; @@ -101,7 +102,8 @@ public class Host { return System.getProperty("node2.name", "rabbit-2@" + hostname()); } - public static Process killStreamLocalMemberProcess(String stream, String nodename) throws IOException { + public static Process killStreamLocalMemberProcess(String stream, String nodename) + throws IOException { return rabbitmqctl( "eval 'case rabbit_stream_manager:lookup_local_member(<<\"/\">>, <<\"" + stream diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java index 24718f87b9..dceac532c8 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom +// Inc. and/or its subsidiaries. All rights reserved. // package com.rabbitmq.stream; @@ -27,8 +28,8 @@ import com.rabbitmq.stream.impl.Client.Broker; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.Client.StreamMetadata; -import java.util.Collections; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -58,9 +59,8 @@ public class LeaderLocatorTest { void clientLocalLocatorShouldMakeLeaderOnConnectedNode() { int[] ports = new int[] {TestUtils.streamPortNode1(), TestUtils.streamPortNode2()}; for (int port : ports) { - Client client = cf.get(new Client.ClientParameters() - .port(port) - .rpcTimeout(Duration.ofSeconds(30))); + Client client = + cf.get(new Client.ClientParameters().port(port).rpcTimeout(Duration.ofSeconds(30))); String s = UUID.randomUUID().toString(); try { Response response = diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java index 08ba934021..78161e007d 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom +// Inc. and/or its subsidiaries. All rights reserved. // package com.rabbitmq.stream; diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index 279beada99..09c82a6983 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom +// Inc. and/or its subsidiaries. All rights reserved. // package com.rabbitmq.stream; @@ -52,7 +53,7 @@ public class TestUtils { waitAtMost(Duration.ofSeconds(10), condition); } - static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException { + static void waitAtMost(Duration duration, BooleanSupplier condition) { if (condition.getAsBoolean()) { return; } @@ -60,7 +61,12 @@ public class TestUtils { int waitedTime = 0; long timeoutInMs = duration.toMillis(); while (waitedTime <= timeoutInMs) { - Thread.sleep(waitTime); + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } if (condition.getAsBoolean()) { return; } diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml index 59b910cdee..46dd843af7 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml @@ -26,14 +26,15 @@ - [0.12.0-SNAPSHOT,) + [1.2.0-SNAPSHOT,) 5.13.3 3.27.3 - 1.2.13 + 2.0.17 + 1.5.18 3.14.0 3.5.3 2.44.5 - 1.18.1 + 1.27.0 5.0.0 2.13.1 UTF-8 @@ -47,6 +48,12 @@ ${stream-client.version} + + org.slf4j + slf4j-api + ${slf4j.version} + + org.junit.jupiter junit-jupiter-engine @@ -89,6 +96,14 @@ test + + + com.google.googlejavaformat + google-java-format + ${google-java-format.version} + test + + diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java index ec4bde0a90..6996b04e15 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom +// Inc. and/or its subsidiaries. All rights reserved. // package com.rabbitmq.stream; diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index 1dee78380f..5669be8da1 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom +// Inc. and/or its subsidiaries. All rights reserved. // package com.rabbitmq.stream;