diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 5f3fdc81887..51bc16fb09d 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -264,11 +264,11 @@ class BrokerLifecycleManager(
       new OfflineDirBrokerFailureEvent(directory))
   }
 
-  def handleKraftJBODMetadataVersionUpdate(): Unit = {
-    eventQueue.append(new KraftJBODMetadataVersionUpdateEvent())
+  def resendBrokerRegistrationUnlessZkMode(): Unit = {
+    eventQueue.append(new ResendBrokerRegistrationUnlessZkModeEvent())
   }
 
-  private class KraftJBODMetadataVersionUpdateEvent extends EventQueue.Event {
+  private class ResendBrokerRegistrationUnlessZkModeEvent extends EventQueue.Event {
     override def run(): Unit = {
       if (!isZkBroker) {
         registered = false
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 112a03c50a9..64a4fd7474a 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
 import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde}
-import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, NodeToControllerChannelManager}
@@ -139,6 +139,8 @@ class BrokerServer(
 
   var brokerMetadataPublisher: BrokerMetadataPublisher = _
 
+  var brokerRegistrationTracker: BrokerRegistrationTracker = _
+
   val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)
 
   def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
@@ -482,6 +484,10 @@ class BrokerServer(
         lifecycleManager
       )
       metadataPublishers.add(brokerMetadataPublisher)
+      brokerRegistrationTracker = new BrokerRegistrationTracker(config.brokerId,
+        logManager.directoryIdsSet.toList.asJava,
+        () => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
+      metadataPublishers.add(brokerRegistrationTracker)
 
       // Register parts of the broker that can be reconfigured via dynamic configs. This needs to
       // be done before we publish the dynamic configs, so that we don't miss anything.
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 048a665757b..ee7bfa2157e 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
-import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.fault.FaultHandler
 
 import java.util.concurrent.CompletableFuture
@@ -129,21 +128,6 @@ class BrokerMetadataPublisher(
         debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
       }
 
-      Option(delta.featuresDelta()).foreach { featuresDelta =>
-        featuresDelta.metadataVersionChange().ifPresent{ metadataVersion =>
-          info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
-          val currentMetadataVersion = delta.image().features().metadataVersion()
-          if (currentMetadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) && metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
-            info(
-              s"""Resending BrokerRegistration with existing incarnation-id to inform the
-                 |controller about log directories in the broker following metadata update:
-                 |previousMetadataVersion: ${delta.image().features().metadataVersion()}
-                 |newMetadataVersion: $metadataVersion""".stripMargin.linesIterator.mkString(" ").trim)
-            brokerLifecycleManager.handleKraftJBODMetadataVersionUpdate()
-          }
-        }
-      }
-
       // Apply topic deltas.
       Option(delta.topicsDelta()).foreach { topicsDelta =>
         try {
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 34f9d139a03..b0162dc6358 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -285,7 +285,7 @@ class BrokerLifecycleManagerTest {
     assertEquals(1000L, manager.brokerEpoch)
 
     // Trigger JBOD MV update
-    manager.handleKraftJBODMetadataVersionUpdate()
+    manager.resendBrokerRegistrationUnlessZkMode()
 
     // Accept new registration, response sets epoch to 1200
     nextRegistrationRequest(1200L)
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index c2926c3b67d..26f4fb3daee 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
-import org.apache.kafka.common.metadata.FeatureLevelRecord
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance}
@@ -43,7 +42,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
-import org.mockito.Mockito.{clearInvocations, doThrow, mock, times, verify, verifyNoInteractions}
+import org.mockito.Mockito.{doThrow, mock, verify}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
@@ -221,102 +220,4 @@ class BrokerMetadataPublisherTest {
 
     verify(groupCoordinator).onNewMetadataImage(image, delta)
   }
-
-  @Test
-  def testMetadataVersionUpdateToIBP_3_7_IV2OrAboveTriggersBrokerReRegistration(): Unit = {
-    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, ""))
-    val metadataCache = new KRaftMetadataCache(0)
-    val logManager = mock(classOf[LogManager])
-    val replicaManager = mock(classOf[ReplicaManager])
-    val groupCoordinator = mock(classOf[GroupCoordinator])
-    val faultHandler = mock(classOf[FaultHandler])
-    val brokerLifecycleManager = mock(classOf[BrokerLifecycleManager])
-
-    val metadataPublisher = new BrokerMetadataPublisher(
-      config,
-      metadataCache,
-      logManager,
-      replicaManager,
-      groupCoordinator,
-      mock(classOf[TransactionCoordinator]),
-      mock(classOf[DynamicConfigPublisher]),
-      mock(classOf[DynamicClientQuotaPublisher]),
-      mock(classOf[ScramPublisher]),
-      mock(classOf[DelegationTokenPublisher]),
-      mock(classOf[AclPublisher]),
-      faultHandler,
-      faultHandler,
-      brokerLifecycleManager,
-    )
-
-    var image = MetadataImage.EMPTY
-    var delta = new MetadataDelta.Builder()
-      .setImage(image)
-      .build()
-
-    // We first upgrade metadata version to 3_6_IV2
-    delta.replay(new FeatureLevelRecord().
-      setName(MetadataVersion.FEATURE_NAME).
-      setFeatureLevel(MetadataVersion.IBP_3_6_IV2.featureLevel()))
-    var newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
-
-    metadataPublisher.onMetadataUpdate(delta, newImage,
-      LogDeltaManifest.newBuilder()
-        .provenance(MetadataProvenance.EMPTY)
-        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
-        .numBatches(1)
-        .elapsedNs(100)
-        .numBytes(42)
-        .build())
-
-    // This should NOT trigger broker reregistration
-    verifyNoInteractions(brokerLifecycleManager)
-
-    // We then upgrade to IBP_3_7_IV2
-    image = newImage
-    delta = new MetadataDelta.Builder()
-      .setImage(image)
-      .build()
-    delta.replay(new FeatureLevelRecord().
-      setName(MetadataVersion.FEATURE_NAME).
-      setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel()))
-    newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
-
-    metadataPublisher.onMetadataUpdate(delta, newImage,
-      LogDeltaManifest.newBuilder()
-        .provenance(MetadataProvenance.EMPTY)
-        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
-        .numBatches(1)
-        .elapsedNs(100)
-        .numBytes(42)
-        .build())
-
-    // This SHOULD trigger a broker registration
-    verify(brokerLifecycleManager, times(1)).handleKraftJBODMetadataVersionUpdate()
-    clearInvocations(brokerLifecycleManager)
-
-    // Finally upgrade to IBP_3_8_IV0
-    image = newImage
-    delta = new MetadataDelta.Builder()
-      .setImage(image)
-      .build()
-    delta.replay(new FeatureLevelRecord().
-      setName(MetadataVersion.FEATURE_NAME).
-      setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()))
-    newImage = delta.apply(new MetadataProvenance(200, 4, 3000))
-
-    metadataPublisher.onMetadataUpdate(delta, newImage,
-      LogDeltaManifest.newBuilder()
-        .provenance(MetadataProvenance.EMPTY)
-        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
-        .numBatches(1)
-        .elapsedNs(100)
-        .numBytes(42)
-        .build())
-
-    // This should NOT trigger broker reregistration
-    verify(brokerLifecycleManager, times(0)).handleKraftJBODMetadataVersionUpdate()
-
-    metadataPublisher.close()
-  }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 0974c31d1b2..8b9c5b19eae 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -408,6 +408,13 @@ public class ClusterControlManager {
             setBrokerEpoch(brokerEpoch).
             setRack(request.rack()).
             setEndPoints(listenerInfo.toBrokerRegistrationRecord());
+
+        if (existing != null && request.incarnationId().equals(existing.incarnationId())) {
+            log.info("Amending registration of broker {}", request.brokerId());
+            record.setFenced(existing.fenced());
+            record.setInControlledShutdown(existing.inControlledShutdown());
+        }
+
         for (BrokerRegistrationRequestData.Feature feature : request.features()) {
             record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
         }
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java b/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
new file mode 100644
index 00000000000..51ac2bdfa4b
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+    private final Logger log;
+    private final int id;
+    private final Runnable refreshRegistrationCallback;
+
+    /**
+     * Create the tracker.
+     *
+     * @param id                            The ID of this broker.
+     * @param targetDirectories             The directories managed by this broker.
+     * @param refreshRegistrationCallback   Callback to run if we need to refresh the registration.
+     */
+    public BrokerRegistrationTracker(
+        int id,
+        List<Uuid> targetDirectories,
+        Runnable refreshRegistrationCallback
+    ) {
+        this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] ").
+            logger(BrokerRegistrationTracker.class);
+        this.id = id;
+        this.refreshRegistrationCallback = refreshRegistrationCallback;
+    }
+
+    @Override
+    public String name() {
+        return "BrokerRegistrationTracker(id=" + id + ")";
+    }
+
+    @Override
+    public void onMetadataUpdate(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LoaderManifest manifest
+    ) {
+        boolean checkBrokerRegistration = false;
+        if (delta.featuresDelta() != null) {
+            if (delta.metadataVersionChanged().isPresent()) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Metadata version change is present: {}",
+                        delta.metadataVersionChanged());
+                }
+                checkBrokerRegistration = true;
+            }
+        }
+        if (delta.clusterDelta() != null) {
+            if (delta.clusterDelta().changedBrokers().get(id) != null) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Broker change is present: {}",
+                        delta.clusterDelta().changedBrokers().get(id));
+                }
+                checkBrokerRegistration = true;
+            }
+        }
+        if (checkBrokerRegistration) {
+            if (brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(),
+                    delta.clusterDelta().broker(id))) {
+                refreshRegistrationCallback.run();
+            }
+        }
+    }
+
+    /**
+     * Check if the current broker registration needs to be refreshed.
+     *
+     * @param metadataVersion   The current metadata version.
+     * @param registration      The current broker registration, or null if there is none.
+     * @return                  True only if we should refresh.
+     */
+    boolean brokerRegistrationNeedsRefresh(
+        MetadataVersion metadataVersion,
+        BrokerRegistration registration
+    ) {
+        // If there is no existing registration, the BrokerLifecycleManager must still be sending it.
+        // So we don't need to do anything yet.
+        if (registration == null) {
+            log.debug("No current broker registration to check.");
+            return false;
+        }
+        // Check to see if the directory list has changed. Note that this check could certainly be
+        // triggered spuriously. For example, if the broker's directory list has been changed in the
+        // past, and we are in the process of replaying that change log, we will end up here.
+        // That's fine because resending the broker registration does not cause any problems. And,
+        // of course, as soon as a snapshot is made, we will no longer need to worry about those
+        // old metadata log entries being replayed on startup.
+        if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2) &&
+                registration.directories().isEmpty()) {
+            log.info("Current directory set is empty, but MV supports JBOD. Resending " +
Resending " + + "broker registration."); + return true; + } + log.debug("Broker registration does not need to be resent."); + return false; + } +} diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java new file mode 100644 index 00000000000..855a96cd8aa --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.loader.LogDeltaManifest; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.MetadataVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Timeout(value = 40) +public class BrokerRegistrationTrackerTest { + static final Uuid INCARNATION_ID = Uuid.fromString("jyjLbk31Tpa53pFrU9Y-Ng"); + + static final Uuid A = Uuid.fromString("Ahw3vXfnThqeZbb7HD1w6Q"); + + static final Uuid B = Uuid.fromString("BjOacT0OTNqIvUWIlKhahg"); + + static final Uuid C = Uuid.fromString("CVHi_iv2Rvy5_1rtPdasfg"); + + static class BrokerRegistrationTrackerTestContext { + AtomicInteger numCalls = new AtomicInteger(0); + BrokerRegistrationTracker tracker = new BrokerRegistrationTracker(1, + Arrays.asList(B, A), () -> numCalls.incrementAndGet()); + + MetadataImage image = MetadataImage.EMPTY; + + void onMetadataUpdate(MetadataDelta delta) { + MetadataProvenance provenance = new MetadataProvenance(0, 0, 0); + image = delta.apply(provenance); + LogDeltaManifest manifest = new LogDeltaManifest.Builder(). + provenance(provenance). + leaderAndEpoch(LeaderAndEpoch.UNKNOWN). + numBatches(1). + elapsedNs(1). + numBytes(1). + build(); + tracker.onMetadataUpdate(delta, image, manifest); + } + + MetadataDelta newDelta() { + return new MetadataDelta.Builder(). + setImage(image). 
+                build();
+        }
+    }
+
+    @Test
+    public void testTrackerName() {
+        BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+        assertEquals("BrokerRegistrationTracker(id=1)", ctx.tracker.name());
+    }
+
+    @Test
+    public void testMetadataVersionUpdateWithoutRegistrationDoesNothing() {
+        BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+        MetadataDelta delta = ctx.newDelta();
+        delta.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel()));
+        ctx.onMetadataUpdate(delta);
+        assertEquals(0, ctx.numCalls.get());
+    }
+
+    @Test
+    public void testBrokerUpdateWithoutNewMvDoesNothing() {
+        BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+        MetadataDelta delta = ctx.newDelta();
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(1).
+            setIncarnationId(INCARNATION_ID).
+            setLogDirs(Arrays.asList(A, B, C)));
+        ctx.onMetadataUpdate(delta);
+        assertEquals(0, ctx.numCalls.get());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBrokerUpdateWithNewMv(boolean jbodMv) {
+        BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+        MetadataDelta delta = ctx.newDelta();
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(1).
+            setIncarnationId(INCARNATION_ID).
+            setLogDirs(Arrays.asList()));
+        delta.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(jbodMv ? MetadataVersion.IBP_3_7_IV2.featureLevel() :
+                MetadataVersion.IBP_3_7_IV1.featureLevel()));
+        ctx.onMetadataUpdate(delta);
+        if (jbodMv) {
+            assertEquals(1, ctx.numCalls.get());
+        } else {
+            assertEquals(0, ctx.numCalls.get());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBrokerUpdateWithNewMvWithTwoDeltas(boolean jbodMv) {
+        BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+        MetadataDelta delta = ctx.newDelta();
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(1).
+            setIncarnationId(INCARNATION_ID).
+            setLogDirs(Arrays.asList()));
+        ctx.onMetadataUpdate(delta);
+        // No calls are made because MetadataVersion is 3.0-IV1 initially
+        assertEquals(0, ctx.numCalls.get());
+
+        delta = ctx.newDelta();
+        delta.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(jbodMv ? MetadataVersion.IBP_3_7_IV2.featureLevel() :
+                MetadataVersion.IBP_3_7_IV1.featureLevel()));
+        ctx.onMetadataUpdate(delta);
+        if (jbodMv) {
+            assertEquals(1, ctx.numCalls.get());
+        } else {
+            assertEquals(0, ctx.numCalls.get());
+        }
+    }
+}