mirror of https://github.com/apache/kafka.git
Compare commits
7 Commits
d40a9fa151
...
23afdb4807
Author | SHA1 | Date |
---|---|---|
|
23afdb4807 | |
|
4a5aa37169 | |
|
1940c321de | |
|
b4e5e6b1dd | |
|
381ec4fd16 | |
|
cbb2a9c4c3 | |
|
bf56000f37 |
|
@ -1,132 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.errors.InconsistentClusterIdException;
|
||||
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
||||
import org.apache.kafka.common.test.TestKitNodes;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
@Tag("integration")
|
||||
public class ReconfigurableQuorumIntegrationTest {
|
||||
|
||||
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
|
||||
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
|
||||
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
|
||||
final var nodes = new TestKitNodes.Builder()
|
||||
.setClusterId("test-cluster")
|
||||
.setNumBrokerNodes(1)
|
||||
.setNumControllerNodes(3)
|
||||
.build();
|
||||
|
||||
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||
initialVoters.put(
|
||||
controllerNode.id(),
|
||||
controllerNode.metadataDirectoryId()
|
||||
);
|
||||
}
|
||||
|
||||
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||
TestUtils.waitForCondition(() -> {
|
||||
Map<Integer, Uuid> voters = descVoterDirs(admin);
|
||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
|
||||
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
|
||||
|
||||
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||
admin.removeRaftVoter(
|
||||
3000,
|
||||
dirId,
|
||||
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||
).all().get();
|
||||
TestUtils.waitForCondition(() -> {
|
||||
Map<Integer, Uuid> voters = descVoterDirs(admin);
|
||||
assertEquals(Set.of(3001, 3002), voters.keySet());
|
||||
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
|
||||
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
|
||||
|
||||
admin.addRaftVoter(
|
||||
3000,
|
||||
dirId,
|
||||
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||
).all().get();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
|
||||
final var nodes = new TestKitNodes.Builder()
|
||||
.setClusterId("test-cluster")
|
||||
.setNumBrokerNodes(1)
|
||||
.setNumControllerNodes(3)
|
||||
.build();
|
||||
|
||||
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||
initialVoters.put(
|
||||
controllerNode.id(),
|
||||
controllerNode.metadataDirectoryId()
|
||||
);
|
||||
}
|
||||
|
||||
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||
var removeFuture = admin.removeRaftVoter(
|
||||
3000,
|
||||
dirId,
|
||||
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||
).all();
|
||||
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
|
||||
|
||||
var addFuture = admin.addRaftVoter(
|
||||
3000,
|
||||
dirId,
|
||||
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||
).all();
|
||||
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -63,83 +63,22 @@ import joptsimple.OptionSpec;
|
|||
public class TestLinearWriteSpeed {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
OptionParser parser = new OptionParser();
|
||||
|
||||
OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
|
||||
.withRequiredArg()
|
||||
.describedAs("path")
|
||||
.ofType(String.class)
|
||||
.defaultsTo(System.getProperty("java.io.tmpdir"));
|
||||
|
||||
OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num_bytes")
|
||||
.ofType(Long.class);
|
||||
|
||||
OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num_bytes")
|
||||
.ofType(Integer.class);
|
||||
|
||||
OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num_bytes")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(1024);
|
||||
|
||||
OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num_files")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(1);
|
||||
|
||||
OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
|
||||
.withRequiredArg()
|
||||
.describedAs("ms")
|
||||
.ofType(Long.class)
|
||||
.defaultsTo(1000L);
|
||||
|
||||
OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
|
||||
.withRequiredArg()
|
||||
.describedAs("mb")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(Integer.MAX_VALUE);
|
||||
|
||||
OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
|
||||
.withRequiredArg()
|
||||
.describedAs("message_count")
|
||||
.ofType(Long.class)
|
||||
.defaultsTo(Long.MAX_VALUE);
|
||||
|
||||
OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
|
||||
.withRequiredArg()
|
||||
.describedAs("codec")
|
||||
.ofType(String.class)
|
||||
.defaultsTo(CompressionType.NONE.name);
|
||||
|
||||
OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
|
||||
.withRequiredArg()
|
||||
.describedAs("level")
|
||||
.ofType(Integer.class);
|
||||
|
||||
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
|
||||
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
|
||||
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
|
||||
var parser = new OptionParser();
|
||||
var option = createOptions(parser);
|
||||
OptionSet options = parser.parse(args);
|
||||
CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
|
||||
|
||||
long bytesToWrite = options.valueOf(bytesOpt);
|
||||
int bufferSize = options.valueOf(sizeOpt);
|
||||
int numFiles = options.valueOf(filesOpt);
|
||||
long reportingInterval = options.valueOf(reportingIntervalOpt);
|
||||
String dir = options.valueOf(dirOpt);
|
||||
long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
|
||||
CommandLineUtils.checkRequiredArgs(parser, options, option.bytesOpt, option.sizeOpt);
|
||||
long bytesToWrite = options.valueOf(option.bytesOpt);
|
||||
int bufferSize = options.valueOf(option.sizeOpt);
|
||||
int numFiles = options.valueOf(option.filesOpt);
|
||||
long reportingInterval = options.valueOf(option.reportingIntervalOpt);
|
||||
String dir = options.valueOf(option.dirOpt);
|
||||
long maxThroughputBytes = options.valueOf(option.maxThroughputOpt) * 1024L * 1024L;
|
||||
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
|
||||
int messageSize = options.valueOf(messageSizeOpt);
|
||||
long flushInterval = options.valueOf(flushIntervalOpt);
|
||||
CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
|
||||
int messageSize = options.valueOf(option.messageSizeOpt);
|
||||
long flushInterval = options.valueOf(option.flushIntervalOpt);
|
||||
CompressionType compressionType = CompressionType.forName(options.valueOf(option.compressionCodecOpt));
|
||||
Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
|
||||
Integer compressionLevel = options.valueOf(compressionLevelOpt);
|
||||
Integer compressionLevel = options.valueOf(option.compressionLevelOpt);
|
||||
|
||||
if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel);
|
||||
Compression compression = compressionBuilder.build();
|
||||
|
@ -159,17 +98,17 @@ public class TestLinearWriteSpeed {
|
|||
scheduler.startup();
|
||||
|
||||
for (int i = 0; i < numFiles; i++) {
|
||||
if (options.has(mmapOpt)) {
|
||||
if (options.has(option.mmapOpt)) {
|
||||
writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
|
||||
} else if (options.has(channelOpt)) {
|
||||
} else if (options.has(option.channelOpt)) {
|
||||
writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
|
||||
} else if (options.has(logOpt)) {
|
||||
} else if (options.has(option.logOpt)) {
|
||||
int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
|
||||
Properties logProperties = new Properties();
|
||||
logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
|
||||
logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
|
||||
LogConfig logConfig = new LogConfig(logProperties);
|
||||
writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
|
||||
writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet, compression, recordsList);
|
||||
} else {
|
||||
System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
|
||||
Exit.exit(1);
|
||||
|
@ -298,9 +237,13 @@ public class TestLinearWriteSpeed {
|
|||
static class LogWritable implements Writable {
|
||||
MemoryRecords messages;
|
||||
UnifiedLog log;
|
||||
Compression compression;
|
||||
SimpleRecord[] records;
|
||||
|
||||
public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages) throws IOException {
|
||||
public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages, Compression compression, List<SimpleRecord> recordsList) throws IOException {
|
||||
this.messages = messages;
|
||||
this.compression = compression;
|
||||
this.records = recordsList.toArray(new SimpleRecord[0]);
|
||||
Utils.delete(dir);
|
||||
this.log = UnifiedLog.create(
|
||||
dir,
|
||||
|
@ -323,6 +266,7 @@ public class TestLinearWriteSpeed {
|
|||
}
|
||||
|
||||
public int write() {
|
||||
this.messages = MemoryRecords.withRecords(compression, records);
|
||||
log.appendAsLeader(
|
||||
messages,
|
||||
0,
|
||||
|
@ -338,4 +282,91 @@ public class TestLinearWriteSpeed {
|
|||
Utils.delete(log.dir());
|
||||
}
|
||||
}
|
||||
|
||||
private record Options(OptionSpec<String> dirOpt, OptionSpec<Long> bytesOpt, OptionSpec<Integer> sizeOpt,
|
||||
OptionSpec<Integer> messageSizeOpt, OptionSpec<Integer> filesOpt,
|
||||
OptionSpec<Long> reportingIntervalOpt, OptionSpec<Integer> maxThroughputOpt,
|
||||
OptionSpec<Long> flushIntervalOpt, OptionSpec<String> compressionCodecOpt,
|
||||
OptionSpec<Integer> compressionLevelOpt, OptionSpec<Void> channelOpt,
|
||||
OptionSpec<Void> logOpt, OptionSpec<Void> mmapOpt) {
|
||||
}
|
||||
|
||||
private static Options createOptions(OptionParser parser) {
|
||||
OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
|
||||
.withRequiredArg()
|
||||
.describedAs("path")
|
||||
.ofType(String.class)
|
||||
.defaultsTo(System.getProperty("java.io.tmpdir"));
|
||||
|
||||
OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num_bytes")
|
||||
.ofType(Long.class);
|
||||
|
||||
OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num_bytes")
|
||||
.ofType(Integer.class);
|
||||
|
||||
OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num_bytes")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(1024);
|
||||
|
||||
OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num_files")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(1);
|
||||
|
||||
OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
|
||||
.withRequiredArg()
|
||||
.describedAs("ms")
|
||||
.ofType(Long.class)
|
||||
.defaultsTo(1000L);
|
||||
|
||||
OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
|
||||
.withRequiredArg()
|
||||
.describedAs("mb")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(Integer.MAX_VALUE);
|
||||
|
||||
OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
|
||||
.withRequiredArg()
|
||||
.describedAs("message_count")
|
||||
.ofType(Long.class)
|
||||
.defaultsTo(Long.MAX_VALUE);
|
||||
|
||||
OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
|
||||
.withRequiredArg()
|
||||
.describedAs("codec")
|
||||
.ofType(String.class)
|
||||
.defaultsTo(CompressionType.NONE.name);
|
||||
|
||||
OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
|
||||
.withRequiredArg()
|
||||
.describedAs("level")
|
||||
.ofType(Integer.class);
|
||||
|
||||
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
|
||||
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
|
||||
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to mmap file.");
|
||||
|
||||
return new Options(
|
||||
dirOpt,
|
||||
bytesOpt,
|
||||
sizeOpt,
|
||||
messageSizeOpt,
|
||||
filesOpt,
|
||||
reportingIntervalOpt,
|
||||
maxThroughputOpt,
|
||||
flushIntervalOpt,
|
||||
compressionCodecOpt,
|
||||
compressionLevelOpt,
|
||||
channelOpt,
|
||||
logOpt,
|
||||
mmapOpt
|
||||
);
|
||||
}
|
||||
}
|
|
@ -15,13 +15,16 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.server;
|
||||
package org.apache.kafka.server;
|
||||
|
||||
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
|
||||
import org.apache.kafka.clients.admin.Admin;
|
||||
import org.apache.kafka.clients.admin.FeatureMetadata;
|
||||
import org.apache.kafka.clients.admin.QuorumInfo;
|
||||
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
|
||||
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.errors.InconsistentClusterIdException;
|
||||
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
||||
import org.apache.kafka.common.test.TestKitNodes;
|
||||
import org.apache.kafka.common.test.api.TestKitDefaults;
|
||||
|
@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
|
|||
import org.apache.kafka.server.common.KRaftVersion;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
|
@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@Tag("integration")
|
||||
public class ReconfigurableQuorumIntegrationTest {
|
||||
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
|
||||
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
|
||||
|
@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
|||
).build()) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
||||
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
|
||||
});
|
||||
|
@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
|||
).setStandalone(true).build()) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
||||
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
|
||||
});
|
||||
|
@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
|||
) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||
|
@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
|||
) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
|
||||
|
@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
|||
) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||
|
@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
|||
) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||
|
@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
|
||||
final var nodes = new TestKitNodes.Builder()
|
||||
.setClusterId("test-cluster")
|
||||
.setNumBrokerNodes(1)
|
||||
.setNumControllerNodes(3)
|
||||
.build();
|
||||
|
||||
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||
initialVoters.put(
|
||||
controllerNode.id(),
|
||||
controllerNode.metadataDirectoryId()
|
||||
);
|
||||
}
|
||||
|
||||
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||
for (int replicaId : new int[] {3000, 3001, 3002}) {
|
||||
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
|
||||
}
|
||||
});
|
||||
|
||||
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||
admin.removeRaftVoter(
|
||||
3000,
|
||||
dirId,
|
||||
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||
).all().get();
|
||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||
assertEquals(Set.of(3001, 3002), voters.keySet());
|
||||
for (int replicaId : new int[] {3001, 3002}) {
|
||||
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
|
||||
}
|
||||
});
|
||||
|
||||
admin.addRaftVoter(
|
||||
3000,
|
||||
dirId,
|
||||
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||
).all().get();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
|
||||
final var nodes = new TestKitNodes.Builder()
|
||||
.setClusterId("test-cluster")
|
||||
.setNumBrokerNodes(1)
|
||||
.setNumControllerNodes(3)
|
||||
.build();
|
||||
|
||||
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||
initialVoters.put(
|
||||
controllerNode.id(),
|
||||
controllerNode.metadataDirectoryId()
|
||||
);
|
||||
}
|
||||
|
||||
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||
var removeFuture = admin.removeRaftVoter(
|
||||
3000,
|
||||
dirId,
|
||||
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||
).all();
|
||||
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
|
||||
|
||||
var addFuture = admin.addRaftVoter(
|
||||
3000,
|
||||
dirId,
|
||||
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||
).all();
|
||||
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue