KAFKA-19702 Move MetadataVersionConfigValidator and related test code to metadata module (#20526)

1. Move `MetadataVersionConfigValidator` to metadata module.
2. Move `MetadataVersionConfigValidatorTest` to metadata module.
3. Remove `KafkaConfig#validateWithMetadataVersion`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Lan Ding 2025-09-29 01:32:27 +08:00 committed by GitHub
parent 7d098cfbbd
commit e27ea8d4db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 169 additions and 149 deletions

View File

@ -41,7 +41,7 @@ import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
@ -469,7 +469,10 @@ class BrokerServer(
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, "RequestHandlerAvgIdlePercent")
metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler))
metadataPublishers.add(new MetadataVersionConfigValidator(config.brokerId,
() => config.processRoles.contains(ProcessRole.BrokerRole) && config.logDirs().size() > 1,
sharedServer.metadataPublishingFaultHandler
))
brokerMetadataPublisher = new BrokerMetadataPublisher(config,
metadataCache,
logManager,

View File

@ -43,7 +43,6 @@ import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@ -653,18 +652,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
}
/**
* Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when
* a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
*/
def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {
if (processRoles.contains(ProcessRole.BrokerRole) && logDirs.size > 1) {
require(metadataVersion.isDirectoryAssignmentSupported,
s"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion ${metadataVersion}. " +
s"Need ${MetadataVersion.IBP_3_7_IV2} or higher")
}
}
/**
* Copy the subset of properties that are relevant to Logs. The individual properties
* are listed here since the names are slightly different in each Config class...

View File

@ -1,103 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.junit.jupiter.api.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class MetadataVersionConfigValidatorTest {
private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder()
.provenance(MetadataProvenance.EMPTY)
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
.numBatches(1)
.elapsedNs(90)
.numBytes(88)
.build();
public static final MetadataProvenance TEST_PROVENANCE =
new MetadataProvenance(50, 3, 8000, true);
void testWith(MetadataVersion metadataVersion, KafkaConfig config, FaultHandler faultHandler) throws Exception {
try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(config, faultHandler)) {
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build();
if (metadataVersion != null) {
delta.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()));
}
MetadataImage image = delta.apply(TEST_PROVENANCE);
validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
}
}
@Test
void testValidatesConfigOnMetadataChange() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
KafkaConfig config = mock(KafkaConfig.class);
FaultHandler faultHandler = mock(FaultHandler.class);
when(config.brokerId()).thenReturn(8);
testWith(metadataVersion, config, faultHandler);
verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion));
verifyNoMoreInteractions(faultHandler);
}
@SuppressWarnings("ThrowableNotThrown")
@Test
void testInvokesFaultHandlerOnException() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
Exception exception = new Exception();
KafkaConfig config = mock(KafkaConfig.class);
FaultHandler faultHandler = mock(FaultHandler.class);
when(faultHandler.handleFault(any(), any())).thenReturn(new RuntimeException("returned exception"));
when(config.brokerId()).thenReturn(8);
willAnswer(invocation -> {
throw exception;
}).given(config).validateWithMetadataVersion(eq(metadataVersion));
testWith(metadataVersion, config, faultHandler);
verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion));
verify(faultHandler, times(1)).handleFault(
eq("Broker configuration does not support the cluster MetadataVersion"),
eq(exception));
}
}

View File

@ -23,7 +23,6 @@ import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
import org.apache.kafka.common.config.ConfigDef.Type.INT
import org.apache.kafka.common.config.{ConfigException, SslConfigs, TopicConfig}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -429,21 +428,4 @@ class LogConfigTest {
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString)
LogConfig.validate(logProps)
}
@Test
def testValidateWithMetadataVersionJbodSupport(): Unit = {
def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
KafkaConfig.fromProps(
TestUtils.createBrokerConfig(nodeId = 0, logDirCount = if (jbodConfig) 2 else 1)
).validateWithMetadataVersion(metadataVersion)
validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false)
validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false)
validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false)
assertThrows(classOf[IllegalArgumentException], () =>
validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true))
assertThrows(classOf[IllegalArgumentException], () =>
validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true))
validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true)
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package kafka.server;
package org.apache.kafka.metadata;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@ -24,18 +24,20 @@ import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import java.util.function.Supplier;
public class MetadataVersionConfigValidator implements MetadataPublisher {
private final String name;
private final KafkaConfig config;
private final Supplier<Boolean> hasMultiLogDirs;
private final FaultHandler faultHandler;
public MetadataVersionConfigValidator(
KafkaConfig config,
int id,
Supplier<Boolean> hasMultiLogDirs,
FaultHandler faultHandler
) {
int id = config.brokerId();
this.name = "MetadataVersionPublisher(id=" + id + ")";
this.config = config;
this.hasMultiLogDirs = hasMultiLogDirs;
this.faultHandler = faultHandler;
}
@ -57,13 +59,22 @@ public class MetadataVersionConfigValidator implements MetadataPublisher {
}
}
/**
* Validate some configurations for the new MetadataVersion. A new MetadataVersion can take place when
* a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
*/
@SuppressWarnings("ThrowableNotThrown")
private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
try {
this.config.validateWithMetadataVersion(metadataVersion);
} catch (Throwable t) {
if (this.hasMultiLogDirs.get() && !metadataVersion.isDirectoryAssignmentSupported()) {
String errorMsg = String.format(
"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion %s. Need %s or higher",
metadataVersion, MetadataVersion.IBP_3_7_IV2
);
this.faultHandler.handleFault(
"Broker configuration does not support the cluster MetadataVersion", t);
"Broker configuration does not support the cluster MetadataVersion",
new IllegalArgumentException(errorMsg)
);
}
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.junit.jupiter.api.Test;
import java.util.function.Supplier;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
public class MetadataVersionConfigValidatorTest {
private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder()
.provenance(MetadataProvenance.EMPTY)
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
.numBatches(1)
.elapsedNs(90)
.numBytes(88)
.build();
public static final MetadataProvenance TEST_PROVENANCE =
new MetadataProvenance(50, 3, 8000, true);
void executeMetadataUpdate(
MetadataVersion metadataVersion,
Supplier<Boolean> multiLogDirSupplier,
FaultHandler faultHandler
) throws Exception {
try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(0, multiLogDirSupplier, faultHandler)) {
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build();
if (metadataVersion != null) {
delta.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()));
}
MetadataImage image = delta.apply(TEST_PROVENANCE);
validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
}
}
@Test
void testValidatesConfigOnMetadataChange() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
FaultHandler faultHandler = mock(FaultHandler.class);
Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
when(multiLogDirSupplier.get()).thenReturn(false);
executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler);
verify(multiLogDirSupplier, times(1)).get();
verifyNoMoreInteractions(faultHandler);
}
@Test
void testInvokesFaultHandlerOnException() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV1;
Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
FaultHandler faultHandler = mock(FaultHandler.class);
when(multiLogDirSupplier.get()).thenReturn(true);
executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler);
verify(multiLogDirSupplier, times(1)).get();
verify(faultHandler, times(1)).handleFault(
eq("Broker configuration does not support the cluster MetadataVersion"),
any(IllegalArgumentException.class));
}
@Test
void testValidateWithMetadataVersionJbodSupport() throws Exception {
FaultHandler faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_6_IV2, false, faultHandler);
verifyNoMoreInteractions(faultHandler);
faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_7_IV0, false, faultHandler);
verifyNoMoreInteractions(faultHandler);
faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_7_IV2, false, faultHandler);
verifyNoMoreInteractions(faultHandler);
faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_6_IV2, true, faultHandler);
verify(faultHandler, times(1)).handleFault(
eq("Broker configuration does not support the cluster MetadataVersion"),
any(IllegalArgumentException.class));
faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_7_IV0, true, faultHandler);
verify(faultHandler, times(1)).handleFault(
eq("Broker configuration does not support the cluster MetadataVersion"),
any(IllegalArgumentException.class));
faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_7_IV2, true, faultHandler);
verifyNoMoreInteractions(faultHandler);
}
private void validate(MetadataVersion metadataVersion, boolean jbodConfig, FaultHandler faultHandler)
throws Exception {
Supplier<Boolean> multiLogDirSupplier = () -> jbodConfig;
executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler);
}
}