KAFKA-18360 Remove zookeeper configurations (#18566)

Remove broker.id.generation.enable and reserved.broker.max.id, which are not used in KRaft mode.
Remove inter.broker.protocol.version, which is not used in KRaft mode.

Reviewers: PoAn Yang <payang@apache.org>, Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
Author: Colin Patrick McCabe, 2025-02-06 06:22:11 -08:00 (committed by Chia-Ping Tsai)
parent cf8d3ac49e
commit b6e6a3c68a
12 changed files with 50 additions and 164 deletions
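
In KRaft mode, the replacement for the removed inter.broker.protocol.version
config is the metadata.version feature level, which is set when formatting
storage (the warning removed from KafkaConfig below pointed at kafka-storage.sh
for exactly this). A sketch of the workflow, with a placeholder config path:

    # Generate a cluster id, then format the log dirs at an explicit metadata
    # version. --release-version accepts the same strings the removed config
    # parsed via MetadataVersion.fromVersionString, e.g. "3.6-IV0".
    CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
    bin/kafka-storage.sh format -t "$CLUSTER_ID" -c config/server.properties \
        --release-version 3.6-IV0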

File: KafkaConfig.scala

@@ -389,28 +389,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   val uncleanLeaderElectionCheckIntervalMs: Long = getLong(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG)
   def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)
 
-  // We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
-  // is passed, `0.10.0-IV0` may be picked)
-  val interBrokerProtocolVersionString = getString(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
-  val interBrokerProtocolVersion = if (processRoles.isEmpty) {
-    MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
-  } else {
-    if (originals.containsKey(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)) {
-      // A user-supplied IBP was given
-      val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
-      if (!configuredVersion.isKRaftSupported) {
-        throw new ConfigException(s"A non-KRaft version $interBrokerProtocolVersionString given for ${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}. " +
-          s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
-      } else {
-        warn(s"${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG} is deprecated in KRaft mode as of 3.3 and will only " +
-          s"be read when first upgrading from a KRaft prior to 3.3. See kafka-storage.sh help for details on setting " +
-          s"the metadata.version for a new KRaft cluster.")
-      }
-    }
-    // In KRaft mode, we pin this value to the minimum KRaft-supported version. This prevents inadvertent usage of
-    // the static IBP config in broker components running in KRaft mode
-    MetadataVersion.MINIMUM_KRAFT_VERSION
-  }
+  // This will be removed soon. See KAFKA-18366.
+  val interBrokerProtocolVersion = MetadataVersion.MINIMUM_KRAFT_VERSION
 
   /** ********* Controlled shutdown configuration ***********/
   val controlledShutdownEnable = getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
@@ -713,15 +693,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
       validateControllerQuorumVotersMustContainNodeIdForKRaftController()
       validateAdvertisedControllerListenersNonEmptyForKRaftController()
       validateControllerListenerNamesMustAppearInListenersForKRaftController()
-    } else {
-      // controller listener names must be empty when not in KRaft mode
-      require(controllerListenerNames.isEmpty,
-        s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
     }
 
     val listenerNames = listeners.map(_.listenerName).toSet
-    if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
-      // validations for all broker setups (i.e. broker-only and co-located)
+    if (processRoles.contains(ProcessRole.BrokerRole)) {
       validateAdvertisedBrokerListenersNonEmptyForBroker()
       require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
         s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +

File: KafkaRaftServer.scala

@@ -182,8 +182,7 @@ object KafkaRaftServer {
     }
 
     // Load the BootstrapMetadata.
-    val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir,
-      Optional.ofNullable(config.interBrokerProtocolVersionString))
+    val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir, Optional.empty())
     val bootstrapMetadata = bootstrapDirectory.read()
     (metaPropsEnsemble, bootstrapMetadata)
   }

File: StorageTool.scala

@@ -32,7 +32,6 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
 import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
 import org.apache.kafka.raft.{DynamicVoters, QuorumConfig}
 import org.apache.kafka.server.ProcessRole
-import org.apache.kafka.server.config.ReplicationConfigs
 
 import java.util
 import scala.collection.mutable
@@ -129,11 +128,9 @@ object StorageTool extends Logging {
       setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
       setControllerListenerName(config.controllerListenerNames.head).
       setMetadataLogDirectory(config.metadataLogDir)
-    Option(namespace.getString("release_version")) match {
-      case Some(releaseVersion) => formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))
-      case None => Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).
-        foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString)))
-    }
+    Option(namespace.getString("release_version")).foreach(
+      releaseVersion => formatter.
+        setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion)))
     Option(namespace.getList[String]("feature")).foreach(
       featureNamesAndLevels(_).foreachEntry {
         (k, v) => formatter.setFeatureLevel(k, v)

File: DescribeTopicPartitionsRequestHandlerTest.java

@@ -64,7 +64,6 @@ import org.apache.kafka.server.authorizer.Action;
 import org.apache.kafka.server.authorizer.AuthorizationResult;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.common.KRaftVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.KRaftConfigs;
 
 import org.junit.jupiter.api.Test;
@@ -545,7 +544,6 @@ class DescribeTopicPartitionsRequestHandlerTest {
         properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, voterId + "@localhost:9093");
         properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL");
         properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL");
-        TestUtils.setIbpVersion(properties, MetadataVersion.latestProduction());
         return new KafkaConfig(properties);
     }
 }

File: KafkaApisTest.scala

@@ -155,11 +155,12 @@ class KafkaApisTest extends Logging {
     metrics.close()
   }
 
-  def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting,
-                      authorizer: Option[Authorizer] = None,
-                      configRepository: ConfigRepository = new MockConfigRepository(),
-                      overrideProperties: Map[String, String] = Map.empty,
-                      featureVersions: Seq[FeatureVersion] = Seq.empty): KafkaApis = {
+  def createKafkaApis(
+    authorizer: Option[Authorizer] = None,
+    configRepository: ConfigRepository = new MockConfigRepository(),
+    overrideProperties: Map[String, String] = Map.empty,
+    featureVersions: Seq[FeatureVersion] = Seq.empty
+  ): KafkaApis = {
     val properties = TestUtils.createBrokerConfig(brokerId)
     properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString)
@@ -168,7 +169,6 @@ class KafkaApisTest extends Logging {
     properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$voterId@localhost:9093")
     overrideProperties.foreach( p => properties.put(p._1, p._2))
-    TestUtils.setIbpVersion(properties, interBrokerProtocolVersion)
     val config = new KafkaConfig(properties)
 
     val listenerType = ListenerType.BROKER

File: KafkaConfigTest.scala

@@ -159,7 +159,6 @@ class KafkaConfigTest {
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props1))
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props2))
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props3))
   }
 
   @Test
@@ -187,7 +186,7 @@ class KafkaConfigTest {
     val advertisedHostName = "routable-host"
     val advertisedPort = 1234
 
-    val props = TestUtils.createBrokerConfig(0)
+    val props = createDefaultConfig()
     props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
 
     val serverConfig = KafkaConfig.fromProps(props)
@@ -617,29 +616,6 @@ class KafkaConfigTest {
     assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
   }
 
-  @Test
-  def testVersionConfiguration(): Unit = {
-    val props = new Properties()
-    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
-    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
-    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
-    val conf = KafkaConfig.fromProps(props)
-    assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, conf.interBrokerProtocolVersion)
-
-    props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.0-IV1")
-    val conf2 = KafkaConfig.fromProps(props)
-    assertEquals(MetadataVersion.IBP_3_0_IV1, conf2.interBrokerProtocolVersion)
-
-    // check that patch version doesn't affect equality
-    props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.1-IV1")
-    val conf3 = KafkaConfig.fromProps(props)
-    assertEquals(MetadataVersion.IBP_3_0_IV1, conf3.interBrokerProtocolVersion)
-
-    // check that latest is newer than 3.0.1-IV0
-    assertTrue(MetadataVersion.latestTesting.isAtLeast(conf3.interBrokerProtocolVersion))
-  }
-
   private def isValidKafkaConfig(props: Properties): Boolean = {
     try {
       KafkaConfig.fromProps(props)
@@ -1406,27 +1382,30 @@ class KafkaConfigTest {
   }
 
   @Test
-  def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
-    // -1 is the default for both node.id and broker.id
+  def testAcceptsLargeId(): Unit = {
+    val largeBrokerId = 2000
     val props = new Properties()
     props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
-    assertFalse(isValidKafkaConfig(props))
-  }
-
-  @Test
-  def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
-    // -1 is the default for both node.id and broker.id
-    val props = new Properties()
-    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
-    assertFalse(isValidKafkaConfig(props))
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, largeBrokerId.toString)
+    KafkaConfig.fromProps(props)
   }
 
   @Test
   def testRejectsNegativeNodeId(): Unit = {
-    // -1 is the default for both node.id and broker.id
-    val props = new Properties()
-    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
-    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+    val props = createDefaultConfig()
+    props.remove(ServerConfigs.BROKER_ID_CONFIG)
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "-1")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testRejectsNegativeBrokerId(): Unit = {
+    val props = createDefaultConfig()
+    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "-1")
+    props.remove(KRaftConfigs.NODE_ID_CONFIG)
     assertFalse(isValidKafkaConfig(props))
   }
@@ -1613,17 +1592,6 @@ class KafkaConfigTest {
       assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
   }
 
-  @Test
-  def testIgnoreUserInterBrokerProtocolVersionKRaft(): Unit = {
-    for (ibp <- Seq("3.0", "3.1", "3.2")) {
-      val props = new Properties()
-      props.putAll(kraftProps())
-      props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp)
-      val config = new KafkaConfig(props)
-      assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
-    }
-  }
-
   @Test
   def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
     val props = new Properties()

File: KafkaRaftServerTest.scala

@@ -26,7 +26,7 @@ import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadat
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
@@ -262,7 +262,7 @@ class KafkaRaftServerTest {
   }
 
   @Test
-  def testKRaftUpdateWithIBP(): Unit = {
+  def testKRaftUpdateAt3_3_IV1(): Unit = {
     val clusterId = clusterIdBase64
     val nodeId = 0
     val metaProperties = new MetaProperties.Builder().
@@ -278,10 +278,9 @@ class KafkaRaftServerTest {
     configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
     configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093")
     configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
-    configProperties.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.3-IV1")
 
     val (metaPropertiesEnsemble, bootstrapMetadata) =
-      invokeLoadMetaProperties(metaProperties, configProperties, None)
+      invokeLoadMetaProperties(metaProperties, configProperties, Some(MetadataVersion.IBP_3_3_IV1))
 
     assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next())
     assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
@@ -290,7 +289,7 @@ class KafkaRaftServerTest {
   }
 
   @Test
-  def testKRaftUpdateWithoutIBP(): Unit = {
+  def testKRaftUpdate(): Unit = {
     val clusterId = clusterIdBase64
     val nodeId = 0
     val metaProperties = new MetaProperties.Builder().

File: ReplicaFetcherThreadTest.scala

@@ -33,7 +33,6 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBat
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -78,13 +77,16 @@ class ReplicaFetcherThreadTest {
     TestUtils.clearYammerMetrics()
   }
 
-  private def createReplicaFetcherThread(name: String,
-                                         fetcherId: Int,
-                                         brokerConfig: KafkaConfig,
-                                         failedPartitions: FailedPartitions,
-                                         replicaMgr: ReplicaManager,
-                                         quota: ReplicaQuota,
-                                         leaderEndpointBlockingSend: BlockingSend): ReplicaFetcherThread = {
+  private def createReplicaFetcherThread(
+    name: String,
+    fetcherId: Int,
+    brokerConfig: KafkaConfig,
+    failedPartitions: FailedPartitions,
+    replicaMgr: ReplicaManager,
+    quota: ReplicaQuota,
+    leaderEndpointBlockingSend: BlockingSend,
+    metadataVersion: MetadataVersion = MetadataVersion.latestTesting()
+  ): ReplicaFetcherThread = {
     val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${leaderEndpointBlockingSend.brokerEndPoint().id}, fetcherId=$fetcherId] ")
     val fetchSessionHandler = new FetchSessionHandler(logContext, leaderEndpointBlockingSend.brokerEndPoint().id)
     val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler,
@@ -96,7 +98,7 @@ class ReplicaFetcherThreadTest {
       replicaMgr,
       quota,
       logContext.logPrefix,
-      () => brokerConfig.interBrokerProtocolVersion)
+      () => metadataVersion)
   }
 
   @Test
@@ -179,9 +181,8 @@ class ReplicaFetcherThreadTest {
     verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latestTesting, epochFetchCount = 0)
   }
 
-  private def verifyFetchLeaderEpochOnFirstFetch(ibp: MetadataVersion, epochFetchCount: Int): Unit = {
+  private def verifyFetchLeaderEpochOnFirstFetch(metadataVersion: MetadataVersion, epochFetchCount: Int): Unit = {
     val props = TestUtils.createBrokerConfig(1)
-    props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp.version)
     val config = KafkaConfig.fromProps(props)
 
     //Setup all dependencies
@@ -219,7 +220,8 @@ class ReplicaFetcherThreadTest {
       failedPartitions,
      replicaManager,
       UNBOUNDED_QUOTA,
-      mockNetwork
+      mockNetwork,
+      metadataVersion
     )
     thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))
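
The fetcher thread now receives its version through an explicit
() => MetadataVersion supplier instead of reading the frozen
brokerConfig.interBrokerProtocolVersion. A sketch of the two kinds of caller;
the metadata-cache accessor in the second is an assumption about production
wiring, not part of this diff:

    // Test code: freeze the version for deterministic assertions.
    val fixedVersion: () => MetadataVersion = () => MetadataVersion.IBP_3_3_IV1

    // Production-style wiring (assumed): track the live metadata.version as
    // the controller advances it.
    // val liveVersion: () => MetadataVersion = () => metadataCache.metadataVersion()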

File: StorageToolTest.scala

@@ -357,26 +357,12 @@ Found problem:
       "Failed to find content in output: " + stream.toString())
   }
 
-  @Test
-  def testFormatWithReleaseVersionDefault(): Unit = {
-    val availableDirs = Seq(TestUtils.tempDir())
-    val properties = new Properties()
-    properties.putAll(defaultStaticQuorumProperties)
-    properties.setProperty("log.dirs", availableDirs.mkString(","))
-    properties.setProperty("inter.broker.protocol.version", "3.7")
-    val stream = new ByteArrayOutputStream()
-    assertEquals(0, runFormatCommand(stream, properties))
-    assertTrue(stream.toString().contains("3.7-IV4"),
-      "Failed to find content in output: " + stream.toString())
-  }
-
   @Test
   def testFormatWithReleaseVersionDefaultAndReleaseVersion(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
     properties.putAll(defaultStaticQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
-    properties.setProperty("inter.broker.protocol.version", "3.7")
     val stream = new ByteArrayOutputStream()
     assertEquals(0, runFormatCommand(stream, properties, Seq(
       "--release-version", "3.6-IV0",

File: TestUtils.scala

@@ -315,10 +315,6 @@ object TestUtils extends Logging {
     props
   }
 
-  def setIbpVersion(config: Properties, version: MetadataVersion): Unit = {
-    config.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, version.version)
-  }
-
   def createAdminClient[B <: KafkaBroker](
     brokers: Seq[B],
     listenerName: ListenerName,

File: ReplicationConfigs.java

@@ -21,8 +21,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.server.common.MetadataVersionValidator;
 import org.apache.kafka.storage.internals.log.LogConfig;
 
 import java.util.concurrent.TimeUnit;
@@ -127,13 +125,6 @@ public class ReplicationConfigs {
             "thread to trigger election periodically (default is 5 minutes). Please run `kafka-leader-election.sh` with `unclean` option " +
             "to trigger the unclean leader election immediately if needed.</p>";
 
-    public static final String INTER_BROKER_PROTOCOL_VERSION_CONFIG = "inter.broker.protocol.version";
-    public static final String INTER_BROKER_PROTOCOL_VERSION_DEFAULT = MetadataVersion.latestProduction().version();
-    public static final String INTER_BROKER_PROTOCOL_VERSION_DOC = "Specify which version of the inter-broker protocol will be used.\n" +
-            "This is typically bumped after all brokers were upgraded to a new version.\n" +
-            "Check MetadataVersion for the full list.\n" +
-            "This configuration is only applicable in Zookeeper mode.";
-
     public static final String INTER_BROKER_SECURITY_PROTOCOL_CONFIG = "security.inter.broker.protocol";
     public static final String INTER_BROKER_SECURITY_PROTOCOL_DEFAULT = SecurityProtocol.PLAINTEXT.toString();
     public static final String INTER_BROKER_LISTENER_NAME_CONFIG = "inter.broker.listener.name";
@@ -171,7 +162,6 @@ public class ReplicationConfigs {
             .defineInternal(UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG, LONG, UNCLEAN_LEADER_ELECTION_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, UNCLEAN_LEADER_ELECTION_INTERVAL_MS_DOC)
             .define(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, BOOLEAN, LogConfig.DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE, HIGH, UNCLEAN_LEADER_ELECTION_ENABLE_DOC)
             .define(INTER_BROKER_SECURITY_PROTOCOL_CONFIG, STRING, INTER_BROKER_SECURITY_PROTOCOL_DEFAULT, ConfigDef.ValidString.in(Utils.enumOptions(SecurityProtocol.class)), MEDIUM, INTER_BROKER_SECURITY_PROTOCOL_DOC)
-            .define(INTER_BROKER_PROTOCOL_VERSION_CONFIG, STRING, INTER_BROKER_PROTOCOL_VERSION_DEFAULT, new MetadataVersionValidator(), MEDIUM, INTER_BROKER_PROTOCOL_VERSION_DOC)
             .define(INTER_BROKER_LISTENER_NAME_CONFIG, STRING, null, MEDIUM, INTER_BROKER_LISTENER_NAME_DOC)
             .define(REPLICA_SELECTOR_CLASS_CONFIG, STRING, null, MEDIUM, REPLICA_SELECTOR_CLASS_DOC);

File: ReassignPartitionsCommandTest.java

@@ -83,7 +83,6 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
 import static org.apache.kafka.server.config.QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
 import static org.apache.kafka.server.config.QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
 import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG;
 import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG;
 import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG;
 import static org.apache.kafka.tools.ToolsTestUtils.assignThrottledPartitionReplicas;
@@ -141,29 +140,6 @@ public class ReassignPartitionsCommandTest {
         executeAndVerifyReassignment();
     }
 
-    @ClusterTests({
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
-            @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
-            @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
-            @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
-        })
-    })
-    public void testReassignmentCompletionDuringPartialUpgrade() throws Exception {
-        // Test reassignment during a partial upgrade when some brokers are relying on
-        // `AlterPartition` and some rely on the old notification logic through Zookeeper.
-        //
-        // In this test case, broker 0 starts up first on the latest IBP and is typically
-        // elected as controller. The three remaining brokers start up on the older IBP.
-        // We want to ensure that reassignment can still complete through the ISR change
-        // notification path even though the controller expects `AlterPartition`.
-
-        // Override change notification settings so that test is not delayed by ISR
-        // change notification delay
-        // ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new IsrChangePropagationConfig(500, 100, 500));
-
-        createTopics();
-        executeAndVerifyReassignment();
-    }
-
     @ClusterTest
     public void testHighWaterMarkAfterPartitionReassignment() throws Exception {
         createTopics();