Compare commits

...

15 Commits

Author SHA1 Message Date
anonymous f3e4e6debd
Merge 509a0d2854 into 4a5aa37169 2025-10-07 12:16:53 -05:00
Chang-Chi Hsu 4a5aa37169
MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server module (#20636)
It moves the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidates two related tests,
`RemoveAndAddVoterWithValidClusterId` and
`RemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and reduces redundancy.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-08 01:10:58 +08:00
tony tang 509a0d2854
fix 2025-10-06 17:56:01 -05:00
tony tang c46ce30600
fix 2025-10-06 16:03:45 -05:00
tony tang 73f0257972
fix 2025-10-06 14:41:26 -05:00
tony tang b6b71021ef
fix 2025-10-06 14:28:05 -05:00
tony tang 46e5e7e179
fix 2025-10-06 13:38:37 -05:00
tony tang 90655da996
fix 2025-10-06 11:56:51 -05:00
tony tang 5db6b53401
fix 2025-10-06 10:59:10 -05:00
tony tang 70eb3a36fc
fix 2025-10-02 16:52:39 -05:00
tony tang 53098dbff9
fix 2025-10-02 12:12:56 -05:00
tony tang 034c4981a9
fix 2025-09-19 15:18:29 -05:00
tony tang 4bca99e040
Merge branch 'trunk' into KIP-1207 2025-09-19 15:02:40 -05:00
tony tang 191f3bc957
fix 2025-09-04 15:18:27 -05:00
tony tang 716cac2d8e
add metrics Broker/ControllerRequestHandlerAvgIdlePercent 2025-09-04 15:18:26 -05:00
7 changed files with 233 additions and 153 deletions

View File

@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.InconsistentClusterIdException;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Tag("integration")
-public class ReconfigurableQuorumIntegrationTest {
-    static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
-        var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
-        return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
-
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
-
-                admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-            }
-        }
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                var removeFuture = admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
-
-                var addFuture = admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
-            }
-        }
-    }
-}

View File

@@ -467,7 +467,7 @@ class BrokerServer(
     dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
       socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
-      config.numIoThreads, "RequestHandlerAvgIdlePercent")
+      config.numIoThreads)
 
     metadataPublishers.add(new MetadataVersionConfigValidator(config.brokerId,
       () => config.processRoles.contains(ProcessRole.BrokerRole) && config.logDirs().size() > 1,

View File

@@ -288,7 +288,6 @@ class ControllerServer(
         controllerApis,
         time,
         config.numIoThreads,
-        "RequestHandlerAvgIdlePercent",
         "controller")
 
       // Set up the metadata cache publisher.

View File

@@ -24,6 +24,7 @@ import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChannel}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 import com.yammer.metrics.core.Meter
+import kafka.server.KafkaRequestHandlerPool.{aggregateThreads, requestHandlerAvgIdleMetricName}
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.utils.{Exit, KafkaThread, Time}
 import org.apache.kafka.server.common.RequestLocal
@@ -89,7 +90,8 @@ class KafkaRequestHandler(
   id: Int,
   brokerId: Int,
   val aggregateIdleMeter: Meter,
-  val totalHandlerThreads: AtomicInteger,
+  val perPoolIdleMeter: Meter,
+  val poolHandlerThreads: AtomicInteger,
   val requestChannel: RequestChannel,
   apis: ApiRequestHandler,
   time: Time,
@@ -112,7 +114,10 @@ class KafkaRequestHandler(
     val req = requestChannel.receiveRequest(300)
     val endTime = time.nanoseconds
     val idleTime = endTime - startSelectTime
-    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
+    // Per-pool idle ratio uses the pool's own thread count as denominator
+    perPoolIdleMeter.mark(idleTime / poolHandlerThreads.get)
+    // Aggregate idle ratio uses the total threads across all pools as denominator
+    aggregateIdleMeter.mark(idleTime / aggregateThreads.get)
 
     req match {
       case RequestChannel.ShutdownRequest =>
@@ -192,19 +197,26 @@ class KafkaRequestHandler(
   }
 }
 
+object KafkaRequestHandlerPool {
+  val aggregateThreads = new AtomicInteger(0)
+  val requestHandlerAvgIdleMetricName = "RequestHandlerAvgIdlePercent"
+}
+
 class KafkaRequestHandlerPool(
   val brokerId: Int,
   val requestChannel: RequestChannel,
   val apis: ApiRequestHandler,
   time: Time,
   numThreads: Int,
-  requestHandlerAvgIdleMetricName: String,
   nodeName: String = "broker"
 ) extends Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
-  /* a meter to track the average free capacity of the request handlers */
+  /* Per-pool idle meter (broker-only or controller-only) */
+  private val perPoolIdleMeterName = nodeName.capitalize + requestHandlerAvgIdleMetricName
+  private val perPoolIdleMeter = metricsGroup.newMeter(perPoolIdleMeterName, "percent", TimeUnit.NANOSECONDS)
+  /* Aggregate meter to track the average free capacity of the request handlers */
   private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
 
   this.logIdent = s"[data-plane Kafka Request Handler on ${nodeName.capitalize} $brokerId] "
@@ -214,7 +226,18 @@ class KafkaRequestHandlerPool(
   }
 
   def createHandler(id: Int): Unit = synchronized {
-    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName)
+    runnables += new KafkaRequestHandler(
+      id,
+      brokerId,
+      aggregateIdleMeter,
+      perPoolIdleMeter,
+      threadPoolSize,
+      requestChannel,
+      apis,
+      time,
+      nodeName,
+    )
+    aggregateThreads.getAndIncrement()
     KafkaThread.daemon("data-plane-kafka-request-handler-" + id, runnables(id)).start()
   }
@@ -228,6 +251,7 @@ class KafkaRequestHandlerPool(
     } else if (newSize < currentSize) {
       for (i <- 1 to (currentSize - newSize)) {
         runnables.remove(currentSize - i).stop()
+        aggregateThreads.getAndDecrement()
       }
     }
     threadPoolSize.set(newSize)
@@ -239,6 +263,8 @@ class KafkaRequestHandlerPool(
       handler.initiateShutdown()
     for (handler <- runnables)
       handler.awaitShutdown()
+    // Unregister this pool's threads from shared aggregate counter
+    aggregateThreads.addAndGet(-threadPoolSize.get)
     info("shut down completely")
   }
 }
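
The hunks above split the idle metric in two: each handler thread marks the same measured idle time into a per-pool meter (divided by that pool's own thread count) and into a shared aggregate meter (divided by the total thread count across all pools). A minimal standalone sketch of that arithmetic, using hypothetical names (IdleMeterSketch, Pool, record) that do not appear in the patch:

import java.util.concurrent.atomic.AtomicInteger

object IdleMeterSketch {
  // Shared across all pools, like KafkaRequestHandlerPool.aggregateThreads above.
  val aggregateThreads = new AtomicInteger(0)

  // Stand-in for one KafkaRequestHandlerPool; the two fields accumulate what
  // perPoolIdleMeter.mark(...) and aggregateIdleMeter.mark(...) would receive.
  final class Pool(val name: String, numThreads: Int) {
    val poolThreads = new AtomicInteger(numThreads)
    aggregateThreads.addAndGet(numThreads)
    var perPoolIdleNanos = 0L
    var aggregateIdleNanos = 0L

    // Called once per receive loop by each handler thread with its idle time.
    def record(idleNanos: Long): Unit = {
      perPoolIdleNanos += idleNanos / poolThreads.get        // pool-local denominator
      aggregateIdleNanos += idleNanos / aggregateThreads.get // global denominator
    }
  }

  def main(args: Array[String]): Unit = {
    val oneSecond = 1000L * 1000 * 1000
    val broker = new Pool("broker", 4)
    val controller = new Pool("controller", 4) // registers 4 more threads, total 8
    // Suppose each broker thread sat idle for a full 1s window:
    (1 to 4).foreach(_ => broker.record(oneSecond))
    // Per-pool meter: 4 * (1s / 4) = 1s idle per 1s window, i.e. ~100% idle.
    println(s"${broker.name} per-pool idle nanos:  ${broker.perPoolIdleNanos}")
    // Aggregate meter: 4 * (1s / 8) = 0.5s, the broker pool's half of total capacity.
    println(s"${broker.name} aggregate idle nanos: ${broker.aggregateIdleNanos}")
  }
}

This is why a fully idle controller can no longer mask a saturated broker pool in the shared meter: each pool also reports against its own denominator.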

View File

@@ -130,8 +130,7 @@ class TestRaftServer(
       socketServer.dataPlaneRequestChannel,
       requestHandler,
       time,
-      config.numIoThreads,
-      "RequestHandlerAvgIdlePercent"
+      config.numIoThreads
     )
 
     workloadGenerator.start()

View File

@@ -50,6 +50,7 @@ class KafkaRequestHandlerTest {
   val topic2 = "topic2"
   val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic)
   val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats
+  KafkaRequestHandlerPool.aggregateThreads.set(1)
 
   @Test
   def testCallbackTiming(): Unit = {
@@ -59,7 +60,7 @@ class KafkaRequestHandlerTest {
     val requestChannel = new RequestChannel(10, time, metrics)
     val apiHandler = mock(classOf[ApiRequestHandler])
     try {
-      val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
+      val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
       val request = makeRequest(time, metrics)
       requestChannel.sendRequest(request)
 
@@ -95,7 +96,7 @@ class KafkaRequestHandlerTest {
     val metrics = mock(classOf[RequestChannelMetrics])
     val apiHandler = mock(classOf[ApiRequestHandler])
     val requestChannel = new RequestChannel(10, time, metrics)
-    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
+    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
 
     var handledCount = 0
     var tryCompleteActionCount = 0
@@ -131,7 +132,7 @@ class KafkaRequestHandlerTest {
     val metrics = mock(classOf[RequestChannelMetrics])
     val apiHandler = mock(classOf[ApiRequestHandler])
     val requestChannel = new RequestChannel(10, time, metrics)
-    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
+    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
 
     val originalRequestLocal = mock(classOf[RequestLocal])
@@ -165,7 +166,7 @@ class KafkaRequestHandlerTest {
     val metrics = mock(classOf[RequestChannelMetrics])
     val apiHandler = mock(classOf[ApiRequestHandler])
     val requestChannel = new RequestChannel(10, time, metrics)
-    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
+    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
 
     val originalRequestLocal = mock(classOf[RequestLocal])
     when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create())
@@ -698,4 +699,94 @@ class KafkaRequestHandlerTest {
     // cleanup
     brokerTopicStats.close()
   }
+
+  @Test
+  def testRequestThreadMetrics(): Unit = {
+    val time = Time.SYSTEM
+    val metricsBroker = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
+    val metricsController = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
+    val requestChannelBroker = new RequestChannel(10, time, metricsBroker)
+    val requestChannelController = new RequestChannel(10, time, metricsController)
+    val apiHandler = mock(classOf[ApiRequestHandler])
+
+    // Reset global shared counter for test
+    KafkaRequestHandlerPool.aggregateThreads.set(0)
+
+    // Create broker pool with 4 threads
+    val brokerPool = new KafkaRequestHandlerPool(
+      0,
+      requestChannelBroker,
+      apiHandler,
+      time,
+      4,
+      "broker"
+    )
+    // Verify global counter is updated
+    assertEquals(4, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 4 after broker pool")
+
+    // Create controller pool with 4 threads
+    val controllerPool = new KafkaRequestHandlerPool(
+      0,
+      requestChannelController,
+      apiHandler,
+      time,
+      4,
+      "controller"
+    )
+    // Verify global counter is updated to sum of both pools
+    assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 8 after both pools")
+
+    try {
+      val aggregateMeterField = classOf[KafkaRequestHandlerPool].getDeclaredField("aggregateIdleMeter")
+      aggregateMeterField.setAccessible(true)
+      val aggregateMeter = aggregateMeterField.get(brokerPool).asInstanceOf[Meter]
+      val perPoolIdleMeterField = classOf[KafkaRequestHandlerPool].getDeclaredField("perPoolIdleMeter")
+      perPoolIdleMeterField.setAccessible(true)
+      val brokerPerPoolIdleMeter = perPoolIdleMeterField.get(brokerPool).asInstanceOf[Meter]
+      val controllerPerPoolIdleMeter = perPoolIdleMeterField.get(controllerPool).asInstanceOf[Meter]
+
+      var aggregateValue = 0.0
+      var brokerPerPoolValue = 0.0
+      var controllerPerPoolValue = 0.0
+      Thread.sleep(2000)
+      aggregateValue = aggregateMeter.oneMinuteRate()
+      brokerPerPoolValue = brokerPerPoolIdleMeter.oneMinuteRate()
+      controllerPerPoolValue = controllerPerPoolIdleMeter.oneMinuteRate()
+
+      // Verify that the meter shows reasonable idle percentage
+      assertTrue(aggregateValue >= 0.0 && aggregateValue <= 1.00, s"aggregate idle percent should be within [0,1], got $aggregateValue")
+      assertTrue(brokerPerPoolValue >= 0.0 && brokerPerPoolValue <= 1.00, s"broker per-pool idle percent should be within [0,1], got $brokerPerPoolValue")
+      assertTrue(controllerPerPoolValue >= 0.0 && controllerPerPoolValue <= 1.00, s"controller per-pool idle percent should be within [0,1], got $controllerPerPoolValue")
+
+      // Test pool resizing
+      // Shrink broker pool from 4 to 2 threads
+      brokerPool.resizeThreadPool(2)
+      assertEquals(2, brokerPool.threadPoolSize.get)
+      assertEquals(4, controllerPool.threadPoolSize.get)
+      assertEquals(6, KafkaRequestHandlerPool.aggregateThreads.get)
+
+      // Expand controller pool from 4 to 6 threads
+      controllerPool.resizeThreadPool(6)
+      assertEquals(2, brokerPool.threadPoolSize.get)
+      assertEquals(6, controllerPool.threadPoolSize.get)
+      assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get)
+
+      // Wait for removed threads to fully exit before shutdown to avoid deadlock.
+      Thread.sleep(1000)
+    } finally {
+      controllerPool.shutdown()
+      assertEquals(2, KafkaRequestHandlerPool.aggregateThreads.get)
+      brokerPool.shutdown()
+      metricsBroker.close()
+      metricsController.close()
+      // Verify global counter is reset after shutdown
+      assertEquals(0, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 0 after shutdown")
+    }
+  }
 }
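
The meter names the test reflects on follow from perPoolIdleMeterName in the KafkaRequestHandler.scala hunk: the capitalized node name prefixed to the existing base name, matching the commit message "add metrics Broker/ControllerRequestHandlerAvgIdlePercent", while the aggregate meter keeps the unprefixed legacy name. A small sketch of just that naming rule (MetricNameSketch is a hypothetical name invented here; the full JMX object name additionally carries the KafkaMetricsGroup group and type):

object MetricNameSketch {
  // Mirrors KafkaRequestHandlerPool.requestHandlerAvgIdleMetricName in the patch.
  val base = "RequestHandlerAvgIdlePercent"

  // Per-pool meter name: nodeName.capitalize prepended to the shared base name.
  def perPoolName(nodeName: String): String = nodeName.capitalize + base

  def main(args: Array[String]): Unit = {
    println(perPoolName("broker"))     // BrokerRequestHandlerAvgIdlePercent
    println(perPoolName("controller")) // ControllerRequestHandlerAvgIdlePercent
    println(base)                      // aggregate meter keeps the pre-existing name
  }
}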

View File

@@ -15,13 +15,16 @@
  * limitations under the License.
  */
-package kafka.server;
+package org.apache.kafka.server;
 
+import org.apache.kafka.clients.admin.AddRaftVoterOptions;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.FeatureMetadata;
 import org.apache.kafka.clients.admin.QuorumInfo;
 import org.apache.kafka.clients.admin.RaftVoterEndpoint;
+import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.common.test.api.TestKitDefaults;
@@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@Tag("integration")
 public class ReconfigurableQuorumIntegrationTest {
     static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
         FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
@@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ).build()) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
                 });
@@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ).setStandalone(true).build()) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
                 });
@@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
             }
         }
     }
+
+    @Test
+    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
+        final var nodes = new TestKitNodes.Builder()
+            .setClusterId("test-cluster")
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(3)
+            .build();
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+
+                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                admin.removeRaftVoter(
+                    3000,
+                    dirId,
+                    new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+
+                admin.addRaftVoter(
+                    3000,
+                    dirId,
+                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+                    new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
+        final var nodes = new TestKitNodes.Builder()
+            .setClusterId("test-cluster")
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(3)
+            .build();
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                var removeFuture = admin.removeRaftVoter(
+                    3000,
+                    dirId,
+                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
+
+                var addFuture = admin.addRaftVoter(
+                    3000,
+                    dirId,
+                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
+            }
+        }
+    }
 }