mirror of https://github.com/apache/kafka.git
KAFKA-18383 Remove reserved.broker.max.id and broker.id.generation.enable (#18478)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
a3b34c1315
commit
4dd0bcbde8
|
@ -213,8 +213,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
def quotaConfig: QuotaConfig = _quotaConfig
|
||||
|
||||
/** ********* General Configuration ***********/
|
||||
val brokerIdGenerationEnable: Boolean = getBoolean(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG)
|
||||
val maxReservedBrokerId: Int = getInt(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG)
|
||||
var brokerId: Int = getInt(ServerConfigs.BROKER_ID_CONFIG)
|
||||
val nodeId: Int = getInt(KRaftConfigs.NODE_ID_CONFIG)
|
||||
val initialRegistrationTimeoutMs: Int = getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG)
|
||||
|
|
|
@ -1165,8 +1165,6 @@ class KafkaConfigTest {
|
|||
defaults.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
|
||||
defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
|
||||
defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false")
|
||||
defaults.setProperty(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, "1")
|
||||
defaults.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
|
||||
defaults.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:1122")
|
||||
defaults.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, "127.0.0.1:2, 127.0.0.2:3")
|
||||
|
@ -1181,8 +1179,6 @@ class KafkaConfigTest {
|
|||
defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString)
|
||||
|
||||
val config = KafkaConfig.fromProps(defaults)
|
||||
assertEquals(false, config.brokerIdGenerationEnable)
|
||||
assertEquals(1, config.maxReservedBrokerId)
|
||||
assertEquals(1, config.brokerId)
|
||||
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(_.connectionString))
|
||||
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
|
||||
|
@ -1409,21 +1405,6 @@ class KafkaConfigTest {
|
|||
assertEquals(expected, addresses)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testAcceptsLargeNodeIdForRaftBasedCase(): Unit = {
|
||||
// Generation of Broker IDs is not supported when using Raft-based controller quorums,
|
||||
// so pick a broker ID greater than reserved.broker.max.id, which defaults to 1000,
|
||||
// and make sure it is allowed despite broker.id.generation.enable=true (true is the default)
|
||||
val largeBrokerId = 2000
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092")
|
||||
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
|
||||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
|
||||
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, largeBrokerId.toString)
|
||||
KafkaConfig.fromProps(props)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
|
||||
// -1 is the default for both node.id and broker.id
|
||||
|
@ -1441,11 +1422,10 @@ class KafkaConfigTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
def testRejectsNegativeNodeIdForRaftBasedCaseWithAutoGenDisabled(): Unit = {
|
||||
def testRejectsNegativeNodeId(): Unit = {
|
||||
// -1 is the default for both node.id and broker.id
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false")
|
||||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
|
||||
assertFalse(isValidKafkaConfig(props))
|
||||
}
|
||||
|
|
|
@ -71,7 +71,6 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
|
|||
super.setUp(testInfo)
|
||||
val props = TestUtils.createBrokerConfig(1)
|
||||
props.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
|
||||
props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "true")
|
||||
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
|
||||
config = KafkaConfig.fromProps(props)
|
||||
broker = createBroker(config, threadNamePrefix = Option(this.getClass.getName))
|
||||
|
|
|
@ -81,12 +81,11 @@
|
|||
<li>
|
||||
<p>
|
||||
Remove the broker id generation-related configurations. These configurations were used in ZooKeeper mode
|
||||
to define the broker id, specify the broker id auto generation, and control the broker id generation process.
|
||||
to specify the broker id auto generation and control the broker id generation process.
|
||||
</p>
|
||||
<ul>
|
||||
<li><code>reserved.broker.max.id</code></li>
|
||||
<li><code>broker.id.generation.enable</code></li>
|
||||
<li><code>broker.id</code></li>
|
||||
</ul>
|
||||
<p>
|
||||
Kafka use the node id in Kraft mode to identify servers.
|
||||
|
|
|
@ -35,15 +35,6 @@ import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
|
|||
|
||||
public class ServerConfigs {
|
||||
/** ********* General Configuration ***********/
|
||||
public static final String RESERVED_BROKER_MAX_ID_CONFIG = "reserved.broker.max.id";
|
||||
public static final int RESERVED_BROKER_MAX_ID_DEFAULT = 1000;
|
||||
public static final String RESERVED_BROKER_MAX_ID_DOC = "Max number that can be used for a broker.id";
|
||||
|
||||
public static final String BROKER_ID_GENERATION_ENABLE_CONFIG = "broker.id.generation.enable";
|
||||
public static final boolean BROKER_ID_GENERATION_ENABLE_DEFAULT = true;
|
||||
public static final String BROKER_ID_GENERATION_ENABLE_DOC = "Enable automatic broker id generation on the server. When enabled the value configured for " + RESERVED_BROKER_MAX_ID_CONFIG + " should be reviewed.";
|
||||
|
||||
|
||||
public static final String BROKER_ID_CONFIG = "broker.id";
|
||||
public static final int BROKER_ID_DEFAULT = -1;
|
||||
public static final String BROKER_ID_DOC = "The broker id for this server.";
|
||||
|
@ -129,8 +120,6 @@ public class ServerConfigs {
|
|||
"the StandardAuthorizer (which stores ACLs in the metadata log.) By default, all listeners included in controller.listener.names " +
|
||||
"will also be early start listeners. A listener should not appear in this list if it accepts external traffic.";
|
||||
public static final ConfigDef CONFIG_DEF = new ConfigDef()
|
||||
.define(BROKER_ID_GENERATION_ENABLE_CONFIG, BOOLEAN, BROKER_ID_GENERATION_ENABLE_DEFAULT, MEDIUM, BROKER_ID_GENERATION_ENABLE_DOC)
|
||||
.define(RESERVED_BROKER_MAX_ID_CONFIG, INT, RESERVED_BROKER_MAX_ID_DEFAULT, atLeast(0), MEDIUM, RESERVED_BROKER_MAX_ID_DOC)
|
||||
.define(BROKER_ID_CONFIG, INT, BROKER_ID_DEFAULT, HIGH, BROKER_ID_DOC)
|
||||
.define(MESSAGE_MAX_BYTES_CONFIG, INT, LogConfig.DEFAULT_MAX_MESSAGE_BYTES, atLeast(0), HIGH, MESSAGE_MAX_BYTES_DOC)
|
||||
.define(NUM_IO_THREADS_CONFIG, INT, NUM_IO_THREADS_DEFAULT, atLeast(1), HIGH, NUM_IO_THREADS_DOC)
|
||||
|
|
Loading…
Reference in New Issue