diff --git a/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java b/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java new file mode 100644 index 00000000000..042ac09452f --- /dev/null +++ b/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.image.publisher.MetadataPublisher; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.fault.FaultHandler; + +public class MetadataVersionConfigValidator implements MetadataPublisher { + private final String name; + private final KafkaConfig config; + private final FaultHandler faultHandler; + + public MetadataVersionConfigValidator( + KafkaConfig config, + FaultHandler faultHandler + ) { + int id = config.brokerId(); + this.name = "MetadataVersionPublisher(id=" + id + ")"; + this.config = config; + this.faultHandler = faultHandler; + } + + @Override + public String name() { + return name; + } + + @Override + public void onMetadataUpdate( + MetadataDelta delta, + MetadataImage newImage, + LoaderManifest manifest + ) { + if (delta.featuresDelta() != null) { + if (delta.metadataVersionChanged().isPresent()) { + onMetadataVersionChanged(newImage.features().metadataVersion()); + } + } + } + + private void onMetadataVersionChanged(MetadataVersion metadataVersion) { + try { + this.config.validateWithMetadataVersion(metadataVersion); + } catch (Throwable t) { + RuntimeException exception = this.faultHandler.handleFault( + "Broker configuration does not support the cluster MetadataVersion", t); + if (exception != null) { + throw exception; + } + } + } +} diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 63599520e95..9987d42b040 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -439,6 +439,7 @@ class BrokerServer( rlm.startup() } + metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler)) brokerMetadataPublisher = new BrokerMetadataPublisher(config, metadataCache, logManager, @@ -481,6 +482,7 @@ class BrokerServer( () => lifecycleManager.resendBrokerRegistrationUnlessZkMode()) metadataPublishers.add(brokerRegistrationTracker) + // Register parts of the broker that can be reconfigured via dynamic configs. This needs to // be done before we publish the dynamic configs, so that we don't miss anything. config.dynamicConfig.addReconfigurables(this) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e4a72ed8af2..782febc3ed1 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -2442,6 +2442,14 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } validateAdvertisedListenersNonEmptyForBroker() } + if (processRoles.contains(BrokerRole) + && originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp) + && logDirs.size > 1) { + require(interBrokerProtocolVersion.isDirectoryAssignmentSupported, + s"Multiple log directories (aka JBOD) are not supported with the configured " + + s"${interBrokerProtocolVersion} ${KafkaConfig.InterBrokerProtocolVersionProp}. " + + s"Need ${MetadataVersion.IBP_3_7_IV2} or higher") + } val listenerNames = listeners.map(_.listenerName).toSet if (processRoles.isEmpty || processRoles.contains(BrokerRole)) { @@ -2535,6 +2543,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"to ${KafkaConfig.ConsumerGroupMaxSessionTimeoutMsProp}") } + /** + * Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when + * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata. + */ + def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = { + if (processRoles.contains(KafkaRaftServer.BrokerRole) && logDirs.size > 1) { + require(metadataVersion.isDirectoryAssignmentSupported, + s"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion ${metadataVersion}. " + + s"Need ${MetadataVersion.IBP_3_7_IV2} or higher") + } + } + /** * Copy the subset of properties that are relevant to Logs. The individual properties * are listed here since the names are slightly different in each Config class... diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 30d836370a9..ba6994569b4 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -44,68 +44,97 @@ import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer object StorageTool extends Logging { + def main(args: Array[String]): Unit = { + var exitCode: Integer = 0 + var message: Option[String] = None try { - val namespace = parseArguments(args) - val command = namespace.getString("command") - val config = Option(namespace.getString("config")).flatMap( - p => Some(new KafkaConfig(Utils.loadProps(p)))) - command match { - case "info" => - val directories = configToLogDirectories(config.get) - val selfManagedMode = configToSelfManagedMode(config.get) - Exit.exit(infoCommand(System.out, selfManagedMode, directories)) - - case "format" => - val directories = configToLogDirectories(config.get) - val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { - throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.") - } - if (!metadataVersion.isProduction()) { - if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata version ${metadataVersion}.") - } else { - throw new TerseFailure(s"Metadata version ${metadataVersion} is not ready for production use yet.") - } - } - val metaProperties = new MetaProperties.Builder(). - setVersion(MetaPropertiesVersion.V1). - setClusterId(clusterId). - setNodeId(config.get.nodeId). - build() - val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() - getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { - if (!metadataVersion.isScramSupported()) { - throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later."); - } - for (record <- userScramCredentialRecords) { - metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) - } - }) - val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command") - val ignoreFormatted = namespace.getBoolean("ignore_formatted") - if (!configToSelfManagedMode(config.get)) { - throw new TerseFailure("The kafka configuration file appears to be for " + - "a legacy cluster. Formatting is only supported for clusters in KRaft mode.") - } - Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata, - metadataVersion,ignoreFormatted)) - - case "random-uuid" => - System.out.println(Uuid.randomUuid) - Exit.exit(0) - - case _ => - throw new RuntimeException(s"Unknown command $command") - } + exitCode = execute(args) } catch { case e: TerseFailure => - System.err.println(e.getMessage) - Exit.exit(1, Some(e.getMessage)) + exitCode = 1 + message = Some(e.getMessage) } + message.foreach(System.err.println) + Exit.exit(exitCode, message) + } + + /** + * Executes the command according to the given arguments and returns the appropriate exit code. + * @param args The command line arguments + * @return The exit code + */ + def execute(args: Array[String]): Int = { + val namespace = parseArguments(args) + val command = namespace.getString("command") + val config = Option(namespace.getString("config")).flatMap( + p => Some(new KafkaConfig(Utils.loadProps(p)))) + command match { + case "info" => + val directories = configToLogDirectories(config.get) + val selfManagedMode = configToSelfManagedMode(config.get) + infoCommand(System.out, selfManagedMode, directories) + + case "format" => + runFormatCommand(namespace, config.get) + + case "random-uuid" => + System.out.println(Uuid.randomUuid) + 0 + case _ => + throw new RuntimeException(s"Unknown command $command") + } + } + + /** + * Validates arguments, configuration, prepares bootstrap metadata and delegates to {{@link formatCommand}}. + * Visible for testing. + * @param namespace Arguments + * @param config The server configuration + * @return The exit code + */ + def runFormatCommand(namespace: Namespace, config: KafkaConfig) = { + val directories = configToLogDirectories(config) + val clusterId = namespace.getString("cluster_id") + val metadataVersion = getMetadataVersion(namespace, + Option(config.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString)) + if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.") + } + if (!metadataVersion.isProduction()) { + if (config.unstableMetadataVersionsEnabled) { + System.out.println(s"WARNING: using pre-production metadata version ${metadataVersion}.") + } else { + throw new TerseFailure(s"Metadata version ${metadataVersion} is not ready for production use yet.") + } + } + try { + config.validateWithMetadataVersion(metadataVersion) + } catch { + case e: IllegalArgumentException => throw new TerseFailure(s"Invalid configuration for metadata version: ${e.getMessage}") + } + val metaProperties = new MetaProperties.Builder(). + setVersion(MetaPropertiesVersion.V1). + setClusterId(clusterId). + setNodeId(config.nodeId). + build() + val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { + if (!metadataVersion.isScramSupported()) { + throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later."); + } + for (record <- userScramCredentialRecords) { + metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) + } + }) + val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command") + val ignoreFormatted = namespace.getBoolean("ignore_formatted") + if (!configToSelfManagedMode(config)) { + throw new TerseFailure("The kafka configuration file appears to be for " + + "a legacy cluster. Formatting is only supported for clusters in KRaft mode.") + } + formatCommand(System.out, directories, metaProperties, bootstrapMetadata, + metadataVersion,ignoreFormatted) } def parseArguments(args: Array[String]): Namespace = { diff --git a/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java b/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java new file mode 100644 index 00000000000..a484d592e23 --- /dev/null +++ b/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.loader.LogDeltaManifest; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.junit.jupiter.api.Test; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class MetadataVersionConfigValidatorTest { + + private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder() + .provenance(MetadataProvenance.EMPTY) + .leaderAndEpoch(LeaderAndEpoch.UNKNOWN) + .numBatches(1) + .elapsedNs(90) + .numBytes(88) + .build(); + public static final MetadataProvenance TEST_PROVENANCE = + new MetadataProvenance(50, 3, 8000); + + void testWith(MetadataVersion metadataVersion, KafkaConfig config, FaultHandler faultHandler) throws Exception { + try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(config, faultHandler)) { + MetadataDelta delta = new MetadataDelta.Builder() + .setImage(MetadataImage.EMPTY) + .build(); + if (metadataVersion != null) { + delta.replay(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(metadataVersion.featureLevel())); + } + MetadataImage image = delta.apply(TEST_PROVENANCE); + + validator.onMetadataUpdate(delta, image, TEST_MANIFEST); + } + } + + @Test + void testValidatesConfigOnMetadataChange() throws Exception { + MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2; + KafkaConfig config = mock(KafkaConfig.class); + FaultHandler faultHandler = mock(FaultHandler.class); + + when(config.brokerId()).thenReturn(8); + + testWith(metadataVersion, config, faultHandler); + + verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion)); + verifyNoMoreInteractions(faultHandler); + } + + @SuppressWarnings("ThrowableNotThrown") + @Test + void testInvokesFaultHandlerOnException() throws Exception { + MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2; + Exception exception = new Exception(); + KafkaConfig config = mock(KafkaConfig.class); + FaultHandler faultHandler = mock(FaultHandler.class); + + when(config.brokerId()).thenReturn(8); + willAnswer(invocation -> { + throw exception; + }).given(config).validateWithMetadataVersion(eq(metadataVersion)); + + testWith(metadataVersion, config, faultHandler); + + verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion)); + verify(faultHandler, times(1)).handleFault( + eq("Broker configuration does not support the cluster MetadataVersion"), + eq(exception)); + } +} diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index f60635a3522..465f2b4d7fd 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -22,6 +22,7 @@ import kafka.utils.TestUtils import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM import org.apache.kafka.common.config.ConfigDef.Type.INT import org.apache.kafka.common.config.{ConfigException, TopicConfig} +import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -415,4 +416,21 @@ class LogConfigTest { assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)) assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)) } + + @Test + def testValidateWithMetadataVersionJbodSupport(): Unit = { + def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit = + KafkaConfig.fromProps( + TestUtils.createBrokerConfig(nodeId = 0, zkConnect = null, logDirCount = if (jbodConfig) 2 else 1) + ).validateWithMetadataVersion(metadataVersion) + + validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false) + validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false) + validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false) + assertThrows(classOf[IllegalArgumentException], () => + validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true)) + assertThrows(classOf[IllegalArgumentException], () => + validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true)) + validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true) + } } diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index c3cdce65cbe..0ab3ab1696a 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -35,6 +35,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows import org.junit.jupiter.api.{Test, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import org.mockito.Mockito import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -433,4 +434,36 @@ Found problem: assertEquals(1, exitStatus) } } + + @Test + def testFormatValidatesConfigForMetadataVersion(): Unit = { + val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null))) + val args = Array("format", + "-c", "dummy.properties", + "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", MetadataVersion.LATEST_PRODUCTION.toString) + val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config) + Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION) + assertEquals(0, exitCode) + } + + @Test + def testJbodSupportValidation(): Unit = { + def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = { + val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount) + properties.remove(KafkaConfig.InterBrokerProtocolVersionProp) + val configFile = TestUtils.tempPropertiesFile(properties).toPath.toString + StorageTool.execute(Array("format", + "-c", configFile, + "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", metadataVersion.toString)) + } + + assertEquals(0, formatWith(1, MetadataVersion.IBP_3_6_IV2)) + assertEquals("Invalid configuration for metadata version: " + + "requirement failed: Multiple log directories (aka JBOD) are not supported in the current MetadataVersion 3.6-IV2. Need 3.7-IV2 or higher", + assertThrows(classOf[TerseFailure], () => formatWith(2, MetadataVersion.IBP_3_6_IV2)).getMessage) + assertEquals(0, formatWith(1, MetadataVersion.IBP_3_7_IV2)) + assertEquals(0, formatWith(2, MetadataVersion.IBP_3_7_IV2)) + } }