KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface (#13357)

The new group coordinator needs to access cluster metadata (e.g. topics, partitions, etc.) and it needs a mechanism to be notified when the metadata changes (e.g. to trigger a rebalance). In KRaft clusters, the easiest is to subscribe to metadata changes via the MetadataPublisher.

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2023-03-08 08:52:01 +01:00 committed by GitHub
parent 4527e54647
commit 788cc11f45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 95 additions and 21 deletions

View File

@ -1231,6 +1231,7 @@ project(':group-coordinator') {
dependencies {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation libs.slf4jApi
testImplementation project(':clients').sourceSets.test.output

View File

@ -345,6 +345,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.server.util"/>
</subpackage>
</subpackage>

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, TransactionResult}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.util.FutureUtils
import java.util
@ -580,6 +581,13 @@ private[group] class GroupCoordinatorAdapter(
coordinator.onResignation(groupMetadataPartitionIndex, groupMetadataPartitionLeaderEpoch)
}
override def onNewMetadataImage(
newImage: MetadataImage,
delta: MetadataDelta
): Unit = {
// The metadata image is not used in the old group coordinator.
}
override def groupMetadataTopicConfigs(): Properties = {
coordinator.offsetsTopicConfigs
}

View File

@ -96,7 +96,7 @@ object BrokerMetadataPublisher extends Logging {
}
class BrokerMetadataPublisher(
conf: KafkaConfig,
config: KafkaConfig,
metadataCache: KRaftMetadataCache,
logManager: LogManager,
replicaManager: ReplicaManager,
@ -109,14 +109,14 @@ class BrokerMetadataPublisher(
fatalFaultHandler: FaultHandler,
metadataPublishingFaultHandler: FaultHandler
) extends MetadataPublisher with Logging {
logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] "
logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] "
import BrokerMetadataPublisher._
/**
* The broker ID.
*/
val brokerId: Int = conf.nodeId
val brokerId: Int = config.nodeId
/**
* True if this is the first time we have published metadata.
@ -169,7 +169,7 @@ class BrokerMetadataPublisher(
replicaManager.applyDelta(topicsDelta, newImage)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error applying topics " +
s"delta in ${deltaName}", t)
s"delta in $deltaName", t)
}
try {
// Update the group coordinator of local changes
@ -181,7 +181,7 @@ class BrokerMetadataPublisher(
)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with local changes in ${deltaName}", t)
s"coordinator with local changes in $deltaName", t)
}
try {
// Update the transaction coordinator of local changes
@ -192,7 +192,7 @@ class BrokerMetadataPublisher(
txnCoordinator.onResignation)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
s"coordinator with local changes in ${deltaName}", t)
s"coordinator with local changes in $deltaName", t)
}
try {
// Notify the group coordinator about deleted topics.
@ -208,7 +208,7 @@ class BrokerMetadataPublisher(
}
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with deleted partitions in ${deltaName}", t)
s"coordinator with deleted partitions in $deltaName", t)
}
}
@ -222,7 +222,7 @@ class BrokerMetadataPublisher(
}
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating client " +
s"quotas in ${deltaName}", t)
s"quotas in $deltaName", t)
}
// Apply changes to SCRAM credentials.
@ -246,7 +246,7 @@ class BrokerMetadataPublisher(
// if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
// we want to apply those changes in that order, not the reverse order! Otherwise
// there could be a window during which incorrect authorization results are returned.
Option(delta.aclsDelta()).foreach( aclsDelta =>
Option(delta.aclsDelta()).foreach { aclsDelta =>
_authorizer match {
case Some(authorizer: ClusterMetadataAuthorizer) => if (aclsDelta.isSnapshotDelta) {
try {
@ -257,7 +257,7 @@ class BrokerMetadataPublisher(
authorizer.loadSnapshot(newImage.acls().acls())
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error loading " +
s"authorizer snapshot in ${deltaName}", t)
s"authorizer snapshot in $deltaName", t)
}
} else {
try {
@ -271,11 +271,20 @@ class BrokerMetadataPublisher(
})
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error loading " +
s"authorizer changes in ${deltaName}", t)
s"authorizer changes in $deltaName", t)
}
}
case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do.
})
}
}
try {
// Propagate the new image to the group coordinator.
groupCoordinator.onNewMetadataImage(newImage, delta)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with local changes in $deltaName", t)
}
if (_firstPublish) {
finishInitializingReplicaManager(newImage)
@ -283,7 +292,7 @@ class BrokerMetadataPublisher(
publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
s"publishing broker metadata from ${deltaName}", t)
s"publishing broker metadata from $deltaName", t)
} finally {
_firstPublish = false
}
@ -299,7 +308,7 @@ class BrokerMetadataPublisher(
override def publishedOffset: Long = publishedOffsetAtomic.get()
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
}
/**
@ -356,7 +365,7 @@ class BrokerMetadataPublisher(
// Make the LogCleaner available for reconfiguration. We can't do this prior to this
// point because LogManager#startup creates the LogCleaner object, if
// log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
Option(logManager.cleaner).foreach(conf.dynamicConfig.addBrokerReconfigurable)
Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting LogManager", t)
}
@ -369,14 +378,14 @@ class BrokerMetadataPublisher(
try {
// Start the group coordinator.
groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
.getOrElse(conf.offsetsTopicPartitions))
.getOrElse(config.offsetsTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
}
try {
// Start the transaction coordinator.
txnCoordinator.startup(() => metadataCache.numPartitions(
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(conf.transactionTopicPartitions))
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting TransactionCoordinator", t)
}

View File

@ -17,11 +17,14 @@
package kafka.server.metadata
import kafka.coordinator.transaction.TransactionCoordinator
import java.util.Collections.{singleton, singletonList, singletonMap}
import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.log.UnifiedLog
import kafka.server.{BrokerServer, KafkaConfig}
import kafka.log.{LogManager, UnifiedLog}
import kafka.security.CredentialProvider
import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
@ -30,7 +33,8 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.BROKER
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, TopicImage, TopicsImage}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.server.fault.FaultHandler
@ -38,7 +42,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito
import org.mockito.Mockito.doThrow
import org.mockito.Mockito.{doThrow, mock, verify}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@ -263,4 +267,42 @@ class BrokerMetadataPublisherTest {
cluster.close()
}
}
@Test
def testNewImagePushedToGroupCoordinator(): Unit = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, ""))
val metadataCache = new KRaftMetadataCache(0)
val logManager = mock(classOf[LogManager])
val replicaManager = mock(classOf[ReplicaManager])
val groupCoordinator = mock(classOf[GroupCoordinator])
val txnCoordinator = mock(classOf[TransactionCoordinator])
val quotaManager = mock(classOf[ClientQuotaMetadataManager])
val configPublisher = mock(classOf[DynamicConfigPublisher])
val credentialProvider = mock(classOf[CredentialProvider])
val faultHandler = mock(classOf[FaultHandler])
val metadataPublisher = new BrokerMetadataPublisher(
config,
metadataCache,
logManager,
replicaManager,
groupCoordinator,
txnCoordinator,
quotaManager,
configPublisher,
None,
credentialProvider,
faultHandler,
faultHandler
)
val image = MetadataImage.EMPTY
val delta = new MetadataDelta.Builder()
.setImage(image)
.build()
metadataPublisher.publish(delta, image)
verify(groupCoordinator).onNewMetadataImage(image, delta)
}
}

View File

@ -42,6 +42,8 @@ import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import java.util.List;
import java.util.OptionalInt;
@ -300,6 +302,17 @@ public interface GroupCoordinator {
OptionalInt groupMetadataPartitionLeaderEpoch
);
/**
* A new metadata image is available.
*
* @param newImage The new metadata image.
* @param delta The metadata delta.
*/
void onNewMetadataImage(
MetadataImage newImage,
MetadataDelta delta
);
/**
* Return the configuration properties of the internal group
* metadata topic.