Compare commits


7 Commits

Author SHA1 Message Date
Chang-Chi Hsu 23afdb4807
Merge 1940c321de into 4a5aa37169 2025-10-08 04:35:57 +08:00
Chang-Chi Hsu 4a5aa37169
MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server module (#20636)
This change moves the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidates two related tests,
`testRemoveAndAddVoterWithValidClusterId` and
`testRemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and eliminates a duplicated test class.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-08 01:10:58 +08:00
Ubuntu 1940c321de Refactor TestLinearWriteSpeed to use array type for records 2025-10-06 22:20:38 +00:00
Ubuntu b4e5e6b1dd Merge branch 'trunk' of github.com:apache/kafka into KAFKA-19614 2025-10-06 22:18:35 +00:00
Ubuntu 381ec4fd16 Rewrite the Options class using a record 2025-09-30 21:13:25 +00:00
Chang-Chi Hsu cbb2a9c4c3 Extract options into main function 2025-09-30 21:13:25 +00:00
Chang-Chi Hsu bf56000f37 Refactor main function of TestLinearWriteSpeed 2025-09-30 21:11:44 +00:00
3 changed files with 216 additions and 220 deletions

ReconfigurableQuorumIntegrationTest.java (package org.apache.kafka.clients.admin; file deleted)

@@ -1,132 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

TestLinearWriteSpeed.java

@@ -63,83 +63,22 @@ import joptsimple.OptionSpec;
public class TestLinearWriteSpeed {
public static void main(String[] args) throws Exception {
- OptionParser parser = new OptionParser();
- OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
- .withRequiredArg()
- .describedAs("path")
- .ofType(String.class)
- .defaultsTo(System.getProperty("java.io.tmpdir"));
- OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
- .withRequiredArg()
- .describedAs("num_bytes")
- .ofType(Long.class);
- OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
- .withRequiredArg()
- .describedAs("num_bytes")
- .ofType(Integer.class);
- OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
- .withRequiredArg()
- .describedAs("num_bytes")
- .ofType(Integer.class)
- .defaultsTo(1024);
- OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
- .withRequiredArg()
- .describedAs("num_files")
- .ofType(Integer.class)
- .defaultsTo(1);
- OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
- .withRequiredArg()
- .describedAs("ms")
- .ofType(Long.class)
- .defaultsTo(1000L);
- OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
- .withRequiredArg()
- .describedAs("mb")
- .ofType(Integer.class)
- .defaultsTo(Integer.MAX_VALUE);
- OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
- .withRequiredArg()
- .describedAs("message_count")
- .ofType(Long.class)
- .defaultsTo(Long.MAX_VALUE);
- OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
- .withRequiredArg()
- .describedAs("codec")
- .ofType(String.class)
- .defaultsTo(CompressionType.NONE.name);
- OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
- .withRequiredArg()
- .describedAs("level")
- .ofType(Integer.class);
- OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
- OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
- OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
+ var parser = new OptionParser();
+ var option = createOptions(parser);
OptionSet options = parser.parse(args);
- CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
- long bytesToWrite = options.valueOf(bytesOpt);
- int bufferSize = options.valueOf(sizeOpt);
- int numFiles = options.valueOf(filesOpt);
- long reportingInterval = options.valueOf(reportingIntervalOpt);
- String dir = options.valueOf(dirOpt);
- long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
+ CommandLineUtils.checkRequiredArgs(parser, options, option.bytesOpt(), option.sizeOpt());
+ long bytesToWrite = options.valueOf(option.bytesOpt());
+ int bufferSize = options.valueOf(option.sizeOpt());
+ int numFiles = options.valueOf(option.filesOpt());
+ long reportingInterval = options.valueOf(option.reportingIntervalOpt());
+ String dir = options.valueOf(option.dirOpt());
+ long maxThroughputBytes = options.valueOf(option.maxThroughputOpt()) * 1024L * 1024L;
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- int messageSize = options.valueOf(messageSizeOpt);
- long flushInterval = options.valueOf(flushIntervalOpt);
- CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
+ int messageSize = options.valueOf(option.messageSizeOpt());
+ long flushInterval = options.valueOf(option.flushIntervalOpt());
+ CompressionType compressionType = CompressionType.forName(options.valueOf(option.compressionCodecOpt()));
Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
- Integer compressionLevel = options.valueOf(compressionLevelOpt);
+ Integer compressionLevel = options.valueOf(option.compressionLevelOpt());
if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel);
Compression compression = compressionBuilder.build();
@@ -159,17 +98,17 @@ public class TestLinearWriteSpeed {
scheduler.startup();
for (int i = 0; i < numFiles; i++) {
- if (options.has(mmapOpt)) {
+ if (options.has(option.mmapOpt())) {
writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
- } else if (options.has(channelOpt)) {
+ } else if (options.has(option.channelOpt())) {
writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
- } else if (options.has(logOpt)) {
+ } else if (options.has(option.logOpt())) {
int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
Properties logProperties = new Properties();
logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
LogConfig logConfig = new LogConfig(logProperties);
- writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
+ writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet, compression, recordsList);
} else {
System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
Exit.exit(1);
@@ -298,9 +237,13 @@ public class TestLinearWriteSpeed {
static class LogWritable implements Writable {
MemoryRecords messages;
UnifiedLog log;
+ Compression compression;
+ SimpleRecord[] records;
- public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages) throws IOException {
+ public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages, Compression compression, List<SimpleRecord> recordsList) throws IOException {
this.messages = messages;
+ this.compression = compression;
+ this.records = recordsList.toArray(new SimpleRecord[0]);
Utils.delete(dir);
this.log = UnifiedLog.create(
dir,
@@ -323,6 +266,7 @@ public class TestLinearWriteSpeed {
}
public int write() {
+ this.messages = MemoryRecords.withRecords(compression, records);
log.appendAsLeader(
messages,
0,
@@ -338,4 +282,91 @@ public class TestLinearWriteSpeed {
Utils.delete(log.dir());
}
}
+ private record Options(OptionSpec<String> dirOpt, OptionSpec<Long> bytesOpt, OptionSpec<Integer> sizeOpt,
+ OptionSpec<Integer> messageSizeOpt, OptionSpec<Integer> filesOpt,
+ OptionSpec<Long> reportingIntervalOpt, OptionSpec<Integer> maxThroughputOpt,
+ OptionSpec<Long> flushIntervalOpt, OptionSpec<String> compressionCodecOpt,
+ OptionSpec<Integer> compressionLevelOpt, OptionSpec<Void> channelOpt,
+ OptionSpec<Void> logOpt, OptionSpec<Void> mmapOpt) {
+ }
+ private static Options createOptions(OptionParser parser) {
+ OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
+ .withRequiredArg()
+ .describedAs("path")
+ .ofType(String.class)
+ .defaultsTo(System.getProperty("java.io.tmpdir"));
+ OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
+ .withRequiredArg()
+ .describedAs("num_bytes")
+ .ofType(Long.class);
+ OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
+ .withRequiredArg()
+ .describedAs("num_bytes")
+ .ofType(Integer.class);
+ OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
+ .withRequiredArg()
+ .describedAs("num_bytes")
+ .ofType(Integer.class)
+ .defaultsTo(1024);
+ OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
+ .withRequiredArg()
+ .describedAs("num_files")
+ .ofType(Integer.class)
+ .defaultsTo(1);
+ OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
+ .withRequiredArg()
+ .describedAs("ms")
+ .ofType(Long.class)
+ .defaultsTo(1000L);
+ OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
+ .withRequiredArg()
+ .describedAs("mb")
+ .ofType(Integer.class)
+ .defaultsTo(Integer.MAX_VALUE);
+ OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
+ .withRequiredArg()
+ .describedAs("message_count")
+ .ofType(Long.class)
+ .defaultsTo(Long.MAX_VALUE);
+ OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
+ .withRequiredArg()
+ .describedAs("codec")
+ .ofType(String.class)
+ .defaultsTo(CompressionType.NONE.name);
+ OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
+ .withRequiredArg()
+ .describedAs("level")
+ .ofType(Integer.class);
+ OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
+ OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
+ OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
+ return new Options(
+ dirOpt,
+ bytesOpt,
+ sizeOpt,
+ messageSizeOpt,
+ filesOpt,
+ reportingIntervalOpt,
+ maxThroughputOpt,
+ flushIntervalOpt,
+ compressionCodecOpt,
+ compressionLevelOpt,
+ channelOpt,
+ logOpt,
+ mmapOpt
+ );
+ }
}
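The change above applies a common joptsimple pattern: build every OptionSpec inside a factory method and return the specs bundled in a record, so that main() shrinks to parse-and-read. A minimal, self-contained sketch of the same pattern; the class and option names here are illustrative, not code from the PR:

import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

public class OptionsRecordSketch {
    // The record replaces a pile of local OptionSpec variables in main().
    private record Options(OptionSpec<Long> bytesOpt, OptionSpec<Integer> sizeOpt) { }

    private static Options createOptions(OptionParser parser) {
        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "The total number of bytes to write.")
            .withRequiredArg()
            .ofType(Long.class)
            .defaultsTo(1024L * 1024L);
        OptionSpec<Integer> sizeOpt = parser.accepts("size", "The size of each write.")
            .withRequiredArg()
            .ofType(Integer.class)
            .defaultsTo(4096);
        return new Options(bytesOpt, sizeOpt);
    }

    public static void main(String[] args) {
        var parser = new OptionParser();
        var option = createOptions(parser);
        OptionSet options = parser.parse(args);
        // Record components are exposed as generated accessor methods,
        // hence option.bytesOpt() rather than a bare field access.
        long bytesToWrite = options.valueOf(option.bytesOpt());
        int bufferSize = options.valueOf(option.sizeOpt());
        System.out.println("Writing " + bytesToWrite + " bytes in " + bufferSize + "-byte chunks");
    }
}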

ReconfigurableQuorumIntegrationTest.java (moved from package kafka.server to org.apache.kafka.server)

@@ -15,13 +15,16 @@
* limitations under the License.
*/
- package kafka.server;
+ package org.apache.kafka.server;
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.TestKitDefaults;
@@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
@@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
).build()) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
});
@@ -88,7 +94,7 @@
).setStandalone(true).build()) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
});
@@ -126,7 +132,7 @@
) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -161,7 +167,7 @@
) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@@ -200,7 +206,7 @@
) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -238,7 +244,7 @@
) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -249,4 +255,95 @@
}
}
}
+ @Test
+ public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
+ final var nodes = new TestKitNodes.Builder()
+ .setClusterId("test-cluster")
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build();
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+ try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+ assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+ }
+ });
+ Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+ admin.removeRaftVoter(
+ 3000,
+ dirId,
+ new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+ ).all().get();
+ TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3001, 3002}) {
+ assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+ }
+ });
+ admin.addRaftVoter(
+ 3000,
+ dirId,
+ Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+ new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+ ).all().get();
+ }
+ }
+ }
+ @Test
+ public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
+ final var nodes = new TestKitNodes.Builder()
+ .setClusterId("test-cluster")
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build();
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+ try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+ var removeFuture = admin.removeRaftVoter(
+ 3000,
+ dirId,
+ new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+ ).all();
+ TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
+ var addFuture = admin.addRaftVoter(
+ 3000,
+ dirId,
+ Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+ new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+ ).all();
+ TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
+ }
+ }
+ }
}
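For reference, the quorum-reconfiguration flow that both consolidated tests exercise comes down to three Admin calls: describe the quorum, remove a voter, add it back. A minimal sketch assuming a reachable KRaft cluster; the bootstrap address, voter id 3000, and cluster id below are placeholder values, not taken from the PR:

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.common.Uuid;

public class QuorumReconfigurationSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // 1. Inspect the current voter set and its directory ids.
            QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
            Map<Integer, Uuid> voters = quorumInfo.voters().stream()
                .collect(Collectors.toMap(
                    QuorumInfo.ReplicaState::replicaId,
                    QuorumInfo.ReplicaState::replicaDirectoryId));
            Uuid dirId = voters.get(3000); // voter to cycle; placeholder id

            // 2. Remove the voter. The cluster id must match, otherwise the
            //    call fails with InconsistentClusterIdException.
            admin.removeRaftVoter(3000, dirId,
                new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
            ).all().get();

            // 3. Add it back, naming the controller endpoint it listens on.
            admin.addRaftVoter(3000, dirId,
                Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
                new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
            ).all().get();
        }
    }
}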