KAFKA-17574 Allow overriding TestKitNodes baseDirectory (#17225)

This allows shutting down a KafkaClusterTestKit from a JVM shutdown hook without risking error logs because the base directory has already been deleted by the shutdown hook TestUtils.tempDirectory sets up.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Stig Døssing 2024-10-24 19:48:25 +02:00 committed by GitHub
parent 24c6e8d085
commit 98b7e4deaf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 34 additions and 9 deletions

View File

@ -43,6 +43,7 @@
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
<suppress checks="NPathComplexity" files="TestKitNodes.java"/>
<suppress checks="JavaNCSS"
files="(RemoteLogManagerTest|SharePartitionTest).java"/>
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>

View File

@ -27,6 +27,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
import org.apache.kafka.server.common.MetadataVersion;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
@ -51,6 +52,7 @@ public class TestKitNodes {
public static class Builder {
private boolean combined;
private String clusterId;
private Path baseDirectory;
private int numControllerNodes;
private int numBrokerNodes;
private int numDisksPerBroker = 1;
@ -104,6 +106,11 @@ public class TestKitNodes {
return this;
}
public Builder setBaseDirectory(Path baseDirectory) {
this.baseDirectory = baseDirectory;
return this;
}
public Builder setBrokerListenerName(ListenerName listenerName) {
this.brokerListenerName = listenerName;
return this;
@ -128,8 +135,9 @@ public class TestKitNodes {
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
}
String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
if (baseDirectory == null) {
this.baseDirectory = TestUtils.tempDirectory().toPath();
}
if (clusterId == null) {
clusterId = Uuid.randomUuid().toString();
}
@ -160,7 +168,7 @@ public class TestKitNodes {
for (int id : controllerNodeIds) {
TestKitNode controllerNode = TestKitNodes.buildControllerNode(
id,
baseDirectory,
baseDirectory.toFile().getAbsolutePath(),
clusterId,
brokerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap())
@ -172,7 +180,7 @@ public class TestKitNodes {
for (int id : brokerNodeIds) {
TestKitNode brokerNode = TestKitNodes.buildBrokerNode(
id,
baseDirectory,
baseDirectory.toFile().getAbsolutePath(),
clusterId,
controllerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap()),
@ -181,7 +189,7 @@ public class TestKitNodes {
brokerNodes.put(id, brokerNode);
}
return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
brokerListenerName, brokerSecurityProtocol, new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
}
}

View File

@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
@ -32,7 +33,7 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KafkaClusterTestKitTest {
@ParameterizedTest
@ -88,7 +89,7 @@ public class KafkaClusterTestKitTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) throws Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(5).
@ -118,8 +119,23 @@ public class KafkaClusterTestKitTest {
String expected = combined ? String.format("combined_%d_0", controllerId) : String.format("controller_%d", controllerId);
assertEquals(expected, Paths.get(node.metadataDirectory()).getFileName().toString());
});
} catch (Exception e) {
fail("failed to init cluster", e);
}
}
@Test
public void testCreateClusterWithSpecificBaseDir() throws Exception {
Path baseDirectory = TestUtils.tempDirectory().toPath();
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBaseDirectory(baseDirectory).
setNumBrokerNodes(1).
setCombined(true).
setNumControllerNodes(1).build()).build()) {
assertEquals(cluster.nodes().baseDirectory(), baseDirectory.toFile().getAbsolutePath());
cluster.nodes().controllerNodes().values().forEach(controller ->
assertTrue(Paths.get(controller.metadataDirectory()).startsWith(baseDirectory)));
cluster.nodes().brokerNodes().values().forEach(broker ->
assertTrue(Paths.get(broker.metadataDirectory()).startsWith(baseDirectory)));
}
}
}