mirror of https://github.com/apache/kafka.git
Compare commits
15 Commits
ab4f39fd59
...
f3e4e6debd
Author | SHA1 | Date |
---|---|---|
|
f3e4e6debd | |
|
4a5aa37169 | |
|
509a0d2854 | |
|
c46ce30600 | |
|
73f0257972 | |
|
b6b71021ef | |
|
46e5e7e179 | |
|
90655da996 | |
|
5db6b53401 | |
|
70eb3a36fc | |
|
53098dbff9 | |
|
034c4981a9 | |
|
4bca99e040 | |
|
191f3bc957 | |
|
716cac2d8e |
|
@ -1,132 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.kafka.clients.admin;
|
|
||||||
|
|
||||||
import org.apache.kafka.common.Uuid;
|
|
||||||
import org.apache.kafka.common.errors.InconsistentClusterIdException;
|
|
||||||
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
|
||||||
import org.apache.kafka.common.test.TestKitNodes;
|
|
||||||
import org.apache.kafka.test.TestUtils;
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.Tag;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
||||||
|
|
||||||
@Tag("integration")
|
|
||||||
public class ReconfigurableQuorumIntegrationTest {
|
|
||||||
|
|
||||||
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
|
|
||||||
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
|
|
||||||
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
|
|
||||||
final var nodes = new TestKitNodes.Builder()
|
|
||||||
.setClusterId("test-cluster")
|
|
||||||
.setNumBrokerNodes(1)
|
|
||||||
.setNumControllerNodes(3)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
|
||||||
for (final var controllerNode : nodes.controllerNodes().values()) {
|
|
||||||
initialVoters.put(
|
|
||||||
controllerNode.id(),
|
|
||||||
controllerNode.metadataDirectoryId()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
|
||||||
cluster.format();
|
|
||||||
cluster.startup();
|
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
|
||||||
TestUtils.waitForCondition(() -> {
|
|
||||||
Map<Integer, Uuid> voters = descVoterDirs(admin);
|
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
|
||||||
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
|
|
||||||
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
|
|
||||||
|
|
||||||
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
|
||||||
admin.removeRaftVoter(
|
|
||||||
3000,
|
|
||||||
dirId,
|
|
||||||
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
|
||||||
).all().get();
|
|
||||||
TestUtils.waitForCondition(() -> {
|
|
||||||
Map<Integer, Uuid> voters = descVoterDirs(admin);
|
|
||||||
assertEquals(Set.of(3001, 3002), voters.keySet());
|
|
||||||
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
|
|
||||||
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
|
|
||||||
|
|
||||||
admin.addRaftVoter(
|
|
||||||
3000,
|
|
||||||
dirId,
|
|
||||||
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
|
||||||
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
|
||||||
).all().get();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
|
|
||||||
final var nodes = new TestKitNodes.Builder()
|
|
||||||
.setClusterId("test-cluster")
|
|
||||||
.setNumBrokerNodes(1)
|
|
||||||
.setNumControllerNodes(3)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
|
||||||
for (final var controllerNode : nodes.controllerNodes().values()) {
|
|
||||||
initialVoters.put(
|
|
||||||
controllerNode.id(),
|
|
||||||
controllerNode.metadataDirectoryId()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
|
||||||
cluster.format();
|
|
||||||
cluster.startup();
|
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
|
||||||
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
|
||||||
var removeFuture = admin.removeRaftVoter(
|
|
||||||
3000,
|
|
||||||
dirId,
|
|
||||||
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
|
||||||
).all();
|
|
||||||
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
|
|
||||||
|
|
||||||
var addFuture = admin.addRaftVoter(
|
|
||||||
3000,
|
|
||||||
dirId,
|
|
||||||
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
|
||||||
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
|
||||||
).all();
|
|
||||||
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -467,7 +467,7 @@ class BrokerServer(
|
||||||
|
|
||||||
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
|
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
|
||||||
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
|
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
|
||||||
config.numIoThreads, "RequestHandlerAvgIdlePercent")
|
config.numIoThreads)
|
||||||
|
|
||||||
metadataPublishers.add(new MetadataVersionConfigValidator(config.brokerId,
|
metadataPublishers.add(new MetadataVersionConfigValidator(config.brokerId,
|
||||||
() => config.processRoles.contains(ProcessRole.BrokerRole) && config.logDirs().size() > 1,
|
() => config.processRoles.contains(ProcessRole.BrokerRole) && config.logDirs().size() > 1,
|
||||||
|
|
|
@ -288,7 +288,6 @@ class ControllerServer(
|
||||||
controllerApis,
|
controllerApis,
|
||||||
time,
|
time,
|
||||||
config.numIoThreads,
|
config.numIoThreads,
|
||||||
"RequestHandlerAvgIdlePercent",
|
|
||||||
"controller")
|
"controller")
|
||||||
|
|
||||||
// Set up the metadata cache publisher.
|
// Set up the metadata cache publisher.
|
||||||
|
|
|
@ -24,6 +24,7 @@ import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChan
|
||||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import com.yammer.metrics.core.Meter
|
import com.yammer.metrics.core.Meter
|
||||||
|
import kafka.server.KafkaRequestHandlerPool.{aggregateThreads, requestHandlerAvgIdleMetricName}
|
||||||
import org.apache.kafka.common.internals.FatalExitError
|
import org.apache.kafka.common.internals.FatalExitError
|
||||||
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time}
|
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time}
|
||||||
import org.apache.kafka.server.common.RequestLocal
|
import org.apache.kafka.server.common.RequestLocal
|
||||||
|
@ -89,7 +90,8 @@ class KafkaRequestHandler(
|
||||||
id: Int,
|
id: Int,
|
||||||
brokerId: Int,
|
brokerId: Int,
|
||||||
val aggregateIdleMeter: Meter,
|
val aggregateIdleMeter: Meter,
|
||||||
val totalHandlerThreads: AtomicInteger,
|
val perPoolIdleMeter: Meter,
|
||||||
|
val poolHandlerThreads: AtomicInteger,
|
||||||
val requestChannel: RequestChannel,
|
val requestChannel: RequestChannel,
|
||||||
apis: ApiRequestHandler,
|
apis: ApiRequestHandler,
|
||||||
time: Time,
|
time: Time,
|
||||||
|
@ -112,7 +114,10 @@ class KafkaRequestHandler(
|
||||||
val req = requestChannel.receiveRequest(300)
|
val req = requestChannel.receiveRequest(300)
|
||||||
val endTime = time.nanoseconds
|
val endTime = time.nanoseconds
|
||||||
val idleTime = endTime - startSelectTime
|
val idleTime = endTime - startSelectTime
|
||||||
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
|
// Per-pool idle ratio uses the pool's own thread count as denominator
|
||||||
|
perPoolIdleMeter.mark(idleTime / poolHandlerThreads.get)
|
||||||
|
// Aggregate idle ratio uses the total threads across all pools as denominator
|
||||||
|
aggregateIdleMeter.mark(idleTime / aggregateThreads.get)
|
||||||
|
|
||||||
req match {
|
req match {
|
||||||
case RequestChannel.ShutdownRequest =>
|
case RequestChannel.ShutdownRequest =>
|
||||||
|
@ -192,19 +197,26 @@ class KafkaRequestHandler(
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object KafkaRequestHandlerPool {
|
||||||
|
val aggregateThreads = new AtomicInteger(0)
|
||||||
|
val requestHandlerAvgIdleMetricName = "RequestHandlerAvgIdlePercent"
|
||||||
|
}
|
||||||
|
|
||||||
class KafkaRequestHandlerPool(
|
class KafkaRequestHandlerPool(
|
||||||
val brokerId: Int,
|
val brokerId: Int,
|
||||||
val requestChannel: RequestChannel,
|
val requestChannel: RequestChannel,
|
||||||
val apis: ApiRequestHandler,
|
val apis: ApiRequestHandler,
|
||||||
time: Time,
|
time: Time,
|
||||||
numThreads: Int,
|
numThreads: Int,
|
||||||
requestHandlerAvgIdleMetricName: String,
|
|
||||||
nodeName: String = "broker"
|
nodeName: String = "broker"
|
||||||
) extends Logging {
|
) extends Logging {
|
||||||
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
|
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
|
||||||
|
|
||||||
val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
|
val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
|
||||||
/* a meter to track the average free capacity of the request handlers */
|
/* Per-pool idle meter (broker-only or controller-only) */
|
||||||
|
private val perPoolIdleMeterName = nodeName.capitalize + requestHandlerAvgIdleMetricName
|
||||||
|
private val perPoolIdleMeter = metricsGroup.newMeter(perPoolIdleMeterName, "percent", TimeUnit.NANOSECONDS)
|
||||||
|
/* Aggregate meter to track the average free capacity of the request handlers */
|
||||||
private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
|
private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
|
||||||
|
|
||||||
this.logIdent = s"[data-plane Kafka Request Handler on ${nodeName.capitalize} $brokerId] "
|
this.logIdent = s"[data-plane Kafka Request Handler on ${nodeName.capitalize} $brokerId] "
|
||||||
|
@ -214,7 +226,18 @@ class KafkaRequestHandlerPool(
|
||||||
}
|
}
|
||||||
|
|
||||||
def createHandler(id: Int): Unit = synchronized {
|
def createHandler(id: Int): Unit = synchronized {
|
||||||
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName)
|
runnables += new KafkaRequestHandler(
|
||||||
|
id,
|
||||||
|
brokerId,
|
||||||
|
aggregateIdleMeter,
|
||||||
|
perPoolIdleMeter,
|
||||||
|
threadPoolSize,
|
||||||
|
requestChannel,
|
||||||
|
apis,
|
||||||
|
time,
|
||||||
|
nodeName,
|
||||||
|
)
|
||||||
|
aggregateThreads.getAndIncrement()
|
||||||
KafkaThread.daemon("data-plane-kafka-request-handler-" + id, runnables(id)).start()
|
KafkaThread.daemon("data-plane-kafka-request-handler-" + id, runnables(id)).start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,6 +251,7 @@ class KafkaRequestHandlerPool(
|
||||||
} else if (newSize < currentSize) {
|
} else if (newSize < currentSize) {
|
||||||
for (i <- 1 to (currentSize - newSize)) {
|
for (i <- 1 to (currentSize - newSize)) {
|
||||||
runnables.remove(currentSize - i).stop()
|
runnables.remove(currentSize - i).stop()
|
||||||
|
aggregateThreads.getAndDecrement()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
threadPoolSize.set(newSize)
|
threadPoolSize.set(newSize)
|
||||||
|
@ -239,6 +263,8 @@ class KafkaRequestHandlerPool(
|
||||||
handler.initiateShutdown()
|
handler.initiateShutdown()
|
||||||
for (handler <- runnables)
|
for (handler <- runnables)
|
||||||
handler.awaitShutdown()
|
handler.awaitShutdown()
|
||||||
|
// Unregister this pool's threads from shared aggregate counter
|
||||||
|
aggregateThreads.addAndGet(-threadPoolSize.get)
|
||||||
info("shut down completely")
|
info("shut down completely")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,8 +130,7 @@ class TestRaftServer(
|
||||||
socketServer.dataPlaneRequestChannel,
|
socketServer.dataPlaneRequestChannel,
|
||||||
requestHandler,
|
requestHandler,
|
||||||
time,
|
time,
|
||||||
config.numIoThreads,
|
config.numIoThreads
|
||||||
"RequestHandlerAvgIdlePercent"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
workloadGenerator.start()
|
workloadGenerator.start()
|
||||||
|
|
|
@ -50,6 +50,7 @@ class KafkaRequestHandlerTest {
|
||||||
val topic2 = "topic2"
|
val topic2 = "topic2"
|
||||||
val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic)
|
val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic)
|
||||||
val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats
|
val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats
|
||||||
|
KafkaRequestHandlerPool.aggregateThreads.set(1)
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testCallbackTiming(): Unit = {
|
def testCallbackTiming(): Unit = {
|
||||||
|
@ -59,7 +60,7 @@ class KafkaRequestHandlerTest {
|
||||||
val requestChannel = new RequestChannel(10, time, metrics)
|
val requestChannel = new RequestChannel(10, time, metrics)
|
||||||
val apiHandler = mock(classOf[ApiRequestHandler])
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
try {
|
try {
|
||||||
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
|
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
|
||||||
|
|
||||||
val request = makeRequest(time, metrics)
|
val request = makeRequest(time, metrics)
|
||||||
requestChannel.sendRequest(request)
|
requestChannel.sendRequest(request)
|
||||||
|
@ -95,7 +96,7 @@ class KafkaRequestHandlerTest {
|
||||||
val metrics = mock(classOf[RequestChannelMetrics])
|
val metrics = mock(classOf[RequestChannelMetrics])
|
||||||
val apiHandler = mock(classOf[ApiRequestHandler])
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
val requestChannel = new RequestChannel(10, time, metrics)
|
val requestChannel = new RequestChannel(10, time, metrics)
|
||||||
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
|
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
|
||||||
|
|
||||||
var handledCount = 0
|
var handledCount = 0
|
||||||
var tryCompleteActionCount = 0
|
var tryCompleteActionCount = 0
|
||||||
|
@ -131,7 +132,7 @@ class KafkaRequestHandlerTest {
|
||||||
val metrics = mock(classOf[RequestChannelMetrics])
|
val metrics = mock(classOf[RequestChannelMetrics])
|
||||||
val apiHandler = mock(classOf[ApiRequestHandler])
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
val requestChannel = new RequestChannel(10, time, metrics)
|
val requestChannel = new RequestChannel(10, time, metrics)
|
||||||
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
|
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
|
||||||
|
|
||||||
val originalRequestLocal = mock(classOf[RequestLocal])
|
val originalRequestLocal = mock(classOf[RequestLocal])
|
||||||
|
|
||||||
|
@ -165,7 +166,7 @@ class KafkaRequestHandlerTest {
|
||||||
val metrics = mock(classOf[RequestChannelMetrics])
|
val metrics = mock(classOf[RequestChannelMetrics])
|
||||||
val apiHandler = mock(classOf[ApiRequestHandler])
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
val requestChannel = new RequestChannel(10, time, metrics)
|
val requestChannel = new RequestChannel(10, time, metrics)
|
||||||
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
|
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
|
||||||
|
|
||||||
val originalRequestLocal = mock(classOf[RequestLocal])
|
val originalRequestLocal = mock(classOf[RequestLocal])
|
||||||
when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create())
|
when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create())
|
||||||
|
@ -698,4 +699,94 @@ class KafkaRequestHandlerTest {
|
||||||
// cleanup
|
// cleanup
|
||||||
brokerTopicStats.close()
|
brokerTopicStats.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testRequestThreadMetrics(): Unit = {
|
||||||
|
val time = Time.SYSTEM
|
||||||
|
val metricsBroker = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
|
||||||
|
val metricsController = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
|
||||||
|
val requestChannelBroker = new RequestChannel(10, time, metricsBroker)
|
||||||
|
val requestChannelController = new RequestChannel(10, time, metricsController)
|
||||||
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
|
|
||||||
|
// Reset global shared counter for test
|
||||||
|
KafkaRequestHandlerPool.aggregateThreads.set(0)
|
||||||
|
|
||||||
|
// Create broker pool with 4 threads
|
||||||
|
val brokerPool = new KafkaRequestHandlerPool(
|
||||||
|
0,
|
||||||
|
requestChannelBroker,
|
||||||
|
apiHandler,
|
||||||
|
time,
|
||||||
|
4,
|
||||||
|
"broker"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Verify global counter is updated
|
||||||
|
assertEquals(4, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 4 after broker pool")
|
||||||
|
|
||||||
|
// Create controller pool with 4 threads
|
||||||
|
val controllerPool = new KafkaRequestHandlerPool(
|
||||||
|
0,
|
||||||
|
requestChannelController,
|
||||||
|
apiHandler,
|
||||||
|
time,
|
||||||
|
4,
|
||||||
|
"controller"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Verify global counter is updated to sum of both pools
|
||||||
|
assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 8 after both pools")
|
||||||
|
|
||||||
|
try {
|
||||||
|
val aggregateMeterField = classOf[KafkaRequestHandlerPool].getDeclaredField("aggregateIdleMeter")
|
||||||
|
aggregateMeterField.setAccessible(true)
|
||||||
|
val aggregateMeter = aggregateMeterField.get(brokerPool).asInstanceOf[Meter]
|
||||||
|
|
||||||
|
val perPoolIdleMeterField = classOf[KafkaRequestHandlerPool].getDeclaredField("perPoolIdleMeter")
|
||||||
|
perPoolIdleMeterField.setAccessible(true)
|
||||||
|
val brokerPerPoolIdleMeter = perPoolIdleMeterField.get(brokerPool).asInstanceOf[Meter]
|
||||||
|
val controllerPerPoolIdleMeter = perPoolIdleMeterField.get(controllerPool).asInstanceOf[Meter]
|
||||||
|
|
||||||
|
var aggregateValue = 0.0
|
||||||
|
var brokerPerPoolValue = 0.0
|
||||||
|
var controllerPerPoolValue = 0.0
|
||||||
|
|
||||||
|
Thread.sleep(2000)
|
||||||
|
aggregateValue = aggregateMeter.oneMinuteRate()
|
||||||
|
brokerPerPoolValue = brokerPerPoolIdleMeter.oneMinuteRate()
|
||||||
|
controllerPerPoolValue = controllerPerPoolIdleMeter.oneMinuteRate()
|
||||||
|
|
||||||
|
// Verify that the meter shows reasonable idle percentage
|
||||||
|
assertTrue(aggregateValue >= 0.0 && aggregateValue <= 1.00, s"aggregate idle percent should be within [0,1], got $aggregateValue")
|
||||||
|
assertTrue(brokerPerPoolValue >= 0.0 && brokerPerPoolValue <= 1.00, s"broker per-pool idle percent should be within [0,1], got $brokerPerPoolValue")
|
||||||
|
assertTrue(controllerPerPoolValue >= 0.0 && controllerPerPoolValue <= 1.00, s"controller per-pool idle percent should be within [0,1], got $controllerPerPoolValue")
|
||||||
|
|
||||||
|
// Test pool resizing
|
||||||
|
// Shrink broker pool from 4 to 2 threads
|
||||||
|
brokerPool.resizeThreadPool(2)
|
||||||
|
assertEquals(2, brokerPool.threadPoolSize.get)
|
||||||
|
assertEquals(4, controllerPool.threadPoolSize.get)
|
||||||
|
assertEquals(6, KafkaRequestHandlerPool.aggregateThreads.get)
|
||||||
|
|
||||||
|
// Expand controller pool from 4 to 6 threads
|
||||||
|
controllerPool.resizeThreadPool(6)
|
||||||
|
assertEquals(2, brokerPool.threadPoolSize.get)
|
||||||
|
assertEquals(6, controllerPool.threadPoolSize.get)
|
||||||
|
assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get)
|
||||||
|
|
||||||
|
// Wait for removed threads to fully exit before shutdown to avoid deadlock.
|
||||||
|
Thread.sleep(1000)
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
controllerPool.shutdown()
|
||||||
|
assertEquals(2, KafkaRequestHandlerPool.aggregateThreads.get)
|
||||||
|
brokerPool.shutdown()
|
||||||
|
metricsBroker.close()
|
||||||
|
metricsController.close()
|
||||||
|
|
||||||
|
// Verify global counter is reset after shutdown
|
||||||
|
assertEquals(0, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 0 after shutdown")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,13 +15,16 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package kafka.server;
|
package org.apache.kafka.server;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
|
||||||
import org.apache.kafka.clients.admin.Admin;
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
import org.apache.kafka.clients.admin.FeatureMetadata;
|
import org.apache.kafka.clients.admin.FeatureMetadata;
|
||||||
import org.apache.kafka.clients.admin.QuorumInfo;
|
import org.apache.kafka.clients.admin.QuorumInfo;
|
||||||
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
|
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
|
||||||
|
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
||||||
|
import org.apache.kafka.common.errors.InconsistentClusterIdException;
|
||||||
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
||||||
import org.apache.kafka.common.test.TestKitNodes;
|
import org.apache.kafka.common.test.TestKitNodes;
|
||||||
import org.apache.kafka.common.test.api.TestKitDefaults;
|
import org.apache.kafka.common.test.api.TestKitDefaults;
|
||||||
|
@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
|
||||||
import org.apache.kafka.server.common.KRaftVersion;
|
import org.apache.kafka.server.common.KRaftVersion;
|
||||||
import org.apache.kafka.test.TestUtils;
|
import org.apache.kafka.test.TestUtils;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Tag;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
@Tag("integration")
|
||||||
public class ReconfigurableQuorumIntegrationTest {
|
public class ReconfigurableQuorumIntegrationTest {
|
||||||
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
|
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
|
||||||
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
|
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
|
||||||
|
@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
).build()) {
|
).build()) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
||||||
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
|
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
|
||||||
});
|
});
|
||||||
|
@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
).setStandalone(true).build()) {
|
).setStandalone(true).build()) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
||||||
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
|
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
|
||||||
});
|
});
|
||||||
|
@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
|
||||||
|
@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
|
||||||
|
final var nodes = new TestKitNodes.Builder()
|
||||||
|
.setClusterId("test-cluster")
|
||||||
|
.setNumBrokerNodes(1)
|
||||||
|
.setNumControllerNodes(3)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||||
|
cluster.format();
|
||||||
|
cluster.startup();
|
||||||
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
for (int replicaId : new int[] {3000, 3001, 3002}) {
|
||||||
|
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||||
|
admin.removeRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||||
|
).all().get();
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
|
assertEquals(Set.of(3001, 3002), voters.keySet());
|
||||||
|
for (int replicaId : new int[] {3001, 3002}) {
|
||||||
|
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
admin.addRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||||
|
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||||
|
).all().get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
|
||||||
|
final var nodes = new TestKitNodes.Builder()
|
||||||
|
.setClusterId("test-cluster")
|
||||||
|
.setNumBrokerNodes(1)
|
||||||
|
.setNumControllerNodes(3)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||||
|
cluster.format();
|
||||||
|
cluster.startup();
|
||||||
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||||
|
var removeFuture = admin.removeRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||||
|
).all();
|
||||||
|
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
|
||||||
|
|
||||||
|
var addFuture = admin.addRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||||
|
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||||
|
).all();
|
||||||
|
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue