diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index c3da9dcb7e6..bd80364274d 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -244,6 +244,19 @@ class BrokerLifecycleManager(
     eventQueue.append(new OfflineDirEvent(directory))
   }
 
+  def handleKraftJBODMetadataVersionUpdate(): Unit = {
+    eventQueue.append(new KraftJBODMetadataVersionUpdateEvent())
+  }
+
+  private class KraftJBODMetadataVersionUpdateEvent extends EventQueue.Event {
+    override def run(): Unit = {
+      if (!isZkBroker) {
+        registered = false
+        scheduleNextCommunicationImmediately()
+      }
+    }
+  }
+
   def brokerEpoch: Long = _brokerEpoch
 
   def state: BrokerState = _state
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 60b6c554f16..6ee44e3070a 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -452,7 +452,9 @@ class BrokerServer(
           authorizer
         ),
         sharedServer.initialBrokerMetadataLoadFaultHandler,
-        sharedServer.metadataPublishingFaultHandler)
+        sharedServer.metadataPublishingFaultHandler,
+        lifecycleManager
+      )
       metadataPublishers.add(brokerMetadataPublisher)
 
       // Register parts of the broker that can be reconfigured via dynamic configs. This needs to
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 663afd226de..8edb2a1aa1a 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -20,7 +20,7 @@ package kafka.server.metadata
 import java.util.{OptionalInt, Properties}
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.LogManager
-import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.server.{BrokerLifecycleManager, KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
@@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.fault.FaultHandler
 
 import java.util.concurrent.CompletableFuture
@@ -72,7 +73,8 @@ class BrokerMetadataPublisher(
   delegationTokenPublisher: DelegationTokenPublisher,
   aclPublisher: AclPublisher,
   fatalFaultHandler: FaultHandler,
-  metadataPublishingFaultHandler: FaultHandler
+  metadataPublishingFaultHandler: FaultHandler,
+  brokerLifecycleManager: BrokerLifecycleManager,
 ) extends MetadataPublisher with Logging {
   logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] "
 
@@ -130,6 +132,15 @@ class BrokerMetadataPublisher(
       Option(delta.featuresDelta()).foreach { featuresDelta =>
         featuresDelta.metadataVersionChange().ifPresent{ metadataVersion =>
           info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
+          val currentMetadataVersion = delta.image().features().metadataVersion()
+          if (currentMetadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) && metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
+            info(
+              s"""Resending BrokerRegistration with existing incarnation-id to inform the
+                 |controller about log directories in the broker following metadata update:
+                 |previousMetadataVersion: ${delta.image().features().metadataVersion()}
+                 |newMetadataVersion: $metadataVersion""".stripMargin.linesIterator.mkString(" ").trim)
+            brokerLifecycleManager.handleKraftJBODMetadataVersionUpdate()
+          }
         }
       }
 
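The new check in `BrokerMetadataPublisher` fires only when a `metadata.version` update crosses the JBOD boundary: the image's current version is below 3.7-IV2 and the incoming one is at or above it. A minimal sketch of that predicate against the real `MetadataVersion` API (the helper name `crossesJbodBoundary` is ours):

```scala
import org.apache.kafka.server.common.MetadataVersion

// True only when an update crosses the JBOD support boundary (3.7-IV2).
// Updates entirely below or entirely at/above the boundary return false,
// so the broker re-registers at most once per upgrade.
def crossesJbodBoundary(before: MetadataVersion, after: MetadataVersion): Boolean =
  before.isLessThan(MetadataVersion.IBP_3_7_IV2) &&
    after.isAtLeast(MetadataVersion.IBP_3_7_IV2)
```

For example, `crossesJbodBoundary(MetadataVersion.IBP_3_6_IV2, MetadataVersion.IBP_3_7_IV2)` is true, while a later 3.7-IV2 to 3.8-IV0 bump is not — exactly the two cases the publisher test below asserts.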
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index e23b175d8d5..d7925ef8b56 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -251,4 +251,36 @@ class BrokerLifecycleManagerTest {
     manager.close()
   }
+
+  @Test
+  def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
+    val context = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val controllerNode = new Node(3000, "localhost", 8021)
+    context.controllerNodeProvider.node.set(controllerNode)
+    manager.start(() => context.highestMetadataOffset.get(),
+      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+      Collections.emptyMap(), OptionalLong.of(10L))
+    TestUtils.retry(60000) {
+      assertEquals(1, context.mockChannelManager.unsentQueue.size)
+      assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
+    }
+    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
+      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
+    TestUtils.retry(10000) {
+      context.poll()
+      assertEquals(1000L, manager.brokerEpoch)
+    }
+
+    manager.handleKraftJBODMetadataVersionUpdate()
+    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
+      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
+    TestUtils.retry(60000) {
+      context.time.sleep(100)
+      context.poll()
+      manager.eventQueue.wakeup()
+      assertEquals(1200, manager.brokerEpoch)
+    }
+    manager.close()
+  }
 }
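The test above exercises the manager's single-threaded event loop: the JBOD event clears the `registered` flag, so the next pass re-sends `BrokerRegistration` with the same incarnation id and picks up a fresh epoch from the controller's response. A self-contained sketch of that pattern (a simplified stand-in, not Kafka's classes):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Simplified stand-in for BrokerLifecycleManager's event-loop behaviour:
// the JBOD event only clears `registered`; the next poll re-registers and
// installs whatever epoch the (mock) controller returns, e.g. 1000 -> 1200.
final class LifecycleSketch {
  @volatile private var registered = false
  @volatile var brokerEpoch: Long = -1L
  private val events = new LinkedBlockingQueue[() => Unit]()

  def handleJbodMetadataVersionUpdate(): Unit =
    events.put(() => registered = false) // re-register on the next pass

  def poll(epochFromController: Long): Unit = {
    Option(events.poll()).foreach(_.apply()) // drain a pending event, if any
    if (!registered) {                       // (re-)send registration
      brokerEpoch = epochFromController
      registered = true
    }
  }
}
```

Under this sketch, `poll(1000)` followed by `handleJbodMetadataVersionUpdate()` and `poll(1200)` leaves `brokerEpoch == 1200`, mirroring the epochs the test asserts.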
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 015d363beff..ffe75c8408d 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -23,13 +23,14 @@ import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.Properties
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import kafka.log.{LogManager, UnifiedLog}
-import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
+import kafka.server.{BrokerLifecycleManager, BrokerServer, KafkaConfig, ReplicaManager}
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
+import org.apache.kafka.common.metadata.FeatureLevelRecord
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.GroupCoordinator
@@ -38,12 +39,13 @@ import org.apache.kafka.image.loader.LogDeltaManifest
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
 import org.apache.kafka.raft.LeaderAndEpoch
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.fault.FaultHandler
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
-import org.mockito.Mockito.{doThrow, mock, verify}
+import org.mockito.Mockito.{clearInvocations, doThrow, mock, times, verify, verifyNoInteractions}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
@@ -240,7 +242,9 @@ class BrokerMetadataPublisherTest {
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
         setNumBrokerNodes(1).
-        setNumControllerNodes(1).build()).
+        setNumControllerNodes(1).
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV1).
+        build()).
       build()
     try {
       cluster.format()
@@ -293,7 +297,8 @@ class BrokerMetadataPublisherTest {
       mock(classOf[DelegationTokenPublisher]),
       mock(classOf[AclPublisher]),
       faultHandler,
-      faultHandler
+      faultHandler,
+      mock(classOf[BrokerLifecycleManager]),
     )
 
     val image = MetadataImage.EMPTY
@@ -312,4 +317,102 @@ class BrokerMetadataPublisherTest {
 
     verify(groupCoordinator).onNewMetadataImage(image, delta)
   }
+
+  @Test
+  def testMetadataVersionUpdateToIBP_3_7_IV2OrAboveTriggersBrokerReRegistration(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, ""))
+    val metadataCache = new KRaftMetadataCache(0)
+    val logManager = mock(classOf[LogManager])
+    val replicaManager = mock(classOf[ReplicaManager])
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val faultHandler = mock(classOf[FaultHandler])
+    val brokerLifecycleManager = mock(classOf[BrokerLifecycleManager])
+
+    val metadataPublisher = new BrokerMetadataPublisher(
+      config,
+      metadataCache,
+      logManager,
+      replicaManager,
+      groupCoordinator,
+      mock(classOf[TransactionCoordinator]),
+      mock(classOf[DynamicConfigPublisher]),
+      mock(classOf[DynamicClientQuotaPublisher]),
+      mock(classOf[ScramPublisher]),
+      mock(classOf[DelegationTokenPublisher]),
+      mock(classOf[AclPublisher]),
+      faultHandler,
+      faultHandler,
+      brokerLifecycleManager,
+    )
+
+    var image = MetadataImage.EMPTY
+    var delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
+
+    // We first upgrade metadata version to 3_6_IV2
+    delta.replay(new FeatureLevelRecord().
+      setName(MetadataVersion.FEATURE_NAME).
+      setFeatureLevel(MetadataVersion.IBP_3_6_IV2.featureLevel()))
+    var newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
+
+    metadataPublisher.onMetadataUpdate(delta, newImage,
+      LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build());
+
+    // This should NOT trigger broker reregistration
+    verifyNoInteractions(brokerLifecycleManager)
+
+    // We then upgrade to IBP_3_7_IV2
+    image = newImage
+    delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
+    delta.replay(new FeatureLevelRecord().
+      setName(MetadataVersion.FEATURE_NAME).
+      setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel()))
+    newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
+
+    metadataPublisher.onMetadataUpdate(delta, newImage,
+      LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build());
+
+    // This SHOULD trigger a broker registration
+    verify(brokerLifecycleManager, times(1)).handleKraftJBODMetadataVersionUpdate()
+    clearInvocations(brokerLifecycleManager)
+
+    // Finally upgrade to IBP_3_8_IV0
+    image = newImage
+    delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
+    delta.replay(new FeatureLevelRecord().
+      setName(MetadataVersion.FEATURE_NAME).
+      setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()))
+    newImage = delta.apply(new MetadataProvenance(200, 4, 3000))
+
+    metadataPublisher.onMetadataUpdate(delta, newImage,
+      LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build());
+
+    // This should NOT trigger broker reregistration
+    verify(brokerLifecycleManager, times(0)).handleKraftJBODMetadataVersionUpdate()
+
+    metadataPublisher.close()
+  }
 }
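Each of the three upgrade steps in the test above repeats the same delta-replay/apply boilerplate. If one wanted to factor it out, a hypothetical helper (name and shape ours) could look like this, using only calls that appear in the test:

```scala
import org.apache.kafka.common.metadata.FeatureLevelRecord
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.server.common.MetadataVersion

// Hypothetical refactoring helper: replay a metadata.version bump on top of
// `image` and apply the given provenance, returning both delta and new image.
def bumpMetadataVersion(
  image: MetadataImage,
  mv: MetadataVersion,
  offset: Long,
  epoch: Int,
  timeMs: Long
): (MetadataDelta, MetadataImage) = {
  val delta = new MetadataDelta.Builder().setImage(image).build()
  delta.replay(new FeatureLevelRecord()
    .setName(MetadataVersion.FEATURE_NAME)
    .setFeatureLevel(mv.featureLevel()))
  (delta, delta.apply(new MetadataProvenance(offset, epoch, timeMs)))
}
```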
diff --git a/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
index 4233198be05..defcdec17cf 100644
--- a/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
+++ b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
@@ -153,6 +153,16 @@ public class DirectoryId {
         if (LOST.equals(dir)) {
             return false;
         }
+
+        // The only time this list should be empty is if the broker was on a MV prior to 3.7-IV2
+        // and the system was upgraded. In this case the original list of directories was purged
+        // during broker registration, so we don't know whether the directory is online. We assume
+        // that a broker will halt if all its log directories are down. Eventually the broker
+        // will send another registration request with information about all log directories.
+        // Refer to KAFKA-16162 for more information.
+        if (sortedOnlineDirs.isEmpty()) {
+            return true;
+        }
         return Collections.binarySearch(sortedOnlineDirs, dir) >= 0;
     }
 }
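The new branch short-circuits before the binary search: an empty online-dir list means the registration predates 3.7-IV2, so the directory is optimistically reported online. Mirroring just the tail of `isOnline` as a standalone sketch (the earlier sentinel guards, e.g. `LOST`, are assumed to have already run):

```scala
import java.util.{Collections, List => JList}
import org.apache.kafka.common.Uuid

// Sketch of the tail of DirectoryId.isOnline only: with no known online dirs
// (a pre-3.7-IV2 registration), report online; otherwise binary-search the
// sorted list. The earlier sentinel guards are omitted here.
def isOnlineTail(dir: Uuid, sortedOnlineDirs: JList[Uuid]): Boolean =
  sortedOnlineDirs.isEmpty || Collections.binarySearch(sortedOnlineDirs, dir) >= 0
```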
diff --git a/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java b/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java
index 5b4d427f275..7c909fc283a 100644
--- a/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java
+++ b/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
@@ -67,12 +68,16 @@ public class DirectoryIdTest {
 
     @Test
     void testIsOnline() {
+        // Given
        List<Uuid> sortedDirs = Arrays.asList(
            Uuid.fromString("imQKg2cXTVe8OUFNa3R9bg"),
            Uuid.fromString("Mwy5wxTDQxmsZwGzjsaX7w"),
            Uuid.fromString("s8rHMluuSDCnxt3FmKwiyw")
        );
        sortedDirs.sort(Uuid::compareTo);
+        List<Uuid> emptySortedDirs = Collections.emptyList();
+
+        // When/Then
        assertTrue(DirectoryId.isOnline(Uuid.fromString("imQKg2cXTVe8OUFNa3R9bg"), sortedDirs));
        assertTrue(DirectoryId.isOnline(Uuid.fromString("Mwy5wxTDQxmsZwGzjsaX7w"), sortedDirs));
        assertTrue(DirectoryId.isOnline(Uuid.fromString("s8rHMluuSDCnxt3FmKwiyw"), sortedDirs));
@@ -80,5 +85,6 @@ public class DirectoryIdTest {
        assertTrue(DirectoryId.isOnline(DirectoryId.UNASSIGNED, sortedDirs));
        assertFalse(DirectoryId.isOnline(DirectoryId.LOST, sortedDirs));
        assertFalse(DirectoryId.isOnline(Uuid.fromString("AMYchbMtS6yhtsXbca7DQg"), sortedDirs));
+        assertTrue(DirectoryId.isOnline(Uuid.randomUuid(), emptySortedDirs));
     }
 }
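Taken together with the `DirectoryId` change, the new test line pins down the upgrade-window behaviour. A quick usage check of the shipped method (the same calls as the test, plus the `LOST` case on an empty list):

```scala
import java.util.Collections
import org.apache.kafka.common.{DirectoryId, Uuid}

// On an empty online-dir list, any concrete directory id reads as online,
// while the LOST sentinel is still rejected by the guard that runs first.
assert(DirectoryId.isOnline(Uuid.randomUuid(), Collections.emptyList[Uuid]()))
assert(!DirectoryId.isOnline(DirectoryId.LOST, Collections.emptyList[Uuid]()))
```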