mirror of https://github.com/apache/kafka.git
KAFKA-18360 Remove zookeeper configurations (#18566)
Remove broker.id.generation.enable and reserved.broker.max.id, which are not used in KRaft mode. Remove inter.broker.protocol.version, which is not used in KRaft mode. Reviewers: PoAn Yang <payang@apache.org>, Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
cf8d3ac49e
commit
b6e6a3c68a
|
@ -389,28 +389,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
val uncleanLeaderElectionCheckIntervalMs: Long = getLong(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG)
|
||||
def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)
|
||||
|
||||
// We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
|
||||
// is passed, `0.10.0-IV0` may be picked)
|
||||
val interBrokerProtocolVersionString = getString(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
|
||||
val interBrokerProtocolVersion = if (processRoles.isEmpty) {
|
||||
MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
|
||||
} else {
|
||||
if (originals.containsKey(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)) {
|
||||
// A user-supplied IBP was given
|
||||
val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
|
||||
if (!configuredVersion.isKRaftSupported) {
|
||||
throw new ConfigException(s"A non-KRaft version $interBrokerProtocolVersionString given for ${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}. " +
|
||||
s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
|
||||
} else {
|
||||
warn(s"${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG} is deprecated in KRaft mode as of 3.3 and will only " +
|
||||
s"be read when first upgrading from a KRaft prior to 3.3. See kafka-storage.sh help for details on setting " +
|
||||
s"the metadata.version for a new KRaft cluster.")
|
||||
}
|
||||
}
|
||||
// In KRaft mode, we pin this value to the minimum KRaft-supported version. This prevents inadvertent usage of
|
||||
// the static IBP config in broker components running in KRaft mode
|
||||
MetadataVersion.MINIMUM_KRAFT_VERSION
|
||||
}
|
||||
// This will be removed soon. See KAFKA-18366.
|
||||
val interBrokerProtocolVersion = MetadataVersion.MINIMUM_KRAFT_VERSION
|
||||
|
||||
/** ********* Controlled shutdown configuration ***********/
|
||||
val controlledShutdownEnable = getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
|
||||
|
@ -713,15 +693,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
|
||||
validateAdvertisedControllerListenersNonEmptyForKRaftController()
|
||||
validateControllerListenerNamesMustAppearInListenersForKRaftController()
|
||||
} else {
|
||||
// controller listener names must be empty when not in KRaft mode
|
||||
require(controllerListenerNames.isEmpty,
|
||||
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
|
||||
}
|
||||
|
||||
val listenerNames = listeners.map(_.listenerName).toSet
|
||||
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
|
||||
// validations for all broker setups (i.e. broker-only and co-located)
|
||||
if (processRoles.contains(ProcessRole.BrokerRole)) {
|
||||
validateAdvertisedBrokerListenersNonEmptyForBroker()
|
||||
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
|
||||
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
|
||||
|
|
|
@ -182,8 +182,7 @@ object KafkaRaftServer {
|
|||
}
|
||||
|
||||
// Load the BootstrapMetadata.
|
||||
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir,
|
||||
Optional.ofNullable(config.interBrokerProtocolVersionString))
|
||||
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir, Optional.empty())
|
||||
val bootstrapMetadata = bootstrapDirectory.read()
|
||||
(metaPropsEnsemble, bootstrapMetadata)
|
||||
}
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
|
|||
import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
|
||||
import org.apache.kafka.raft.{DynamicVoters, QuorumConfig}
|
||||
import org.apache.kafka.server.ProcessRole
|
||||
import org.apache.kafka.server.config.ReplicationConfigs
|
||||
|
||||
import java.util
|
||||
import scala.collection.mutable
|
||||
|
@ -129,11 +128,9 @@ object StorageTool extends Logging {
|
|||
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
|
||||
setControllerListenerName(config.controllerListenerNames.head).
|
||||
setMetadataLogDirectory(config.metadataLogDir)
|
||||
Option(namespace.getString("release_version")) match {
|
||||
case Some(releaseVersion) => formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))
|
||||
case None => Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).
|
||||
foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString)))
|
||||
}
|
||||
Option(namespace.getString("release_version")).foreach(
|
||||
releaseVersion => formatter.
|
||||
setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion)))
|
||||
Option(namespace.getList[String]("feature")).foreach(
|
||||
featureNamesAndLevels(_).foreachEntry {
|
||||
(k, v) => formatter.setFeatureLevel(k, v)
|
||||
|
|
|
@ -64,7 +64,6 @@ import org.apache.kafka.server.authorizer.Action;
|
|||
import org.apache.kafka.server.authorizer.AuthorizationResult;
|
||||
import org.apache.kafka.server.authorizer.Authorizer;
|
||||
import org.apache.kafka.server.common.KRaftVersion;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
import org.apache.kafka.server.config.KRaftConfigs;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -545,7 +544,6 @@ class DescribeTopicPartitionsRequestHandlerTest {
|
|||
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, voterId + "@localhost:9093");
|
||||
properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL");
|
||||
properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL");
|
||||
TestUtils.setIbpVersion(properties, MetadataVersion.latestProduction());
|
||||
return new KafkaConfig(properties);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,11 +155,12 @@ class KafkaApisTest extends Logging {
|
|||
metrics.close()
|
||||
}
|
||||
|
||||
def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting,
|
||||
authorizer: Option[Authorizer] = None,
|
||||
configRepository: ConfigRepository = new MockConfigRepository(),
|
||||
overrideProperties: Map[String, String] = Map.empty,
|
||||
featureVersions: Seq[FeatureVersion] = Seq.empty): KafkaApis = {
|
||||
def createKafkaApis(
|
||||
authorizer: Option[Authorizer] = None,
|
||||
configRepository: ConfigRepository = new MockConfigRepository(),
|
||||
overrideProperties: Map[String, String] = Map.empty,
|
||||
featureVersions: Seq[FeatureVersion] = Seq.empty
|
||||
): KafkaApis = {
|
||||
|
||||
val properties = TestUtils.createBrokerConfig(brokerId)
|
||||
properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString)
|
||||
|
@ -168,7 +169,6 @@ class KafkaApisTest extends Logging {
|
|||
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$voterId@localhost:9093")
|
||||
|
||||
overrideProperties.foreach( p => properties.put(p._1, p._2))
|
||||
TestUtils.setIbpVersion(properties, interBrokerProtocolVersion)
|
||||
val config = new KafkaConfig(properties)
|
||||
|
||||
val listenerType = ListenerType.BROKER
|
||||
|
|
|
@ -159,7 +159,6 @@ class KafkaConfigTest {
|
|||
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props1))
|
||||
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props2))
|
||||
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props3))
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -187,7 +186,7 @@ class KafkaConfigTest {
|
|||
val advertisedHostName = "routable-host"
|
||||
val advertisedPort = 1234
|
||||
|
||||
val props = TestUtils.createBrokerConfig(0)
|
||||
val props = createDefaultConfig()
|
||||
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
|
||||
|
||||
val serverConfig = KafkaConfig.fromProps(props)
|
||||
|
@ -617,29 +616,6 @@ class KafkaConfigTest {
|
|||
assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testVersionConfiguration(): Unit = {
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
|
||||
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
|
||||
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
|
||||
val conf = KafkaConfig.fromProps(props)
|
||||
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, conf.interBrokerProtocolVersion)
|
||||
|
||||
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.0-IV1")
|
||||
val conf2 = KafkaConfig.fromProps(props)
|
||||
assertEquals(MetadataVersion.IBP_3_0_IV1, conf2.interBrokerProtocolVersion)
|
||||
|
||||
// check that patch version doesn't affect equality
|
||||
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.1-IV1")
|
||||
val conf3 = KafkaConfig.fromProps(props)
|
||||
assertEquals(MetadataVersion.IBP_3_0_IV1, conf3.interBrokerProtocolVersion)
|
||||
|
||||
//check that latest is newer than 3.0.1-IV0
|
||||
assertTrue(MetadataVersion.latestTesting.isAtLeast(conf3.interBrokerProtocolVersion))
|
||||
}
|
||||
|
||||
private def isValidKafkaConfig(props: Properties): Boolean = {
|
||||
try {
|
||||
KafkaConfig.fromProps(props)
|
||||
|
@ -1406,27 +1382,30 @@ class KafkaConfigTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
|
||||
// -1 is the default for both node.id and broker.id
|
||||
def testAcceptsLargeId(): Unit = {
|
||||
val largeBrokerId = 2000
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
assertFalse(isValidKafkaConfig(props))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
|
||||
// -1 is the default for both node.id and broker.id
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
|
||||
assertFalse(isValidKafkaConfig(props))
|
||||
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092")
|
||||
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
|
||||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
|
||||
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, largeBrokerId.toString)
|
||||
KafkaConfig.fromProps(props)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testRejectsNegativeNodeId(): Unit = {
|
||||
// -1 is the default for both node.id and broker.id
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
|
||||
val props = createDefaultConfig()
|
||||
props.remove(ServerConfigs.BROKER_ID_CONFIG)
|
||||
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "-1")
|
||||
assertFalse(isValidKafkaConfig(props))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testRejectsNegativeBrokerId(): Unit = {
|
||||
val props = createDefaultConfig()
|
||||
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "-1")
|
||||
props.remove(KRaftConfigs.NODE_ID_CONFIG)
|
||||
assertFalse(isValidKafkaConfig(props))
|
||||
}
|
||||
|
||||
|
@ -1613,17 +1592,6 @@ class KafkaConfigTest {
|
|||
assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testIgnoreUserInterBrokerProtocolVersionKRaft(): Unit = {
|
||||
for (ibp <- Seq("3.0", "3.1", "3.2")) {
|
||||
val props = new Properties()
|
||||
props.putAll(kraftProps())
|
||||
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp)
|
||||
val config = new KafkaConfig(props)
|
||||
assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
|
||||
val props = new Properties()
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadat
|
|||
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
|
||||
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
|
||||
import org.apache.kafka.server.common.MetadataVersion
|
||||
import org.apache.kafka.test.TestUtils
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
|
@ -262,7 +262,7 @@ class KafkaRaftServerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
def testKRaftUpdateWithIBP(): Unit = {
|
||||
def testKRaftUpdateAt3_3_IV1(): Unit = {
|
||||
val clusterId = clusterIdBase64
|
||||
val nodeId = 0
|
||||
val metaProperties = new MetaProperties.Builder().
|
||||
|
@ -278,10 +278,9 @@ class KafkaRaftServerTest {
|
|||
configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
|
||||
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093")
|
||||
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
|
||||
configProperties.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.3-IV1")
|
||||
|
||||
val (metaPropertiesEnsemble, bootstrapMetadata) =
|
||||
invokeLoadMetaProperties(metaProperties, configProperties, None)
|
||||
invokeLoadMetaProperties(metaProperties, configProperties, Some(MetadataVersion.IBP_3_3_IV1))
|
||||
|
||||
assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next())
|
||||
assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
|
||||
|
@ -290,7 +289,7 @@ class KafkaRaftServerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
def testKRaftUpdateWithoutIBP(): Unit = {
|
||||
def testKRaftUpdate(): Unit = {
|
||||
val clusterId = clusterIdBase64
|
||||
val nodeId = 0
|
||||
val metaProperties = new MetaProperties.Builder().
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBat
|
|||
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
|
||||
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
|
||||
import org.apache.kafka.common.utils.{LogContext, Time}
|
||||
import org.apache.kafka.server.config.ReplicationConfigs
|
||||
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
|
||||
import org.apache.kafka.server.network.BrokerEndPoint
|
||||
import org.apache.kafka.storage.internals.log.LogAppendInfo
|
||||
|
@ -78,13 +77,16 @@ class ReplicaFetcherThreadTest {
|
|||
TestUtils.clearYammerMetrics()
|
||||
}
|
||||
|
||||
private def createReplicaFetcherThread(name: String,
|
||||
fetcherId: Int,
|
||||
brokerConfig: KafkaConfig,
|
||||
failedPartitions: FailedPartitions,
|
||||
replicaMgr: ReplicaManager,
|
||||
quota: ReplicaQuota,
|
||||
leaderEndpointBlockingSend: BlockingSend): ReplicaFetcherThread = {
|
||||
private def createReplicaFetcherThread(
|
||||
name: String,
|
||||
fetcherId: Int,
|
||||
brokerConfig: KafkaConfig,
|
||||
failedPartitions: FailedPartitions,
|
||||
replicaMgr: ReplicaManager,
|
||||
quota: ReplicaQuota,
|
||||
leaderEndpointBlockingSend: BlockingSend,
|
||||
metadataVersion: MetadataVersion = MetadataVersion.latestTesting()
|
||||
): ReplicaFetcherThread = {
|
||||
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${leaderEndpointBlockingSend.brokerEndPoint().id}, fetcherId=$fetcherId] ")
|
||||
val fetchSessionHandler = new FetchSessionHandler(logContext, leaderEndpointBlockingSend.brokerEndPoint().id)
|
||||
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler,
|
||||
|
@ -96,7 +98,7 @@ class ReplicaFetcherThreadTest {
|
|||
replicaMgr,
|
||||
quota,
|
||||
logContext.logPrefix,
|
||||
() => brokerConfig.interBrokerProtocolVersion)
|
||||
() => metadataVersion)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -179,9 +181,8 @@ class ReplicaFetcherThreadTest {
|
|||
verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latestTesting, epochFetchCount = 0)
|
||||
}
|
||||
|
||||
private def verifyFetchLeaderEpochOnFirstFetch(ibp: MetadataVersion, epochFetchCount: Int): Unit = {
|
||||
private def verifyFetchLeaderEpochOnFirstFetch(metadataVersion: MetadataVersion, epochFetchCount: Int): Unit = {
|
||||
val props = TestUtils.createBrokerConfig(1)
|
||||
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp.version)
|
||||
val config = KafkaConfig.fromProps(props)
|
||||
|
||||
//Setup all dependencies
|
||||
|
@ -219,7 +220,8 @@ class ReplicaFetcherThreadTest {
|
|||
failedPartitions,
|
||||
replicaManager,
|
||||
UNBOUNDED_QUOTA,
|
||||
mockNetwork
|
||||
mockNetwork,
|
||||
metadataVersion
|
||||
)
|
||||
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))
|
||||
|
||||
|
|
|
@ -357,26 +357,12 @@ Found problem:
|
|||
"Failed to find content in output: " + stream.toString())
|
||||
}
|
||||
|
||||
@Test
|
||||
def testFormatWithReleaseVersionDefault(): Unit = {
|
||||
val availableDirs = Seq(TestUtils.tempDir())
|
||||
val properties = new Properties()
|
||||
properties.putAll(defaultStaticQuorumProperties)
|
||||
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
||||
properties.setProperty("inter.broker.protocol.version", "3.7")
|
||||
val stream = new ByteArrayOutputStream()
|
||||
assertEquals(0, runFormatCommand(stream, properties))
|
||||
assertTrue(stream.toString().contains("3.7-IV4"),
|
||||
"Failed to find content in output: " + stream.toString())
|
||||
}
|
||||
|
||||
@Test
|
||||
def testFormatWithReleaseVersionDefaultAndReleaseVersion(): Unit = {
|
||||
val availableDirs = Seq(TestUtils.tempDir())
|
||||
val properties = new Properties()
|
||||
properties.putAll(defaultStaticQuorumProperties)
|
||||
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
||||
properties.setProperty("inter.broker.protocol.version", "3.7")
|
||||
val stream = new ByteArrayOutputStream()
|
||||
assertEquals(0, runFormatCommand(stream, properties, Seq(
|
||||
"--release-version", "3.6-IV0",
|
||||
|
|
|
@ -315,10 +315,6 @@ object TestUtils extends Logging {
|
|||
props
|
||||
}
|
||||
|
||||
def setIbpVersion(config: Properties, version: MetadataVersion): Unit = {
|
||||
config.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, version.version)
|
||||
}
|
||||
|
||||
def createAdminClient[B <: KafkaBroker](
|
||||
brokers: Seq[B],
|
||||
listenerName: ListenerName,
|
||||
|
|
|
@ -21,8 +21,6 @@ import org.apache.kafka.common.config.ConfigDef;
|
|||
import org.apache.kafka.common.config.TopicConfig;
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
import org.apache.kafka.server.common.MetadataVersionValidator;
|
||||
import org.apache.kafka.storage.internals.log.LogConfig;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -127,13 +125,6 @@ public class ReplicationConfigs {
|
|||
"thread to trigger election periodically (default is 5 minutes). Please run `kafka-leader-election.sh` with `unclean` option " +
|
||||
"to trigger the unclean leader election immediately if needed.</p>";
|
||||
|
||||
public static final String INTER_BROKER_PROTOCOL_VERSION_CONFIG = "inter.broker.protocol.version";
|
||||
public static final String INTER_BROKER_PROTOCOL_VERSION_DEFAULT = MetadataVersion.latestProduction().version();
|
||||
public static final String INTER_BROKER_PROTOCOL_VERSION_DOC = "Specify which version of the inter-broker protocol will be used.\n" +
|
||||
"This is typically bumped after all brokers were upgraded to a new version.\n" +
|
||||
"Check MetadataVersion for the full list.\n" +
|
||||
"This configuration is only applicable in Zookeeper mode.";
|
||||
|
||||
public static final String INTER_BROKER_SECURITY_PROTOCOL_CONFIG = "security.inter.broker.protocol";
|
||||
public static final String INTER_BROKER_SECURITY_PROTOCOL_DEFAULT = SecurityProtocol.PLAINTEXT.toString();
|
||||
public static final String INTER_BROKER_LISTENER_NAME_CONFIG = "inter.broker.listener.name";
|
||||
|
@ -171,7 +162,6 @@ public class ReplicationConfigs {
|
|||
.defineInternal(UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG, LONG, UNCLEAN_LEADER_ELECTION_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, UNCLEAN_LEADER_ELECTION_INTERVAL_MS_DOC)
|
||||
.define(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, BOOLEAN, LogConfig.DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE, HIGH, UNCLEAN_LEADER_ELECTION_ENABLE_DOC)
|
||||
.define(INTER_BROKER_SECURITY_PROTOCOL_CONFIG, STRING, INTER_BROKER_SECURITY_PROTOCOL_DEFAULT, ConfigDef.ValidString.in(Utils.enumOptions(SecurityProtocol.class)), MEDIUM, INTER_BROKER_SECURITY_PROTOCOL_DOC)
|
||||
.define(INTER_BROKER_PROTOCOL_VERSION_CONFIG, STRING, INTER_BROKER_PROTOCOL_VERSION_DEFAULT, new MetadataVersionValidator(), MEDIUM, INTER_BROKER_PROTOCOL_VERSION_DOC)
|
||||
.define(INTER_BROKER_LISTENER_NAME_CONFIG, STRING, null, MEDIUM, INTER_BROKER_LISTENER_NAME_DOC)
|
||||
.define(REPLICA_SELECTOR_CLASS_CONFIG, STRING, null, MEDIUM, REPLICA_SELECTOR_CLASS_DOC);
|
||||
|
||||
|
|
|
@ -83,7 +83,6 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
|
|||
import static org.apache.kafka.server.config.QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
|
||||
import static org.apache.kafka.server.config.QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
|
||||
import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
|
||||
import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG;
|
||||
import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG;
|
||||
import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG;
|
||||
import static org.apache.kafka.tools.ToolsTestUtils.assignThrottledPartitionReplicas;
|
||||
|
@ -141,29 +140,6 @@ public class ReassignPartitionsCommandTest {
|
|||
executeAndVerifyReassignment();
|
||||
}
|
||||
|
||||
@ClusterTests({
|
||||
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
|
||||
@ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
|
||||
@ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
|
||||
@ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
|
||||
})
|
||||
})
|
||||
public void testReassignmentCompletionDuringPartialUpgrade() throws Exception {
|
||||
// Test reassignment during a partial upgrade when some brokers are relying on
|
||||
// `AlterPartition` and some rely on the old notification logic through Zookeeper.
|
||||
// In this test case, broker 0 starts up first on the latest IBP and is typically
|
||||
// elected as controller. The three remaining brokers start up on the older IBP.
|
||||
// We want to ensure that reassignment can still complete through the ISR change
|
||||
// notification path even though the controller expects `AlterPartition`.
|
||||
|
||||
// Override change notification settings so that test is not delayed by ISR
|
||||
// change notification delay
|
||||
// ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new IsrChangePropagationConfig(500, 100, 500));
|
||||
|
||||
createTopics();
|
||||
executeAndVerifyReassignment();
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testHighWaterMarkAfterPartitionReassignment() throws Exception {
|
||||
createTopics();
|
||||
|
|
Loading…
Reference in New Issue