From e27ea8d4dbb17681a05672dd6fedb8a85c37a28f Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Mon, 29 Sep 2025 01:32:27 +0800 Subject: [PATCH] KAFKA-19702 Move MetadataVersionConfigValidator and related test code to metadata module (#20526) 1. Move `MetadataVersionConfigValidator` to metadata module. 2. Move `MetadataVersionConfigValidatorTest` to metadata module. 3. Remove `KafkaConfig#validateWithMetadataVersion`. Reviewers: Chia-Ping Tsai --- .../scala/kafka/server/BrokerServer.scala | 7 +- .../main/scala/kafka/server/KafkaConfig.scala | 13 -- .../MetadataVersionConfigValidatorTest.java | 103 ------------- .../scala/unit/kafka/log/LogConfigTest.scala | 18 --- .../MetadataVersionConfigValidator.java | 37 +++-- .../MetadataVersionConfigValidatorTest.java | 140 ++++++++++++++++++ 6 files changed, 169 insertions(+), 149 deletions(-) delete mode 100644 core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java rename {core/src/main/java/kafka/server => metadata/src/main/java/org/apache/kafka/metadata}/MetadataVersionConfigValidator.java (64%) create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 689c62b8687..a9217c4d023 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -41,7 +41,7 @@ import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService} import org.apache.kafka.coordinator.transaction.ProducerIdManager import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher} -import org.apache.kafka.metadata.{BrokerState, ListenerInfo} +import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator} import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher} import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager} import org.apache.kafka.server.authorizer.Authorizer @@ -469,7 +469,10 @@ class BrokerServer( socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, "RequestHandlerAvgIdlePercent") - metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler)) + metadataPublishers.add(new MetadataVersionConfigValidator(config.brokerId, + () => config.processRoles.contains(ProcessRole.BrokerRole) && config.logDirs().size() > 1, + sharedServer.metadataPublishingFaultHandler + )) brokerMetadataPublisher = new BrokerMetadataPublisher(config, metadataCache, logManager, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 671bee32d28..d9b8c5cd91f 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -43,7 +43,6 @@ import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig} import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer -import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.config.AbstractKafkaConfig.getMap import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig @@ -653,18 +652,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde") } - /** - * Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when - * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata. - */ - def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = { - if (processRoles.contains(ProcessRole.BrokerRole) && logDirs.size > 1) { - require(metadataVersion.isDirectoryAssignmentSupported, - s"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion ${metadataVersion}. " + - s"Need ${MetadataVersion.IBP_3_7_IV2} or higher") - } - } - /** * Copy the subset of properties that are relevant to Logs. The individual properties * are listed here since the names are slightly different in each Config class... diff --git a/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java b/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java deleted file mode 100644 index daa0aacca7d..00000000000 --- a/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server; - -import org.apache.kafka.common.metadata.FeatureLevelRecord; -import org.apache.kafka.image.MetadataDelta; -import org.apache.kafka.image.MetadataImage; -import org.apache.kafka.image.MetadataProvenance; -import org.apache.kafka.image.loader.LogDeltaManifest; -import org.apache.kafka.raft.LeaderAndEpoch; -import org.apache.kafka.server.common.MetadataVersion; -import org.apache.kafka.server.fault.FaultHandler; - -import org.junit.jupiter.api.Test; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.BDDMockito.willAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -public class MetadataVersionConfigValidatorTest { - - private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder() - .provenance(MetadataProvenance.EMPTY) - .leaderAndEpoch(LeaderAndEpoch.UNKNOWN) - .numBatches(1) - .elapsedNs(90) - .numBytes(88) - .build(); - public static final MetadataProvenance TEST_PROVENANCE = - new MetadataProvenance(50, 3, 8000, true); - - void testWith(MetadataVersion metadataVersion, KafkaConfig config, FaultHandler faultHandler) throws Exception { - try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(config, faultHandler)) { - MetadataDelta delta = new MetadataDelta.Builder() - .setImage(MetadataImage.EMPTY) - .build(); - if (metadataVersion != null) { - delta.replay(new FeatureLevelRecord(). - setName(MetadataVersion.FEATURE_NAME). - setFeatureLevel(metadataVersion.featureLevel())); - } - MetadataImage image = delta.apply(TEST_PROVENANCE); - - validator.onMetadataUpdate(delta, image, TEST_MANIFEST); - } - } - - @Test - void testValidatesConfigOnMetadataChange() throws Exception { - MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2; - KafkaConfig config = mock(KafkaConfig.class); - FaultHandler faultHandler = mock(FaultHandler.class); - - when(config.brokerId()).thenReturn(8); - - testWith(metadataVersion, config, faultHandler); - - verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion)); - verifyNoMoreInteractions(faultHandler); - } - - @SuppressWarnings("ThrowableNotThrown") - @Test - void testInvokesFaultHandlerOnException() throws Exception { - MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2; - Exception exception = new Exception(); - KafkaConfig config = mock(KafkaConfig.class); - FaultHandler faultHandler = mock(FaultHandler.class); - - when(faultHandler.handleFault(any(), any())).thenReturn(new RuntimeException("returned exception")); - when(config.brokerId()).thenReturn(8); - willAnswer(invocation -> { - throw exception; - }).given(config).validateWithMetadataVersion(eq(metadataVersion)); - - testWith(metadataVersion, config, faultHandler); - - verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion)); - verify(faultHandler, times(1)).handleFault( - eq("Broker configuration does not support the cluster MetadataVersion"), - eq(exception)); - } -} diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index e23e16fa40a..e942e7e3380 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -23,7 +23,6 @@ import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM import org.apache.kafka.common.config.ConfigDef.Type.INT import org.apache.kafka.common.config.{ConfigException, SslConfigs, TopicConfig} import org.apache.kafka.common.errors.InvalidConfigurationException -import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -429,21 +428,4 @@ class LogConfigTest { logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString) LogConfig.validate(logProps) } - - @Test - def testValidateWithMetadataVersionJbodSupport(): Unit = { - def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit = - KafkaConfig.fromProps( - TestUtils.createBrokerConfig(nodeId = 0, logDirCount = if (jbodConfig) 2 else 1) - ).validateWithMetadataVersion(metadataVersion) - - validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false) - validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false) - validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false) - assertThrows(classOf[IllegalArgumentException], () => - validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true)) - assertThrows(classOf[IllegalArgumentException], () => - validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true)) - validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true) - } } diff --git a/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java similarity index 64% rename from core/src/main/java/kafka/server/MetadataVersionConfigValidator.java rename to metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java index 6f53ec092f6..2df73463c25 100644 --- a/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.server; +package org.apache.kafka.metadata; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -24,18 +24,20 @@ import org.apache.kafka.image.publisher.MetadataPublisher; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.fault.FaultHandler; +import java.util.function.Supplier; + public class MetadataVersionConfigValidator implements MetadataPublisher { private final String name; - private final KafkaConfig config; + private final Supplier hasMultiLogDirs; private final FaultHandler faultHandler; public MetadataVersionConfigValidator( - KafkaConfig config, - FaultHandler faultHandler + int id, + Supplier hasMultiLogDirs, + FaultHandler faultHandler ) { - int id = config.brokerId(); this.name = "MetadataVersionPublisher(id=" + id + ")"; - this.config = config; + this.hasMultiLogDirs = hasMultiLogDirs; this.faultHandler = faultHandler; } @@ -46,9 +48,9 @@ public class MetadataVersionConfigValidator implements MetadataPublisher { @Override public void onMetadataUpdate( - MetadataDelta delta, - MetadataImage newImage, - LoaderManifest manifest + MetadataDelta delta, + MetadataImage newImage, + LoaderManifest manifest ) { if (delta.featuresDelta() != null) { if (delta.metadataVersionChanged().isPresent()) { @@ -57,13 +59,22 @@ public class MetadataVersionConfigValidator implements MetadataPublisher { } } + /** + * Validate some configurations for the new MetadataVersion. A new MetadataVersion can take place when + * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata. + */ @SuppressWarnings("ThrowableNotThrown") private void onMetadataVersionChanged(MetadataVersion metadataVersion) { - try { - this.config.validateWithMetadataVersion(metadataVersion); - } catch (Throwable t) { + if (this.hasMultiLogDirs.get() && !metadataVersion.isDirectoryAssignmentSupported()) { + String errorMsg = String.format( + "Multiple log directories (aka JBOD) are not supported in the current MetadataVersion %s. Need %s or higher", + metadataVersion, MetadataVersion.IBP_3_7_IV2 + ); + this.faultHandler.handleFault( - "Broker configuration does not support the cluster MetadataVersion", t); + "Broker configuration does not support the cluster MetadataVersion", + new IllegalArgumentException(errorMsg) + ); } } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java new file mode 100644 index 00000000000..50ad1b07ccd --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.loader.LogDeltaManifest; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.fault.FaultHandler; + +import org.junit.jupiter.api.Test; + +import java.util.function.Supplier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@SuppressWarnings({"unchecked", "ThrowableNotThrown"}) +public class MetadataVersionConfigValidatorTest { + + private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder() + .provenance(MetadataProvenance.EMPTY) + .leaderAndEpoch(LeaderAndEpoch.UNKNOWN) + .numBatches(1) + .elapsedNs(90) + .numBytes(88) + .build(); + public static final MetadataProvenance TEST_PROVENANCE = + new MetadataProvenance(50, 3, 8000, true); + + void executeMetadataUpdate( + MetadataVersion metadataVersion, + Supplier multiLogDirSupplier, + FaultHandler faultHandler + ) throws Exception { + try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(0, multiLogDirSupplier, faultHandler)) { + MetadataDelta delta = new MetadataDelta.Builder() + .setImage(MetadataImage.EMPTY) + .build(); + if (metadataVersion != null) { + delta.replay(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(metadataVersion.featureLevel())); + } + MetadataImage image = delta.apply(TEST_PROVENANCE); + + validator.onMetadataUpdate(delta, image, TEST_MANIFEST); + } + } + + @Test + void testValidatesConfigOnMetadataChange() throws Exception { + MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2; + FaultHandler faultHandler = mock(FaultHandler.class); + Supplier multiLogDirSupplier = mock(Supplier.class); + when(multiLogDirSupplier.get()).thenReturn(false); + + executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler); + + verify(multiLogDirSupplier, times(1)).get(); + verifyNoMoreInteractions(faultHandler); + } + + @Test + void testInvokesFaultHandlerOnException() throws Exception { + MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV1; + Supplier multiLogDirSupplier = mock(Supplier.class); + FaultHandler faultHandler = mock(FaultHandler.class); + + when(multiLogDirSupplier.get()).thenReturn(true); + + executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler); + + verify(multiLogDirSupplier, times(1)).get(); + verify(faultHandler, times(1)).handleFault( + eq("Broker configuration does not support the cluster MetadataVersion"), + any(IllegalArgumentException.class)); + } + + @Test + void testValidateWithMetadataVersionJbodSupport() throws Exception { + FaultHandler faultHandler = mock(FaultHandler.class); + validate(MetadataVersion.IBP_3_6_IV2, false, faultHandler); + verifyNoMoreInteractions(faultHandler); + + faultHandler = mock(FaultHandler.class); + validate(MetadataVersion.IBP_3_7_IV0, false, faultHandler); + verifyNoMoreInteractions(faultHandler); + + faultHandler = mock(FaultHandler.class); + validate(MetadataVersion.IBP_3_7_IV2, false, faultHandler); + verifyNoMoreInteractions(faultHandler); + + faultHandler = mock(FaultHandler.class); + validate(MetadataVersion.IBP_3_6_IV2, true, faultHandler); + verify(faultHandler, times(1)).handleFault( + eq("Broker configuration does not support the cluster MetadataVersion"), + any(IllegalArgumentException.class)); + + faultHandler = mock(FaultHandler.class); + validate(MetadataVersion.IBP_3_7_IV0, true, faultHandler); + verify(faultHandler, times(1)).handleFault( + eq("Broker configuration does not support the cluster MetadataVersion"), + any(IllegalArgumentException.class)); + + faultHandler = mock(FaultHandler.class); + validate(MetadataVersion.IBP_3_7_IV2, true, faultHandler); + verifyNoMoreInteractions(faultHandler); + } + + private void validate(MetadataVersion metadataVersion, boolean jbodConfig, FaultHandler faultHandler) + throws Exception { + Supplier multiLogDirSupplier = () -> jbodConfig; + + executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler); + } +}