KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2

We update the metadata update handler to resend the broker registration when
the metadata version is upgraded to 3.7-IV2 or above, so that the controller
becomes aware of the log directories in the broker.

We also update DirectoryId::isOnline to return true for an empty list of
log directories while the controller awaits the broker's re-registration.
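In short, re-registration fires only when the update crosses the 3.7-IV2
boundary from below. A minimal sketch of the gate, modelling feature levels
as plain integers (illustrative values and names, not Kafka's):

    object ReRegistrationGate {
      // Illustrative stand-in for MetadataVersion.IBP_3_7_IV2.featureLevel().
      val JbodBoundary = 19

      // Re-register only on the first transition across the boundary, so later
      // upgrades (e.g. 3.7-IV2 -> 3.8-IV0) do not trigger another registration.
      def shouldReRegister(previousLevel: Int, newLevel: Int): Boolean =
        previousLevel < JbodBoundary && newLevel >= JbodBoundary

      def main(args: Array[String]): Unit = {
        println(shouldReRegister(18, 19)) // true: crossed the boundary
        println(shouldReRegister(19, 20)) // false: already at or above it
        println(shouldReRegister(17, 18)) // false: still below it
      }
    }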

Co-authored-by: Proven Provenzano <pprovenzano@confluent.io>

Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Author: Gaurav Narula
Committer: Colin P. McCabe
Date: 2024-01-30 09:59:11 -08:00
Parent: 9e4a4a2821
Commit: 4c6f975ab3
7 changed files with 184 additions and 7 deletions

BrokerLifecycleManager.scala

@@ -244,6 +244,19 @@ class BrokerLifecycleManager(
     eventQueue.append(new OfflineDirEvent(directory))
   }

+  def handleKraftJBODMetadataVersionUpdate(): Unit = {
+    eventQueue.append(new KraftJBODMetadataVersionUpdateEvent())
+  }
+
+  private class KraftJBODMetadataVersionUpdateEvent extends EventQueue.Event {
+    override def run(): Unit = {
+      if (!isZkBroker) {
+        registered = false
+        scheduleNextCommunicationImmediately()
+      }
+    }
+  }
+
   def brokerEpoch: Long = _brokerEpoch

   def state: BrokerState = _state
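The new event does two things: it clears the `registered` flag and schedules an
immediate exchange with the controller, so the manager's next request is a full
BrokerRegistration (reusing the existing incarnation id) rather than a heartbeat.
A toy sketch of that pattern, with hypothetical names standing in for Kafka's
internals:

    // Toy model (hypothetical names) of the "mark unregistered, communicate now"
    // pattern used by KraftJBODMetadataVersionUpdateEvent above.
    final class ToyLifecycleManager {
      @volatile private var registered = true

      def onMetadataVersionBump(): Unit = {
        registered = false // forget that we already registered...
        communicate()      // ...so the next exchange is a registration, not a heartbeat
      }

      private def communicate(): Unit =
        if (!registered) {
          println("send BrokerRegistration (same incarnation id)")
          registered = true
        } else {
          println("send BrokerHeartbeat")
        }
    }

    object ToyLifecycleManagerDemo {
      def main(args: Array[String]): Unit =
        new ToyLifecycleManager().onMetadataVersionBump() // prints the registration line
    }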

BrokerServer.scala

@@ -452,7 +452,9 @@ class BrokerServer(
         authorizer
       ),
       sharedServer.initialBrokerMetadataLoadFaultHandler,
-      sharedServer.metadataPublishingFaultHandler)
+      sharedServer.metadataPublishingFaultHandler,
+      lifecycleManager
+    )
     metadataPublishers.add(brokerMetadataPublisher)

     // Register parts of the broker that can be reconfigured via dynamic configs. This needs to

BrokerMetadataPublisher.scala

@@ -20,7 +20,7 @@ package kafka.server.metadata
 import java.util.{OptionalInt, Properties}
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.LogManager
-import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.server.{BrokerLifecycleManager, KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
@@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.fault.FaultHandler

 import java.util.concurrent.CompletableFuture
@@ -72,7 +73,8 @@ class BrokerMetadataPublisher(
   delegationTokenPublisher: DelegationTokenPublisher,
   aclPublisher: AclPublisher,
   fatalFaultHandler: FaultHandler,
-  metadataPublishingFaultHandler: FaultHandler
+  metadataPublishingFaultHandler: FaultHandler,
+  brokerLifecycleManager: BrokerLifecycleManager,
 ) extends MetadataPublisher with Logging {

   logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] "
@@ -130,6 +132,15 @@ class BrokerMetadataPublisher(
       Option(delta.featuresDelta()).foreach { featuresDelta =>
         featuresDelta.metadataVersionChange().ifPresent { metadataVersion =>
           info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
+          val currentMetadataVersion = delta.image().features().metadataVersion()
+          if (currentMetadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) && metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
+            info(
+              s"""Resending BrokerRegistration with existing incarnation-id to inform the
+                 |controller about log directories in the broker following metadata update:
+                 |previousMetadataVersion: $currentMetadataVersion
+                 |newMetadataVersion: $metadataVersion""".stripMargin.linesIterator.mkString(" ").trim)
+            brokerLifecycleManager.handleKraftJBODMetadataVersionUpdate()
+          }
         }
       }

BrokerLifecycleManagerTest.scala

@@ -251,4 +251,36 @@ class BrokerLifecycleManagerTest {
     manager.close()
   }

+  @Test
+  def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
+    val context = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-",
+      isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val controllerNode = new Node(3000, "localhost", 8021)
+    context.controllerNodeProvider.node.set(controllerNode)
+    manager.start(() => context.highestMetadataOffset.get(),
+      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+      Collections.emptyMap(), OptionalLong.of(10L))
+    TestUtils.retry(60000) {
+      assertEquals(1, context.mockChannelManager.unsentQueue.size)
+      assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build()
+        .asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
+    }
+    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
+      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
+    TestUtils.retry(10000) {
+      context.poll()
+      assertEquals(1000L, manager.brokerEpoch)
+    }
+
+    manager.handleKraftJBODMetadataVersionUpdate()
+    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
+      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
+    TestUtils.retry(60000) {
+      context.time.sleep(100)
+      context.poll()
+      manager.eventQueue.wakeup()
+      assertEquals(1200, manager.brokerEpoch)
+    }
+    manager.close()
+  }
 }

BrokerMetadataPublisherTest.scala

@@ -23,13 +23,14 @@ import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.Properties
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import kafka.log.{LogManager, UnifiedLog}
-import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
+import kafka.server.{BrokerLifecycleManager, BrokerServer, KafkaConfig, ReplicaManager}
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
+import org.apache.kafka.common.metadata.FeatureLevelRecord
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.GroupCoordinator
@@ -38,12 +39,13 @@ import org.apache.kafka.image.loader.LogDeltaManifest
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
 import org.apache.kafka.raft.LeaderAndEpoch
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.fault.FaultHandler
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
-import org.mockito.Mockito.{doThrow, mock, verify}
+import org.mockito.Mockito.{clearInvocations, doThrow, mock, times, verify, verifyNoInteractions}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
@@ -240,7 +242,9 @@ class BrokerMetadataPublisherTest {
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
         setNumBrokerNodes(1).
-        setNumControllerNodes(1).build()).
+        setNumControllerNodes(1).
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV1).
+        build()).
       build()
     try {
       cluster.format()
@@ -293,7 +297,8 @@ class BrokerMetadataPublisherTest {
       mock(classOf[DelegationTokenPublisher]),
       mock(classOf[AclPublisher]),
       faultHandler,
-      faultHandler
+      faultHandler,
+      mock(classOf[BrokerLifecycleManager]),
     )

     val image = MetadataImage.EMPTY
@@ -312,4 +317,102 @@ class BrokerMetadataPublisherTest {

     verify(groupCoordinator).onNewMetadataImage(image, delta)
   }

+  @Test
+  def testMetadataVersionUpdateToIBP_3_7_IV2OrAboveTriggersBrokerReRegistration(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, ""))
+    val metadataCache = new KRaftMetadataCache(0)
+    val logManager = mock(classOf[LogManager])
+    val replicaManager = mock(classOf[ReplicaManager])
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val faultHandler = mock(classOf[FaultHandler])
+    val brokerLifecycleManager = mock(classOf[BrokerLifecycleManager])
+
+    val metadataPublisher = new BrokerMetadataPublisher(
+      config,
+      metadataCache,
+      logManager,
+      replicaManager,
+      groupCoordinator,
+      mock(classOf[TransactionCoordinator]),
+      mock(classOf[DynamicConfigPublisher]),
+      mock(classOf[DynamicClientQuotaPublisher]),
+      mock(classOf[ScramPublisher]),
+      mock(classOf[DelegationTokenPublisher]),
+      mock(classOf[AclPublisher]),
+      faultHandler,
+      faultHandler,
+      brokerLifecycleManager,
+    )
+
+    var image = MetadataImage.EMPTY
+    var delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
+
+    // We first upgrade the metadata version to 3.6-IV2
+    delta.replay(new FeatureLevelRecord().
+      setName(MetadataVersion.FEATURE_NAME).
+      setFeatureLevel(MetadataVersion.IBP_3_6_IV2.featureLevel()))
+    var newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
+    metadataPublisher.onMetadataUpdate(delta, newImage,
+      LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build())
+
+    // This should NOT trigger a broker re-registration
+    verifyNoInteractions(brokerLifecycleManager)
+
+    // We then upgrade to 3.7-IV2
+    image = newImage
+    delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
+    delta.replay(new FeatureLevelRecord().
+      setName(MetadataVersion.FEATURE_NAME).
+      setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel()))
+    newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
+    metadataPublisher.onMetadataUpdate(delta, newImage,
+      LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build())
+
+    // This SHOULD trigger a broker re-registration
+    verify(brokerLifecycleManager, times(1)).handleKraftJBODMetadataVersionUpdate()
+    clearInvocations(brokerLifecycleManager)
+
+    // Finally upgrade to 3.8-IV0
+    image = newImage
+    delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
+    delta.replay(new FeatureLevelRecord().
+      setName(MetadataVersion.FEATURE_NAME).
+      setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()))
+    newImage = delta.apply(new MetadataProvenance(200, 4, 3000))
+    metadataPublisher.onMetadataUpdate(delta, newImage,
+      LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build())
+
+    // This should NOT trigger another broker re-registration
+    verify(brokerLifecycleManager, times(0)).handleKraftJBODMetadataVersionUpdate()
+
+    metadataPublisher.close()
+  }
 }

DirectoryId.java

@@ -153,6 +153,16 @@ public class DirectoryId {
         if (LOST.equals(dir)) {
             return false;
         }
+        // The only time the list of online directories should be empty is when the broker
+        // registered at a metadata version prior to 3.7-IV2 and the cluster was then upgraded.
+        // In that case the original list of directories was purged during broker registration,
+        // so we cannot tell whether the directory is online. We assume it is: a broker halts
+        // if all of its log directories are offline, and it will eventually resend a
+        // registration request with information about all of its log directories.
+        // See KAFKA-16162 for more information.
+        if (sortedOnlineDirs.isEmpty()) {
+            return true;
+        }
         return Collections.binarySearch(sortedOnlineDirs, dir) >= 0;
     }
 }
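The new empty-list behaviour can be exercised directly; a minimal usage sketch,
assuming the patched kafka-clients jar is on the classpath (IsOnlineDemo is an
illustrative name):

    import java.util.Collections
    import org.apache.kafka.common.{DirectoryId, Uuid}

    object IsOnlineDemo {
      def main(args: Array[String]): Unit = {
        val dir = Uuid.randomUuid()
        // With a known directory list, membership decides online-ness.
        println(DirectoryId.isOnline(dir, Collections.singletonList(dir))) // true
        // With an empty list (pre-3.7-IV2 registration), assume online until re-registration.
        println(DirectoryId.isOnline(Uuid.randomUuid(), Collections.emptyList[Uuid]())) // true
      }
    }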

DirectoryIdTest.java

@@ -19,6 +19,7 @@ package org.apache.kafka.common;
 import org.junit.jupiter.api.Test;

 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -67,12 +68,16 @@ public class DirectoryIdTest {
     @Test
     void testIsOnline() {
+        // Given
         List<Uuid> sortedDirs = Arrays.asList(
             Uuid.fromString("imQKg2cXTVe8OUFNa3R9bg"),
             Uuid.fromString("Mwy5wxTDQxmsZwGzjsaX7w"),
             Uuid.fromString("s8rHMluuSDCnxt3FmKwiyw")
         );
         sortedDirs.sort(Uuid::compareTo);
+        List<Uuid> emptySortedDirs = Collections.emptyList();
+
+        // When/Then
         assertTrue(DirectoryId.isOnline(Uuid.fromString("imQKg2cXTVe8OUFNa3R9bg"), sortedDirs));
         assertTrue(DirectoryId.isOnline(Uuid.fromString("Mwy5wxTDQxmsZwGzjsaX7w"), sortedDirs));
         assertTrue(DirectoryId.isOnline(Uuid.fromString("s8rHMluuSDCnxt3FmKwiyw"), sortedDirs));
@@ -80,5 +85,6 @@ public class DirectoryIdTest {
         assertTrue(DirectoryId.isOnline(DirectoryId.UNASSIGNED, sortedDirs));
         assertFalse(DirectoryId.isOnline(DirectoryId.LOST, sortedDirs));
         assertFalse(DirectoryId.isOnline(Uuid.fromString("AMYchbMtS6yhtsXbca7DQg"), sortedDirs));
+        assertTrue(DirectoryId.isOnline(Uuid.randomUuid(), emptySortedDirs));
     }
 }
} }