mirror of https://github.com/apache/kafka.git
KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (#13103)
With the new broker epoch validation logic introduced in #12998, we no longer need the ZK broker epoch to be sent to the KRaft controller. This patch removes that epoch and replaces it with a boolean. Another small fix is included in this patch for controlled shutdown in migration mode. Previously, if a ZK broker was in migration mode, it would always try to do controlled shutdown via BrokerLifecycleManager. Since there is no ordering dependency between bringing up ZK brokers and the KRaft quorum during migration, a ZK broker could be running in migration mode, but talking to a ZK controller. A small check was added to see if the current controller is ZK or KRaft before decided which controlled shutdown to attempt. Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
78d4458b94
commit
0bb05d8679
|
@ -36,7 +36,7 @@ public class BrokerRegistrationRequest extends AbstractRequest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public short oldestAllowedVersion() {
|
public short oldestAllowedVersion() {
|
||||||
if (data.migratingZkBrokerEpoch() != -1) {
|
if (data.isMigratingZkBroker()) {
|
||||||
return (short) 1;
|
return (short) 1;
|
||||||
} else {
|
} else {
|
||||||
return (short) 0;
|
return (short) 0;
|
||||||
|
|
|
@ -52,7 +52,7 @@
|
||||||
},
|
},
|
||||||
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
|
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
|
||||||
"about": "The rack which this broker is in." },
|
"about": "The rack which this broker is in." },
|
||||||
{ "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "1+", "default": "-1",
|
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
|
||||||
"about": "If the required configurations for ZK migration are present, this value is set to the ZK broker epoch" }
|
"about": "If the required configurations for ZK migration are present, this value is set to true" }
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,8 +55,7 @@ class BrokerLifecycleManager(
|
||||||
val config: KafkaConfig,
|
val config: KafkaConfig,
|
||||||
val time: Time,
|
val time: Time,
|
||||||
val threadNamePrefix: Option[String],
|
val threadNamePrefix: Option[String],
|
||||||
val isZkBroker: Boolean,
|
val isZkBroker: Boolean
|
||||||
val zkBrokerEpochSupplier: () => Long
|
|
||||||
) extends Logging {
|
) extends Logging {
|
||||||
|
|
||||||
val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
|
val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
|
||||||
|
@ -291,20 +290,9 @@ class BrokerLifecycleManager(
|
||||||
setMinSupportedVersion(range.min()).
|
setMinSupportedVersion(range.min()).
|
||||||
setMaxSupportedVersion(range.max()))
|
setMaxSupportedVersion(range.max()))
|
||||||
}
|
}
|
||||||
val migrationZkBrokerEpoch: Long = {
|
|
||||||
if (isZkBroker) {
|
|
||||||
val zkBrokerEpoch: Long = Option(zkBrokerEpochSupplier).map(_.apply()).getOrElse(-1)
|
|
||||||
if (zkBrokerEpoch < 0) {
|
|
||||||
throw new IllegalStateException("Trying to sending BrokerRegistration in migration Zk " +
|
|
||||||
"broker without valid zk broker epoch")
|
|
||||||
}
|
|
||||||
zkBrokerEpoch
|
|
||||||
} else
|
|
||||||
-1
|
|
||||||
}
|
|
||||||
val data = new BrokerRegistrationRequestData().
|
val data = new BrokerRegistrationRequestData().
|
||||||
setBrokerId(nodeId).
|
setBrokerId(nodeId).
|
||||||
setMigratingZkBrokerEpoch(migrationZkBrokerEpoch).
|
setIsMigratingZkBroker(isZkBroker).
|
||||||
setClusterId(_clusterId).
|
setClusterId(_clusterId).
|
||||||
setFeatures(features).
|
setFeatures(features).
|
||||||
setIncarnationId(incarnationId).
|
setIncarnationId(incarnationId).
|
||||||
|
|
|
@ -188,8 +188,7 @@ class BrokerServer(
|
||||||
lifecycleManager = new BrokerLifecycleManager(config,
|
lifecycleManager = new BrokerLifecycleManager(config,
|
||||||
time,
|
time,
|
||||||
threadNamePrefix,
|
threadNamePrefix,
|
||||||
isZkBroker = false,
|
isZkBroker = false)
|
||||||
() => -1)
|
|
||||||
|
|
||||||
/* start scheduler */
|
/* start scheduler */
|
||||||
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
|
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
|
||||||
|
|
|
@ -2090,8 +2090,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
|
||||||
}
|
}
|
||||||
if (migrationEnabled) {
|
if (migrationEnabled) {
|
||||||
if (zkConnect == null) {
|
if (zkConnect == null) {
|
||||||
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value. " +
|
throw new ConfigException(s"If using `${KafkaConfig.MigrationEnabledProp}` in KRaft mode, `${KafkaConfig.ZkConnectProp}` must also be set.")
|
||||||
s"`${KafkaConfig.ZkConnectProp}` is required because `${KafkaConfig.MigrationEnabledProp} is set to true.")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2115,6 +2114,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
|
||||||
throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
|
throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
def validateNonEmptyQuorumVotersForMigration(): Unit = {
|
||||||
|
if (voterAddressSpecsByNodeId.isEmpty) {
|
||||||
|
throw new ConfigException(s"If using ${KafkaConfig.MigrationEnabledProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
|
||||||
|
}
|
||||||
|
}
|
||||||
def validateControlPlaneListenerEmptyForKRaft(): Unit = {
|
def validateControlPlaneListenerEmptyForKRaft(): Unit = {
|
||||||
require(controlPlaneListenerName.isEmpty,
|
require(controlPlaneListenerName.isEmpty,
|
||||||
s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode.")
|
s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode.")
|
||||||
|
@ -2197,7 +2201,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
|
||||||
} else {
|
} else {
|
||||||
// ZK-based
|
// ZK-based
|
||||||
if (migrationEnabled) {
|
if (migrationEnabled) {
|
||||||
validateNonEmptyQuorumVotersForKRaft()
|
validateNonEmptyQuorumVotersForMigration()
|
||||||
require(controllerListenerNames.nonEmpty,
|
require(controllerListenerNames.nonEmpty,
|
||||||
s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
|
s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
|
||||||
require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
|
require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
|
||||||
|
|
|
@ -378,8 +378,7 @@ class KafkaServer(
|
||||||
lifecycleManager = new BrokerLifecycleManager(config,
|
lifecycleManager = new BrokerLifecycleManager(config,
|
||||||
time,
|
time,
|
||||||
threadNamePrefix,
|
threadNamePrefix,
|
||||||
isZkBroker = true,
|
isZkBroker = true)
|
||||||
() => kafkaController.brokerEpoch)
|
|
||||||
|
|
||||||
// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
|
// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
|
||||||
val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
|
val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
|
||||||
|
@ -812,7 +811,7 @@ class KafkaServer(
|
||||||
|
|
||||||
_brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
|
_brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
|
||||||
|
|
||||||
if (config.migrationEnabled && lifecycleManager != null) {
|
if (config.migrationEnabled && lifecycleManager != null && metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
|
||||||
// TODO KAFKA-14447 Only use KRaft controlled shutdown (when in migration mode)
|
// TODO KAFKA-14447 Only use KRaft controlled shutdown (when in migration mode)
|
||||||
// For now we'll send the heartbeat with WantShutDown set so the KRaft controller can see a broker
|
// For now we'll send the heartbeat with WantShutDown set so the KRaft controller can see a broker
|
||||||
// shutting down without waiting for the heartbeat to time out.
|
// shutting down without waiting for the heartbeat to time out.
|
||||||
|
@ -826,7 +825,6 @@ class KafkaServer(
|
||||||
case e: Throwable =>
|
case e: Throwable =>
|
||||||
error("Got unexpected exception waiting for controlled shutdown future", e)
|
error("Got unexpected exception waiting for controlled shutdown future", e)
|
||||||
}
|
}
|
||||||
// TODO fix this ^
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
|
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
|
||||||
|
|
|
@ -25,7 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
|
||||||
import org.apache.kafka.common.Uuid
|
import org.apache.kafka.common.Uuid
|
||||||
import org.apache.kafka.raft.RaftConfig
|
import org.apache.kafka.raft.RaftConfig
|
||||||
import org.apache.kafka.server.common.MetadataVersion
|
import org.apache.kafka.server.common.MetadataVersion
|
||||||
import org.junit.jupiter.api.Assertions.fail
|
import org.junit.jupiter.api.Assertions.{assertThrows, fail}
|
||||||
import org.junit.jupiter.api.extension.ExtendWith
|
import org.junit.jupiter.api.extension.ExtendWith
|
||||||
import org.junit.jupiter.api.{Tag, Timeout}
|
import org.junit.jupiter.api.{Tag, Timeout}
|
||||||
|
|
||||||
|
@ -85,4 +85,35 @@ class KafkaServerKRaftRegistrationTest {
|
||||||
kraftCluster.close()
|
kraftCluster.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
|
||||||
|
def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): Unit = {
|
||||||
|
// Bootstrap the ZK cluster ID into KRaft
|
||||||
|
val clusterId = zkCluster.clusterId()
|
||||||
|
val kraftCluster = new KafkaClusterTestKit.Builder(
|
||||||
|
new TestKitNodes.Builder().
|
||||||
|
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
|
||||||
|
setClusterId(Uuid.fromString(clusterId)).
|
||||||
|
setNumBrokerNodes(0).
|
||||||
|
setNumControllerNodes(1).build())
|
||||||
|
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
|
||||||
|
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
|
||||||
|
.build()
|
||||||
|
try {
|
||||||
|
kraftCluster.format()
|
||||||
|
kraftCluster.startup()
|
||||||
|
|
||||||
|
// Enable migration configs and restart brokers
|
||||||
|
val props = kraftCluster.controllerClientProperties()
|
||||||
|
val voters = props.get(RaftConfig.QUORUM_VOTERS_CONFIG)
|
||||||
|
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||||
|
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
|
||||||
|
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||||
|
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||||
|
assertThrows(classOf[IllegalArgumentException], () => zkCluster.rollingBrokerRestart())
|
||||||
|
} finally {
|
||||||
|
zkCluster.stop()
|
||||||
|
kraftCluster.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,14 +98,14 @@ class BrokerLifecycleManagerTest {
|
||||||
@Test
|
@Test
|
||||||
def testCreateAndClose(): Unit = {
|
def testCreateAndClose(): Unit = {
|
||||||
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
||||||
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
|
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
|
||||||
manager.close()
|
manager.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testCreateStartAndClose(): Unit = {
|
def testCreateStartAndClose(): Unit = {
|
||||||
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
||||||
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
|
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
|
||||||
assertEquals(BrokerState.NOT_RUNNING, manager.state)
|
assertEquals(BrokerState.NOT_RUNNING, manager.state)
|
||||||
manager.start(() => context.highestMetadataOffset.get(),
|
manager.start(() => context.highestMetadataOffset.get(),
|
||||||
context.mockChannelManager, context.clusterId, context.advertisedListeners,
|
context.mockChannelManager, context.clusterId, context.advertisedListeners,
|
||||||
|
@ -120,7 +120,7 @@ class BrokerLifecycleManagerTest {
|
||||||
@Test
|
@Test
|
||||||
def testSuccessfulRegistration(): Unit = {
|
def testSuccessfulRegistration(): Unit = {
|
||||||
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
||||||
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
|
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
|
||||||
val controllerNode = new Node(3000, "localhost", 8021)
|
val controllerNode = new Node(3000, "localhost", 8021)
|
||||||
context.controllerNodeProvider.node.set(controllerNode)
|
context.controllerNodeProvider.node.set(controllerNode)
|
||||||
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
|
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
|
||||||
|
@ -140,7 +140,7 @@ class BrokerLifecycleManagerTest {
|
||||||
def testRegistrationTimeout(): Unit = {
|
def testRegistrationTimeout(): Unit = {
|
||||||
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
||||||
val controllerNode = new Node(3000, "localhost", 8021)
|
val controllerNode = new Node(3000, "localhost", 8021)
|
||||||
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
|
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
|
||||||
context.controllerNodeProvider.node.set(controllerNode)
|
context.controllerNodeProvider.node.set(controllerNode)
|
||||||
def newDuplicateRegistrationResponse(): Unit = {
|
def newDuplicateRegistrationResponse(): Unit = {
|
||||||
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
|
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
|
||||||
|
@ -181,7 +181,7 @@ class BrokerLifecycleManagerTest {
|
||||||
@Test
|
@Test
|
||||||
def testControlledShutdown(): Unit = {
|
def testControlledShutdown(): Unit = {
|
||||||
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
val context = new BrokerLifecycleManagerTestContext(configProperties)
|
||||||
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
|
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
|
||||||
val controllerNode = new Node(3000, "localhost", 8021)
|
val controllerNode = new Node(3000, "localhost", 8021)
|
||||||
context.controllerNodeProvider.node.set(controllerNode)
|
context.controllerNodeProvider.node.set(controllerNode)
|
||||||
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
|
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
|
||||||
|
|
|
@ -110,7 +110,7 @@ class BrokerRegistrationRequestTest {
|
||||||
.setBrokerId(brokerId)
|
.setBrokerId(brokerId)
|
||||||
.setClusterId(clusterId)
|
.setClusterId(clusterId)
|
||||||
.setIncarnationId(Uuid.randomUuid())
|
.setIncarnationId(Uuid.randomUuid())
|
||||||
.setMigratingZkBrokerEpoch(zkEpoch.getOrElse(-1L))
|
.setIsMigratingZkBroker(zkEpoch.isDefined)
|
||||||
.setFeatures(features)
|
.setFeatures(features)
|
||||||
|
|
||||||
Errors.forCode(sendAndRecieve(channelManager, req).errorCode())
|
Errors.forCode(sendAndRecieve(channelManager, req).errorCode())
|
||||||
|
|
|
@ -1635,4 +1635,48 @@ class KafkaConfigTest {
|
||||||
errorMessage
|
errorMessage
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testMigrationEnabledZkMode(): Unit = {
|
||||||
|
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
|
||||||
|
props.setProperty(KafkaConfig.MigrationEnabledProp, "true")
|
||||||
|
assertEquals(
|
||||||
|
"If using zookeeper.metadata.migration.enable, controller.quorum.voters must contain a parseable set of voters.",
|
||||||
|
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)
|
||||||
|
|
||||||
|
props.setProperty(KafkaConfig.QuorumVotersProp, "3000@localhost:9093")
|
||||||
|
assertEquals(
|
||||||
|
"requirement failed: controller.listener.names must not be empty when running in ZooKeeper migration mode: []",
|
||||||
|
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
|
||||||
|
|
||||||
|
props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||||
|
KafkaConfig.fromProps(props)
|
||||||
|
|
||||||
|
props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.IBP_3_3_IV0.version())
|
||||||
|
assertEquals(
|
||||||
|
"requirement failed: Cannot enable ZooKeeper migration without setting 'inter.broker.protocol.version' to 3.4 or higher",
|
||||||
|
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
|
||||||
|
|
||||||
|
props.remove(KafkaConfig.MigrationEnabledProp)
|
||||||
|
assertEquals(
|
||||||
|
"requirement failed: controller.listener.names must be empty when not running in KRaft mode: [CONTROLLER]",
|
||||||
|
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
|
||||||
|
|
||||||
|
props.remove(KafkaConfig.ControllerListenerNamesProp)
|
||||||
|
KafkaConfig.fromProps(props)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testMigrationEnabledKRaftMode(): Unit = {
|
||||||
|
val props = new Properties()
|
||||||
|
props.putAll(kraftProps())
|
||||||
|
props.setProperty(KafkaConfig.MigrationEnabledProp, "true")
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
"If using `zookeeper.metadata.migration.enable` in KRaft mode, `zookeeper.connect` must also be set.",
|
||||||
|
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)
|
||||||
|
|
||||||
|
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
|
||||||
|
KafkaConfig.fromProps(props)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -336,13 +336,13 @@ public class ClusterControlManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (request.migratingZkBrokerEpoch() != -1 && !zkRegistrationAllowed()) {
|
if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) {
|
||||||
throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
|
throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
|
||||||
}
|
}
|
||||||
|
|
||||||
RegisterBrokerRecord record = new RegisterBrokerRecord().
|
RegisterBrokerRecord record = new RegisterBrokerRecord().
|
||||||
setBrokerId(brokerId).
|
setBrokerId(brokerId).
|
||||||
setMigratingZkBrokerEpoch(request.migratingZkBrokerEpoch()).
|
setIsMigratingZkBroker(request.isMigratingZkBroker()).
|
||||||
setIncarnationId(request.incarnationId()).
|
setIncarnationId(request.incarnationId()).
|
||||||
setBrokerEpoch(brokerEpoch).
|
setBrokerEpoch(brokerEpoch).
|
||||||
setRack(request.rack());
|
setRack(request.rack());
|
||||||
|
@ -426,7 +426,7 @@ public class ClusterControlManager {
|
||||||
new BrokerRegistration(brokerId, record.brokerEpoch(),
|
new BrokerRegistration(brokerId, record.brokerEpoch(),
|
||||||
record.incarnationId(), listeners, features,
|
record.incarnationId(), listeners, features,
|
||||||
Optional.ofNullable(record.rack()), record.fenced(),
|
Optional.ofNullable(record.rack()), record.fenced(),
|
||||||
record.inControlledShutdown(), BrokerRegistration.zkBrokerEpoch(record.migratingZkBrokerEpoch())));
|
record.inControlledShutdown(), record.isMigratingZkBroker()));
|
||||||
if (heartbeatManager != null) {
|
if (heartbeatManager != null) {
|
||||||
if (prevRegistration != null) heartbeatManager.remove(brokerId);
|
if (prevRegistration != null) heartbeatManager.remove(brokerId);
|
||||||
heartbeatManager.register(brokerId, record.fenced());
|
heartbeatManager.register(brokerId, record.fenced());
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class BrokerRegistration {
|
||||||
private final Optional<String> rack;
|
private final Optional<String> rack;
|
||||||
private final boolean fenced;
|
private final boolean fenced;
|
||||||
private final boolean inControlledShutdown;
|
private final boolean inControlledShutdown;
|
||||||
private final Optional<Long> migratingZkBrokerEpoch;
|
private final boolean isMigratingZkBroker;
|
||||||
|
|
||||||
// Visible for testing
|
// Visible for testing
|
||||||
public BrokerRegistration(int id,
|
public BrokerRegistration(int id,
|
||||||
|
@ -77,7 +77,7 @@ public class BrokerRegistration {
|
||||||
boolean fenced,
|
boolean fenced,
|
||||||
boolean inControlledShutdown) {
|
boolean inControlledShutdown) {
|
||||||
this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
|
this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
|
||||||
fenced, inControlledShutdown, Optional.empty());
|
fenced, inControlledShutdown, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public BrokerRegistration(int id,
|
public BrokerRegistration(int id,
|
||||||
|
@ -88,9 +88,9 @@ public class BrokerRegistration {
|
||||||
Optional<String> rack,
|
Optional<String> rack,
|
||||||
boolean fenced,
|
boolean fenced,
|
||||||
boolean inControlledShutdown,
|
boolean inControlledShutdown,
|
||||||
Optional<Long> migratingZkBrokerEpoch) {
|
boolean isMigratingZkBroker) {
|
||||||
this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
|
this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
|
||||||
fenced, inControlledShutdown, migratingZkBrokerEpoch);
|
fenced, inControlledShutdown, isMigratingZkBroker);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Visible for testing
|
// Visible for testing
|
||||||
|
@ -102,7 +102,7 @@ public class BrokerRegistration {
|
||||||
Optional<String> rack,
|
Optional<String> rack,
|
||||||
boolean fenced,
|
boolean fenced,
|
||||||
boolean inControlledShutdown) {
|
boolean inControlledShutdown) {
|
||||||
this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, Optional.empty());
|
this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public BrokerRegistration(int id,
|
public BrokerRegistration(int id,
|
||||||
|
@ -113,7 +113,7 @@ public class BrokerRegistration {
|
||||||
Optional<String> rack,
|
Optional<String> rack,
|
||||||
boolean fenced,
|
boolean fenced,
|
||||||
boolean inControlledShutdown,
|
boolean inControlledShutdown,
|
||||||
Optional<Long> migratingZkBrokerEpoch) {
|
boolean isMigratingZkBroker) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.epoch = epoch;
|
this.epoch = epoch;
|
||||||
this.incarnationId = incarnationId;
|
this.incarnationId = incarnationId;
|
||||||
|
@ -131,7 +131,7 @@ public class BrokerRegistration {
|
||||||
this.rack = rack;
|
this.rack = rack;
|
||||||
this.fenced = fenced;
|
this.fenced = fenced;
|
||||||
this.inControlledShutdown = inControlledShutdown;
|
this.inControlledShutdown = inControlledShutdown;
|
||||||
this.migratingZkBrokerEpoch = migratingZkBrokerEpoch;
|
this.isMigratingZkBroker = isMigratingZkBroker;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
|
public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
|
||||||
|
@ -155,7 +155,7 @@ public class BrokerRegistration {
|
||||||
Optional.ofNullable(record.rack()),
|
Optional.ofNullable(record.rack()),
|
||||||
record.fenced(),
|
record.fenced(),
|
||||||
record.inControlledShutdown(),
|
record.inControlledShutdown(),
|
||||||
zkBrokerEpoch(record.migratingZkBrokerEpoch()));
|
record.isMigratingZkBroker());
|
||||||
}
|
}
|
||||||
|
|
||||||
public int id() {
|
public int id() {
|
||||||
|
@ -199,11 +199,7 @@ public class BrokerRegistration {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isMigratingZkBroker() {
|
public boolean isMigratingZkBroker() {
|
||||||
return migratingZkBrokerEpoch.isPresent();
|
return isMigratingZkBroker;
|
||||||
}
|
|
||||||
|
|
||||||
public Optional<Long> migratingZkBrokerEpoch() {
|
|
||||||
return migratingZkBrokerEpoch;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
|
public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
|
||||||
|
@ -222,9 +218,9 @@ public class BrokerRegistration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (migratingZkBrokerEpoch.isPresent()) {
|
if (isMigratingZkBroker) {
|
||||||
if (options.metadataVersion().isMigrationSupported()) {
|
if (options.metadataVersion().isMigrationSupported()) {
|
||||||
registrationRecord.setMigratingZkBrokerEpoch(migratingZkBrokerEpoch.get());
|
registrationRecord.setIsMigratingZkBroker(isMigratingZkBroker);
|
||||||
} else {
|
} else {
|
||||||
options.handleLoss("the isMigratingZkBroker state of one or more brokers");
|
options.handleLoss("the isMigratingZkBroker state of one or more brokers");
|
||||||
}
|
}
|
||||||
|
@ -253,7 +249,7 @@ public class BrokerRegistration {
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
|
return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
|
||||||
rack, fenced, inControlledShutdown, migratingZkBrokerEpoch);
|
rack, fenced, inControlledShutdown, isMigratingZkBroker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -268,7 +264,7 @@ public class BrokerRegistration {
|
||||||
other.rack.equals(rack) &&
|
other.rack.equals(rack) &&
|
||||||
other.fenced == fenced &&
|
other.fenced == fenced &&
|
||||||
other.inControlledShutdown == inControlledShutdown &&
|
other.inControlledShutdown == inControlledShutdown &&
|
||||||
other.migratingZkBrokerEpoch.equals(migratingZkBrokerEpoch);
|
other.isMigratingZkBroker == isMigratingZkBroker;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -289,7 +285,7 @@ public class BrokerRegistration {
|
||||||
bld.append(", rack=").append(rack);
|
bld.append(", rack=").append(rack);
|
||||||
bld.append(", fenced=").append(fenced);
|
bld.append(", fenced=").append(fenced);
|
||||||
bld.append(", inControlledShutdown=").append(inControlledShutdown);
|
bld.append(", inControlledShutdown=").append(inControlledShutdown);
|
||||||
bld.append(", migratingZkBrokerEpoch=").append(migratingZkBrokerEpoch.orElse(-1L));
|
bld.append(", isMigratingZkBroker=").append(isMigratingZkBroker);
|
||||||
bld.append(")");
|
bld.append(")");
|
||||||
return bld.toString();
|
return bld.toString();
|
||||||
}
|
}
|
||||||
|
@ -313,7 +309,7 @@ public class BrokerRegistration {
|
||||||
rack,
|
rack,
|
||||||
newFenced,
|
newFenced,
|
||||||
newInControlledShutdownChange,
|
newInControlledShutdownChange,
|
||||||
migratingZkBrokerEpoch
|
isMigratingZkBroker
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,8 +22,8 @@
|
||||||
"fields": [
|
"fields": [
|
||||||
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
|
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
|
||||||
"about": "The broker id." },
|
"about": "The broker id." },
|
||||||
{ "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
|
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "2+", "default": "false",
|
||||||
"about": "The ZK broker epoch if this record is for a ZK broker. Otherwise, -1" },
|
"about": "True if the broker is a ZK broker in migration mode. Otherwise, false" },
|
||||||
{ "name": "IncarnationId", "type": "uuid", "versions": "0+",
|
{ "name": "IncarnationId", "type": "uuid", "versions": "0+",
|
||||||
"about": "The incarnation ID of the broker process" },
|
"about": "The incarnation ID of the broker process" },
|
||||||
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+",
|
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+",
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class BrokerRegistrationTest {
|
||||||
Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
|
Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
|
||||||
Stream.of(new SimpleEntry<>("metadata.version", VersionRange.of((short) 7, (short) 7)))
|
Stream.of(new SimpleEntry<>("metadata.version", VersionRange.of((short) 7, (short) 7)))
|
||||||
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
|
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
|
||||||
Optional.empty(), false, true, Optional.of(10L)));
|
Optional.empty(), false, true, true));
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testValues() {
|
public void testValues() {
|
||||||
|
@ -90,19 +90,19 @@ public class BrokerRegistrationTest {
|
||||||
"incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
|
"incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
|
||||||
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
|
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
|
||||||
"host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
|
"host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
|
||||||
"rack=Optional.empty, fenced=true, inControlledShutdown=false, migratingZkBrokerEpoch=-1)",
|
"rack=Optional.empty, fenced=true, inControlledShutdown=false, isMigratingZkBroker=false)",
|
||||||
REGISTRATIONS.get(1).toString());
|
REGISTRATIONS.get(1).toString());
|
||||||
assertEquals("BrokerRegistration(id=2, epoch=0, " +
|
assertEquals("BrokerRegistration(id=2, epoch=0, " +
|
||||||
"incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
|
"incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
|
||||||
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
|
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
|
||||||
"host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo: 2-3}, " +
|
"host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo: 2-3}, " +
|
||||||
"rack=Optional[myrack], fenced=false, inControlledShutdown=true, migratingZkBrokerEpoch=-1)",
|
"rack=Optional[myrack], fenced=false, inControlledShutdown=true, isMigratingZkBroker=false)",
|
||||||
REGISTRATIONS.get(2).toString());
|
REGISTRATIONS.get(2).toString());
|
||||||
assertEquals("BrokerRegistration(id=3, epoch=0, " +
|
assertEquals("BrokerRegistration(id=3, epoch=0, " +
|
||||||
"incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
|
"incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
|
||||||
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
|
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
|
||||||
"host='localhost', port=9093)], supportedFeatures={metadata.version: 7}, " +
|
"host='localhost', port=9093)], supportedFeatures={metadata.version: 7}, " +
|
||||||
"rack=Optional.empty, fenced=false, inControlledShutdown=true, migratingZkBrokerEpoch=10)",
|
"rack=Optional.empty, fenced=false, inControlledShutdown=true, isMigratingZkBroker=true)",
|
||||||
REGISTRATIONS.get(3).toString());
|
REGISTRATIONS.get(3).toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue