Polish stream Java test projects

Format, bump log dependencies, configure auto-format to get latest
dependencies.
This commit is contained in:
Arnaud Cogoluègnes 2025-07-07 14:35:10 +02:00
parent 27a362df3d
commit f35c7d72c1
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
10 changed files with 96 additions and 38 deletions

View File

@ -26,14 +26,15 @@
</developers> </developers>
<properties> <properties>
<stream-client.version>[0.12.0-SNAPSHOT,)</stream-client.version> <stream-client.version>[1.2.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.13.3</junit.jupiter.version> <junit.jupiter.version>5.13.3</junit.jupiter.version>
<assertj.version>3.27.3</assertj.version> <assertj.version>3.27.3</assertj.version>
<logback.version>1.2.13</logback.version> <slf4j.version>2.0.17</slf4j.version>
<logback.version>1.5.18</logback.version>
<maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version> <maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version>
<maven-surefire-plugin.version>3.5.3</maven-surefire-plugin.version> <maven-surefire-plugin.version>3.5.3</maven-surefire-plugin.version>
<spotless.version>2.44.5</spotless.version> <spotless.version>2.44.5</spotless.version>
<google-java-format.version>1.17.0</google-java-format.version> <google-java-format.version>1.27.0</google-java-format.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
@ -45,6 +46,12 @@
<version>${stream-client.version}</version> <version>${stream-client.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId> <artifactId>junit-jupiter-engine</artifactId>
@ -73,6 +80,14 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- add explicitly to update automatically with dependabot -->
<dependency>
<groupId>com.google.googlejavaformat</groupId>
<artifactId>google-java-format</artifactId>
<version>${google-java-format.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -11,7 +11,8 @@
// The Original Code is RabbitMQ. // The Original Code is RabbitMQ.
// //
// The Initial Developer of the Original Code is Pivotal Software, Inc. // The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.stream; package com.rabbitmq.stream;

View File

@ -11,7 +11,8 @@
// The Original Code is RabbitMQ. // The Original Code is RabbitMQ.
// //
// The Initial Developer of the Original Code is Pivotal Software, Inc. // The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.stream; package com.rabbitmq.stream;
@ -33,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -220,16 +220,23 @@ public class FailureTest {
executorService.submit( executorService.submit(
() -> { () -> {
connected.set(false); connected.set(false);
AtomicReference<Client> locator = new AtomicReference<>();
try { Thread.sleep(2000); } catch (Exception e) {}
Client locator =
cf.get(new Client.ClientParameters().port(streamPortNode2()));
// wait until there's a new leader
try { try {
waitAtMost( waitAtMost(
Duration.ofSeconds(5), Duration.ofSeconds(5),
() -> { () -> {
Client.StreamMetadata m = locator.metadata(stream).get(stream); try {
locator.set(
cf.get(new Client.ClientParameters().port(streamPortNode2())));
return true;
} catch (Exception e) {
return false;
}
});
waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
return m.getLeader() != null return m.getLeader() != null
&& m.getLeader().getPort() != streamPortNode1(); && m.getLeader().getPort() != streamPortNode1();
}); });
@ -238,7 +245,8 @@ public class FailureTest {
return; return;
} }
int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort(); int newLeaderPort =
locator.get().metadata(stream).get(stream).getLeader().getPort();
Client newPublisher = Client newPublisher =
cf.get( cf.get(
new Client.ClientParameters() new Client.ClientParameters()
@ -468,14 +476,23 @@ public class FailureTest {
// avoid long-running task in the IO thread // avoid long-running task in the IO thread
executorService.submit( executorService.submit(
() -> { () -> {
try { Thread.sleep(2000); } catch (Exception e) {} AtomicInteger newReplicaPort = new AtomicInteger(-1);
waitAtMost(
Duration.ofSeconds(5),
() -> {
try {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
int newReplicaPort = m.getReplicas().get(0).getPort(); newReplicaPort.set(m.getReplicas().get(0).getPort());
return true;
} catch (Exception e) {
return false;
}
});
Client newConsumer = Client newConsumer =
cf.get( cf.get(
new Client.ClientParameters() new Client.ClientParameters()
.port(newReplicaPort) .port(newReplicaPort.get())
.shutdownListener(shutdownListenerReference.get()) .shutdownListener(shutdownListenerReference.get())
.chunkListener(credit()) .chunkListener(credit())
.messageListener(messageListener)); .messageListener(messageListener));
@ -588,7 +605,8 @@ public class FailureTest {
} }
@Test @Test
void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSameConnection() throws Exception { void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSameConnection()
throws Exception {
Client metadataClient = cf.get(new Client.ClientParameters().port(streamPortNode1())); Client metadataClient = cf.get(new Client.ClientParameters().port(streamPortNode1()));
Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream); Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream); Client.StreamMetadata streamMetadata = metadata.get(stream);
@ -602,8 +620,7 @@ public class FailureTest {
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(streamPortNode1()); assertThat(streamMetadata.getLeader().getPort()).isEqualTo(streamPortNode1());
Client.Broker broker = Client.Broker broker =
streamMetadata.getReplicas().stream() streamMetadata.getReplicas().stream()
.filter( .filter(r -> r.getPort() == streamPortNode1() || r.getPort() == streamPortNode2())
r -> r.getPort() == streamPortNode1() || r.getPort() == streamPortNode2())
.findFirst() .findFirst()
.get(); .get();
@ -612,8 +629,7 @@ public class FailureTest {
cf.get( cf.get(
new ClientParameters() new ClientParameters()
.port(broker.getPort()) .port(broker.getPort())
.metadataListener( .metadataListener((stream, code) -> metadataNotifications.incrementAndGet()));
(stream, code) -> metadataNotifications.incrementAndGet()));
client.declarePublisher((byte) 42, null, stream); client.declarePublisher((byte) 42, null, stream);
client.subscribe((byte) 66, stream, OffsetSpecification.first(), 1); client.subscribe((byte) 66, stream, OffsetSpecification.first(), 1);

View File

@ -11,7 +11,8 @@
// The Original Code is RabbitMQ. // The Original Code is RabbitMQ.
// //
// The Initial Developer of the Original Code is Pivotal Software, Inc. // The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.stream; package com.rabbitmq.stream;
@ -101,7 +102,8 @@ public class Host {
return System.getProperty("node2.name", "rabbit-2@" + hostname()); return System.getProperty("node2.name", "rabbit-2@" + hostname());
} }
public static Process killStreamLocalMemberProcess(String stream, String nodename) throws IOException { public static Process killStreamLocalMemberProcess(String stream, String nodename)
throws IOException {
return rabbitmqctl( return rabbitmqctl(
"eval 'case rabbit_stream_manager:lookup_local_member(<<\"/\">>, <<\"" "eval 'case rabbit_stream_manager:lookup_local_member(<<\"/\">>, <<\""
+ stream + stream

View File

@ -11,7 +11,8 @@
// The Original Code is RabbitMQ. // The Original Code is RabbitMQ.
// //
// The Initial Developer of the Original Code is Pivotal Software, Inc. // The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.stream; package com.rabbitmq.stream;
@ -27,8 +28,8 @@ import com.rabbitmq.stream.impl.Client.Broker;
import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.Client.Response;
import com.rabbitmq.stream.impl.Client.StreamMetadata; import com.rabbitmq.stream.impl.Client.StreamMetadata;
import java.util.Collections;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -58,9 +59,8 @@ public class LeaderLocatorTest {
void clientLocalLocatorShouldMakeLeaderOnConnectedNode() { void clientLocalLocatorShouldMakeLeaderOnConnectedNode() {
int[] ports = new int[] {TestUtils.streamPortNode1(), TestUtils.streamPortNode2()}; int[] ports = new int[] {TestUtils.streamPortNode1(), TestUtils.streamPortNode2()};
for (int port : ports) { for (int port : ports) {
Client client = cf.get(new Client.ClientParameters() Client client =
.port(port) cf.get(new Client.ClientParameters().port(port).rpcTimeout(Duration.ofSeconds(30)));
.rpcTimeout(Duration.ofSeconds(30)));
String s = UUID.randomUUID().toString(); String s = UUID.randomUUID().toString();
try { try {
Response response = Response response =

View File

@ -11,7 +11,8 @@
// The Original Code is RabbitMQ. // The Original Code is RabbitMQ.
// //
// The Initial Developer of the Original Code is Pivotal Software, Inc. // The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.stream; package com.rabbitmq.stream;

View File

@ -11,7 +11,8 @@
// The Original Code is RabbitMQ. // The Original Code is RabbitMQ.
// //
// The Initial Developer of the Original Code is Pivotal Software, Inc. // The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.stream; package com.rabbitmq.stream;
@ -52,7 +53,7 @@ public class TestUtils {
waitAtMost(Duration.ofSeconds(10), condition); waitAtMost(Duration.ofSeconds(10), condition);
} }
static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException { static void waitAtMost(Duration duration, BooleanSupplier condition) {
if (condition.getAsBoolean()) { if (condition.getAsBoolean()) {
return; return;
} }
@ -60,7 +61,12 @@ public class TestUtils {
int waitedTime = 0; int waitedTime = 0;
long timeoutInMs = duration.toMillis(); long timeoutInMs = duration.toMillis();
while (waitedTime <= timeoutInMs) { while (waitedTime <= timeoutInMs) {
try {
Thread.sleep(waitTime); Thread.sleep(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (condition.getAsBoolean()) { if (condition.getAsBoolean()) {
return; return;
} }

View File

@ -26,14 +26,15 @@
</developers> </developers>
<properties> <properties>
<stream-client.version>[0.12.0-SNAPSHOT,)</stream-client.version> <stream-client.version>[1.2.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.13.3</junit.jupiter.version> <junit.jupiter.version>5.13.3</junit.jupiter.version>
<assertj.version>3.27.3</assertj.version> <assertj.version>3.27.3</assertj.version>
<logback.version>1.2.13</logback.version> <slf4j.version>2.0.17</slf4j.version>
<logback.version>1.5.18</logback.version>
<maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version> <maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version>
<maven-surefire-plugin.version>3.5.3</maven-surefire-plugin.version> <maven-surefire-plugin.version>3.5.3</maven-surefire-plugin.version>
<spotless.version>2.44.5</spotless.version> <spotless.version>2.44.5</spotless.version>
<google-java-format.version>1.18.1</google-java-format.version> <google-java-format.version>1.27.0</google-java-format.version>
<okhttp.version>5.0.0</okhttp.version> <okhttp.version>5.0.0</okhttp.version>
<gson.version>2.13.1</gson.version> <gson.version>2.13.1</gson.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -47,6 +48,12 @@
<version>${stream-client.version}</version> <version>${stream-client.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId> <artifactId>junit-jupiter-engine</artifactId>
@ -89,6 +96,14 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- add explicitly to update automatically with dependabot -->
<dependency>
<groupId>com.google.googlejavaformat</groupId>
<artifactId>google-java-format</artifactId>
<version>${google-java-format.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -11,7 +11,8 @@
// The Original Code is RabbitMQ. // The Original Code is RabbitMQ.
// //
// The Initial Developer of the Original Code is Pivotal Software, Inc. // The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.stream; package com.rabbitmq.stream;

View File

@ -11,7 +11,8 @@
// The Original Code is RabbitMQ. // The Original Code is RabbitMQ.
// //
// The Initial Developer of the Original Code is Pivotal Software, Inc. // The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.stream; package com.rabbitmq.stream;