KAFKA-16126: Kcontroller dynamic configurations may fail to apply at startup

Some kcontroller dynamic configurations may fail to apply at startup. This happens because there is a race between registering the reconfigurables with the DynamicBrokerConfig class and receiving the first update from the metadata publisher. We can fix this by registering the reconfigurables first. This race appears to have been introduced by the "MINOR: Install ControllerServer metadata publishers sooner" change.
Colin P. McCabe 2024-01-14 13:50:10 -08:00
parent b16df3b103
commit d7f158e9d4
4 changed files with 55 additions and 17 deletions
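
To make the race concrete, here is a minimal self-contained sketch in Java. It is hypothetical illustration code, not anything from the Kafka sources: the class DynamicConfigRaceSketch and its methods are invented stand-ins for DynamicBrokerConfig and the metadata publisher. If the publisher delivers its initial snapshot while no reconfigurable is registered, the startup value is silently dropped; registering first, as this commit does in ControllerServer.startup(), closes the window.

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    // Hypothetical sketch of the startup race; names do not match the real Kafka classes.
    final class DynamicConfigRaceSketch {
        interface Reconfigurable {
            void reconfigure(String key, String value);
        }

        private final List<Reconfigurable> reconfigurables = new CopyOnWriteArrayList<>();

        void addReconfigurable(Reconfigurable r) {
            reconfigurables.add(r);
        }

        // Stands in for the metadata publisher pushing the initial dynamic config snapshot.
        void publishInitialSnapshot(String key, String value) {
            for (Reconfigurable r : reconfigurables) {
                r.reconfigure(key, value);
            }
        }

        public static void main(String[] args) {
            DynamicConfigRaceSketch broken = new DynamicConfigRaceSketch();
            // Buggy ordering: the snapshot arrives before anyone is registered,
            // so the dynamic value of num.io.threads is never applied.
            broken.publishInitialSnapshot("num.io.threads", "9");
            broken.addReconfigurable((k, v) -> System.out.println("applied " + k + "=" + v));

            DynamicConfigRaceSketch fixed = new DynamicConfigRaceSketch();
            // Fixed ordering (what this commit does): register first, then let the
            // publisher deliver the snapshot, so the startup value is applied.
            fixed.addReconfigurable((k, v) -> System.out.println("applied " + k + "=" + v));
            fixed.publishInitialSnapshot("num.io.threads", "9");  // prints: applied num.io.threads=9
        }
    }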

core/src/main/scala/kafka/server/ControllerServer.scala

@@ -368,6 +368,13 @@ class ControllerServer(
         ),
         "controller"))
 
+      // Register this instance for dynamic config changes to the KafkaConfig. This must be called
+      // after the authorizer and quotaManagers are initialized, since it references those objects.
+      // It must be called before DynamicClientQuotaPublisher is installed, since otherwise we may
+      // miss the initial update which establishes the dynamic configurations that are in effect on
+      // startup.
+      config.dynamicConfig.addReconfigurables(this)
+
       // Set up the client quotas publisher. This will enable controller mutation quotas and any
       // other quotas which are applicable.
       metadataPublishers.add(new DynamicClientQuotaPublisher(
@@ -384,7 +391,6 @@ class ControllerServer(
         credentialProvider
       ))
 
-      // Set up the DelegationToken publisher.
       // We need a tokenManager for the Publisher
       // The tokenCache in the tokenManager is the same used in DelegationTokenControlManager
@@ -450,9 +456,6 @@ class ControllerServer(
       FutureUtils.waitWithLogging(logger.underlying, logIdent,
         "all of the SocketServer Acceptors to be started",
         socketServerFuture, startupDeadline, time)
-
-      // register this instance for dynamic config changes to the KafkaConfig
-      config.dynamicConfig.addReconfigurables(this)
     } catch {
       case e: Throwable =>
         maybeChangeStatus(STARTING, STARTED)

core/src/main/scala/kafka/server/KafkaRequestHandler.scala

@@ -206,7 +206,7 @@ class KafkaRequestHandlerPool(
 ) extends Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
-  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
+  val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
   /* a meter to track the average free capacity of the request handlers */
   private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

core/src/test/java/kafka/testkit/TestKitNodes.java

@@ -33,7 +33,8 @@ public class TestKitNodes {
     public static class Builder {
         private boolean combined = false;
         private Uuid clusterId = null;
-        private MetadataVersion bootstrapMetadataVersion = null;
+        private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
+            fromVersion(MetadataVersion.latest(), "testkit");
         private final NavigableMap<Integer, ControllerNode.Builder> controllerNodeBuilders = new TreeMap<>();
         private final NavigableMap<Integer, BrokerNode.Builder> brokerNodeBuilders = new TreeMap<>();
@@ -43,7 +44,12 @@ public class TestKitNodes {
         }
 
         public Builder setBootstrapMetadataVersion(MetadataVersion metadataVersion) {
-            this.bootstrapMetadataVersion = metadataVersion;
+            this.bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "testkit");
             return this;
         }
+
+        public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
+            this.bootstrapMetadata = bootstrapMetadata;
+            return this;
+        }
@@ -97,9 +103,6 @@ public class TestKitNodes {
             if (clusterId == null) {
                 clusterId = Uuid.randomUuid();
             }
-            if (bootstrapMetadataVersion == null) {
-                bootstrapMetadataVersion = MetadataVersion.latest();
-            }
             TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
             for (ControllerNode.Builder builder : controllerNodeBuilders.values()) {
                 ControllerNode node = builder.
@@ -118,7 +121,7 @@ public class TestKitNodes {
             }
             return new TestKitNodes(baseDirectory,
                 clusterId,
-                bootstrapMetadataVersion,
+                bootstrapMetadata,
                 controllerNodes,
                 brokerNodes);
         } catch (Exception e) {
@@ -145,20 +148,20 @@ public class TestKitNodes {
     private final String baseDirectory;
     private final Uuid clusterId;
-    private final MetadataVersion bootstrapMetadataVersion;
+    private final BootstrapMetadata bootstrapMetadata;
     private final NavigableMap<Integer, ControllerNode> controllerNodes;
     private final NavigableMap<Integer, BrokerNode> brokerNodes;
 
     private TestKitNodes(
         String baseDirectory,
         Uuid clusterId,
-        MetadataVersion bootstrapMetadataVersion,
+        BootstrapMetadata bootstrapMetadata,
         NavigableMap<Integer, ControllerNode> controllerNodes,
         NavigableMap<Integer, BrokerNode> brokerNodes
     ) {
         this.baseDirectory = baseDirectory;
         this.clusterId = clusterId;
-        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+        this.bootstrapMetadata = bootstrapMetadata;
         this.controllerNodes = controllerNodes;
         this.brokerNodes = brokerNodes;
     }
@@ -176,7 +179,7 @@ public class TestKitNodes {
     }
 
     public MetadataVersion bootstrapMetadataVersion() {
-        return bootstrapMetadataVersion;
+        return bootstrapMetadata.metadataVersion();
     }
 
     public Map<Integer, ControllerNode> controllerNodes() {
@@ -184,7 +187,7 @@ public class TestKitNodes {
     }
 
     public BootstrapMetadata bootstrapMetadata() {
-        return BootstrapMetadata.fromVersion(bootstrapMetadataVersion(), "testkit");
+        return bootstrapMetadata;
     }
 
     public NavigableMap<Integer, BrokerNode> brokerNodes() {

core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala

@@ -28,6 +28,7 @@ import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.config.ConfigResource.Type
 import org.apache.kafka.common.errors.{PolicyViolationException, UnsupportedVersionException}
 import org.apache.kafka.common.message.DescribeClusterRequestData
+import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors._
@@ -38,8 +39,9 @@ import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition}
 import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils}
 import org.apache.kafka.image.ClusterImage
 import org.apache.kafka.metadata.BrokerState
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.quota
 import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
@@ -1219,6 +1221,36 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
 
+  @Test
+  def testStartupWithNonDefaultKControllerDynamicConfiguration(): Unit = {
+    val bootstrapRecords = util.Arrays.asList(
+      new ApiMessageAndVersion(new FeatureLevelRecord().
+        setName(MetadataVersion.FEATURE_NAME).
+        setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel), 0.toShort),
+      new ApiMessageAndVersion(new ConfigRecord().
+        setResourceType(ConfigResource.Type.BROKER.id).
+        setResourceName("").
+        setName("num.io.threads").
+        setValue("9"), 0.toShort))
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadata(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords")).
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val controller = cluster.controllers().values().iterator().next()
+      TestUtils.retry(60000) {
+        assertNotNull(controller.controllerApisHandlerPool)
+        assertEquals(9, controller.controllerApisHandlerPool.threadPoolSize.get())
+      }
+    } finally {
+      cluster.close()
+    }
+  }
 }
 
 class BadAuthorizer() extends Authorizer {