KAFKA-16606 Gate JBOD configuration on 3.7-IV2 (#15834)

Support for multiple log directories in KRaft exists from
MetataVersion 3.7-IV2.

When migrating a ZK broker to KRaft, we already check that
the IBP is high enough before allowing the broker to startup.

With KIP-584 and KIP-778, Brokers in KRaft mode do not require
the IBP configuration - the configuration is deprecated.
In KRaft mode inter.broker.protocol.version defaults to
MetadataVersion.MINIMUM_KRAFT_VERSION (IBP_3_0_IV1).

Instead KRaft brokers discover the MetadataVersion by reading
the "metadata.version" FeatureLevelRecord from the cluster metadata.

This change adds a new configuration validation step upon discovering
the "metadata.version" from the cluster metadata.

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Igor Soarez 2024-06-07 11:11:57 +03:00
parent c979ef2f7d
commit 67ca44fdf3
No known key found for this signature in database
GPG Key ID: E92F58091C1987ED
7 changed files with 330 additions and 57 deletions

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
public class MetadataVersionConfigValidator implements MetadataPublisher {
private final String name;
private final KafkaConfig config;
private final FaultHandler faultHandler;
public MetadataVersionConfigValidator(
KafkaConfig config,
FaultHandler faultHandler
) {
int id = config.brokerId();
this.name = "MetadataVersionPublisher(id=" + id + ")";
this.config = config;
this.faultHandler = faultHandler;
}
@Override
public String name() {
return name;
}
@Override
public void onMetadataUpdate(
MetadataDelta delta,
MetadataImage newImage,
LoaderManifest manifest
) {
if (delta.featuresDelta() != null) {
if (delta.metadataVersionChanged().isPresent()) {
onMetadataVersionChanged(newImage.features().metadataVersion());
}
}
}
private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
try {
this.config.validateWithMetadataVersion(metadataVersion);
} catch (Throwable t) {
RuntimeException exception = this.faultHandler.handleFault(
"Broker configuration does not support the cluster MetadataVersion", t);
if (exception != null) {
throw exception;
}
}
}
}

View File

@ -439,6 +439,7 @@ class BrokerServer(
rlm.startup()
}
metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler))
brokerMetadataPublisher = new BrokerMetadataPublisher(config,
metadataCache,
logManager,
@ -481,6 +482,7 @@ class BrokerServer(
() => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
metadataPublishers.add(brokerRegistrationTracker)
// Register parts of the broker that can be reconfigured via dynamic configs. This needs to
// be done before we publish the dynamic configs, so that we don't miss anything.
config.dynamicConfig.addReconfigurables(this)

View File

@ -2442,6 +2442,14 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
validateAdvertisedListenersNonEmptyForBroker()
}
if (processRoles.contains(BrokerRole)
&& originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)
&& logDirs.size > 1) {
require(interBrokerProtocolVersion.isDirectoryAssignmentSupported,
s"Multiple log directories (aka JBOD) are not supported with the configured " +
s"${interBrokerProtocolVersion} ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
s"Need ${MetadataVersion.IBP_3_7_IV2} or higher")
}
val listenerNames = listeners.map(_.listenerName).toSet
if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
@ -2535,6 +2543,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"to ${KafkaConfig.ConsumerGroupMaxSessionTimeoutMsProp}")
}
/**
* Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when
* a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
*/
def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {
if (processRoles.contains(KafkaRaftServer.BrokerRole) && logDirs.size > 1) {
require(metadataVersion.isDirectoryAssignmentSupported,
s"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion ${metadataVersion}. " +
s"Need ${MetadataVersion.IBP_3_7_IV2} or higher")
}
}
/**
* Copy the subset of properties that are relevant to Logs. The individual properties
* are listed here since the names are slightly different in each Config class...

View File

@ -44,68 +44,97 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
object StorageTool extends Logging {
def main(args: Array[String]): Unit = {
var exitCode: Integer = 0
var message: Option[String] = None
try {
val namespace = parseArguments(args)
val command = namespace.getString("command")
val config = Option(namespace.getString("config")).flatMap(
p => Some(new KafkaConfig(Utils.loadProps(p))))
command match {
case "info" =>
val directories = configToLogDirectories(config.get)
val selfManagedMode = configToSelfManagedMode(config.get)
Exit.exit(infoCommand(System.out, selfManagedMode, directories))
case "format" =>
val directories = configToLogDirectories(config.get)
val clusterId = namespace.getString("cluster_id")
val metadataVersion = getMetadataVersion(namespace,
Option(config.get.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString))
if (!metadataVersion.isKRaftSupported) {
throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
}
if (!metadataVersion.isProduction()) {
if (config.get.unstableMetadataVersionsEnabled) {
System.out.println(s"WARNING: using pre-production metadata version ${metadataVersion}.")
} else {
throw new TerseFailure(s"Metadata version ${metadataVersion} is not ready for production use yet.")
}
}
val metaProperties = new MetaProperties.Builder().
setVersion(MetaPropertiesVersion.V1).
setClusterId(clusterId).
setNodeId(config.get.nodeId).
build()
val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
if (!metadataVersion.isScramSupported()) {
throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
}
for (record <- userScramCredentialRecords) {
metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
}
})
val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command")
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
if (!configToSelfManagedMode(config.get)) {
throw new TerseFailure("The kafka configuration file appears to be for " +
"a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
}
Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
metadataVersion,ignoreFormatted))
case "random-uuid" =>
System.out.println(Uuid.randomUuid)
Exit.exit(0)
case _ =>
throw new RuntimeException(s"Unknown command $command")
}
exitCode = execute(args)
} catch {
case e: TerseFailure =>
System.err.println(e.getMessage)
Exit.exit(1, Some(e.getMessage))
exitCode = 1
message = Some(e.getMessage)
}
message.foreach(System.err.println)
Exit.exit(exitCode, message)
}
/**
* Executes the command according to the given arguments and returns the appropriate exit code.
* @param args The command line arguments
* @return The exit code
*/
def execute(args: Array[String]): Int = {
val namespace = parseArguments(args)
val command = namespace.getString("command")
val config = Option(namespace.getString("config")).flatMap(
p => Some(new KafkaConfig(Utils.loadProps(p))))
command match {
case "info" =>
val directories = configToLogDirectories(config.get)
val selfManagedMode = configToSelfManagedMode(config.get)
infoCommand(System.out, selfManagedMode, directories)
case "format" =>
runFormatCommand(namespace, config.get)
case "random-uuid" =>
System.out.println(Uuid.randomUuid)
0
case _ =>
throw new RuntimeException(s"Unknown command $command")
}
}
/**
* Validates arguments, configuration, prepares bootstrap metadata and delegates to {{@link formatCommand}}.
* Visible for testing.
* @param namespace Arguments
* @param config The server configuration
* @return The exit code
*/
def runFormatCommand(namespace: Namespace, config: KafkaConfig) = {
val directories = configToLogDirectories(config)
val clusterId = namespace.getString("cluster_id")
val metadataVersion = getMetadataVersion(namespace,
Option(config.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString))
if (!metadataVersion.isKRaftSupported) {
throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
}
if (!metadataVersion.isProduction()) {
if (config.unstableMetadataVersionsEnabled) {
System.out.println(s"WARNING: using pre-production metadata version ${metadataVersion}.")
} else {
throw new TerseFailure(s"Metadata version ${metadataVersion} is not ready for production use yet.")
}
}
try {
config.validateWithMetadataVersion(metadataVersion)
} catch {
case e: IllegalArgumentException => throw new TerseFailure(s"Invalid configuration for metadata version: ${e.getMessage}")
}
val metaProperties = new MetaProperties.Builder().
setVersion(MetaPropertiesVersion.V1).
setClusterId(clusterId).
setNodeId(config.nodeId).
build()
val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
if (!metadataVersion.isScramSupported()) {
throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
}
for (record <- userScramCredentialRecords) {
metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
}
})
val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command")
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
if (!configToSelfManagedMode(config)) {
throw new TerseFailure("The kafka configuration file appears to be for " +
"a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
}
formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
metadataVersion,ignoreFormatted)
}
def parseArguments(args: Array[String]): Namespace = {

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.junit.jupiter.api.Test;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class MetadataVersionConfigValidatorTest {
private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder()
.provenance(MetadataProvenance.EMPTY)
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
.numBatches(1)
.elapsedNs(90)
.numBytes(88)
.build();
public static final MetadataProvenance TEST_PROVENANCE =
new MetadataProvenance(50, 3, 8000);
void testWith(MetadataVersion metadataVersion, KafkaConfig config, FaultHandler faultHandler) throws Exception {
try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(config, faultHandler)) {
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build();
if (metadataVersion != null) {
delta.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()));
}
MetadataImage image = delta.apply(TEST_PROVENANCE);
validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
}
}
@Test
void testValidatesConfigOnMetadataChange() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
KafkaConfig config = mock(KafkaConfig.class);
FaultHandler faultHandler = mock(FaultHandler.class);
when(config.brokerId()).thenReturn(8);
testWith(metadataVersion, config, faultHandler);
verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion));
verifyNoMoreInteractions(faultHandler);
}
@SuppressWarnings("ThrowableNotThrown")
@Test
void testInvokesFaultHandlerOnException() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
Exception exception = new Exception();
KafkaConfig config = mock(KafkaConfig.class);
FaultHandler faultHandler = mock(FaultHandler.class);
when(config.brokerId()).thenReturn(8);
willAnswer(invocation -> {
throw exception;
}).given(config).validateWithMetadataVersion(eq(metadataVersion));
testWith(metadataVersion, config, faultHandler);
verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion));
verify(faultHandler, times(1)).handleFault(
eq("Broker configuration does not support the cluster MetadataVersion"),
eq(exception));
}
}

View File

@ -22,6 +22,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
import org.apache.kafka.common.config.ConfigDef.Type.INT
import org.apache.kafka.common.config.{ConfigException, TopicConfig}
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -415,4 +416,21 @@ class LogConfigTest {
assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG))
assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG))
}
@Test
def testValidateWithMetadataVersionJbodSupport(): Unit = {
def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
KafkaConfig.fromProps(
TestUtils.createBrokerConfig(nodeId = 0, zkConnect = null, logDirCount = if (jbodConfig) 2 else 1)
).validateWithMetadataVersion(metadataVersion)
validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false)
validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false)
validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false)
assertThrows(classOf[IllegalArgumentException], () =>
validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true))
assertThrows(classOf[IllegalArgumentException], () =>
validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true))
validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true)
}
}

View File

@ -35,6 +35,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.Mockito
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@ -433,4 +434,36 @@ Found problem:
assertEquals(1, exitStatus)
}
}
@Test
def testFormatValidatesConfigForMetadataVersion(): Unit = {
val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null)))
val args = Array("format",
"-c", "dummy.properties",
"-t", "XcZZOzUqS4yHOjhMQB6JLQ",
"--release-version", MetadataVersion.LATEST_PRODUCTION.toString)
val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config)
Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION)
assertEquals(0, exitCode)
}
@Test
def testJbodSupportValidation(): Unit = {
def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = {
val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount)
properties.remove(KafkaConfig.InterBrokerProtocolVersionProp)
val configFile = TestUtils.tempPropertiesFile(properties).toPath.toString
StorageTool.execute(Array("format",
"-c", configFile,
"-t", "XcZZOzUqS4yHOjhMQB6JLQ",
"--release-version", metadataVersion.toString))
}
assertEquals(0, formatWith(1, MetadataVersion.IBP_3_6_IV2))
assertEquals("Invalid configuration for metadata version: " +
"requirement failed: Multiple log directories (aka JBOD) are not supported in the current MetadataVersion 3.6-IV2. Need 3.7-IV2 or higher",
assertThrows(classOf[TerseFailure], () => formatWith(2, MetadataVersion.IBP_3_6_IV2)).getMessage)
assertEquals(0, formatWith(1, MetadataVersion.IBP_3_7_IV2))
assertEquals(0, formatWith(2, MetadataVersion.IBP_3_7_IV2))
}
}