KAFKA-19274; Group Coordinator Shards are not unloaded when `__consumer_offsets` topic is deleted (#19713)

Group coordinator shards are not unloaded when the `__consumer_offsets`
topic is deleted. The unloading is scheduled, but it is then ignored
because the epoch passed along is equal to the shard's current epoch:

```
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-0 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
```

This patch fixes the issue by not setting the leader epoch in this case.
The coordinator expects the leader epoch to be incremented when the
resignation callback is called, but when the topic is deleted the epoch
is not incremented, so we must not use it. Note that this is aligned
with how deleted partitions are handled too.
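
To make the failure mode concrete: per the logs above, an unload scheduled with an epoch that is not strictly greater than the shard's current epoch is ignored, while an absent epoch forces the unload. A minimal sketch of that guard (an illustration inferred from the log messages, not the actual `CoordinatorRuntime` code):

```scala
import java.util.OptionalInt

// Sketch of the epoch guard implied by the "Ignored unloading" log lines.
// An absent epoch (what this patch now passes on topic deletion) forces the
// unload; a present epoch only wins if it is strictly greater than the
// shard's current epoch, which is why passing the unchanged epoch was a no-op.
def shouldUnload(currentEpoch: Int, partitionEpoch: OptionalInt): Boolean =
  partitionEpoch.isEmpty || partitionEpoch.getAsInt > currentEpoch
```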

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, José Armando García Sancio <jsancio@apache.org>
Author: David Jacot, 2025-05-15 19:04:38 +02:00 (committed by GitHub)
Commit: c612cfff29 (parent: 8a577fa5af)
3 changed files with 129 additions and 4 deletions

BrokerMetadataPublisher.scala

```diff
@@ -287,6 +287,11 @@ class BrokerMetadataPublisher(
   /**
    * Update the coordinator of local replica changes: election and resignation.
    *
+   * When the topic is deleted or a partition of the topic is deleted, the {@code resignation}
+   * callback must be called with {@code None}. The coordinator expects the leader epoch to be
+   * incremented when the {@code resignation} callback is called, but the leader epoch
+   * is not incremented when a topic is deleted.
+   *
    * @param image latest metadata image
    * @param delta metadata delta from the previous image and the latest image
    * @param topicName name of the topic associated with the coordinator
@@ -307,7 +312,7 @@ class BrokerMetadataPublisher(
     if (topicsDelta.topicWasDeleted(topicName)) {
       topicsDelta.image.getTopic(topicName).partitions.entrySet.forEach { entry =>
         if (entry.getValue.leader == brokerId) {
-          resignation(entry.getKey, Some(entry.getValue.leaderEpoch))
+          resignation(entry.getKey, None)
         }
       }
     }
```
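
For context, the `resignation` callback above ultimately reaches `GroupCoordinator#onResignation`, which takes a Java `OptionalInt` (the unit test below verifies exactly that). A hedged sketch of the wiring, where the translation helper is illustrative rather than the actual publisher source:

```scala
import java.util.OptionalInt
import org.apache.kafka.coordinator.group.GroupCoordinator

// Illustrative wiring (not the exact source): the Scala Option[Int] leader
// epoch is translated into the OptionalInt that onResignation expects.
// Passing None therefore becomes OptionalInt.empty(), which the runtime
// treats as a forced unload regardless of the shard's current epoch.
def makeResignationCallback(groupCoordinator: GroupCoordinator): (Int, Option[Int]) => Unit =
  (partitionIndex, leaderEpoch) =>
    groupCoordinator.onResignation(
      partitionIndex,
      leaderEpoch.map(e => OptionalInt.of(e)).getOrElse(OptionalInt.empty())
    )
```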

GroupCoordinatorIntegrationTest.scala

```diff
@@ -16,8 +16,8 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Typ
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
 import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, OffsetAndMetadata}
-import org.apache.kafka.common.errors.GroupIdNotFoundException
-import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicPartition}
+import org.apache.kafka.common.errors.{GroupIdNotFoundException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicCollection, TopicPartition}
 import org.junit.jupiter.api.Assertions._

 import scala.jdk.CollectionConverters._
@@ -27,11 +27,13 @@ import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.storage.internals.log.UnifiedLog
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Timeout

 import java.time.Duration
 import java.util.Collections
 import java.util.concurrent.TimeUnit
+import scala.concurrent.ExecutionException

 @Timeout(120)
 class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
@@ -278,6 +280,58 @@
     }
   }

+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testRecreatingConsumerOffsetsTopic(): Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      withConsumer(groupId = "group", groupProtocol = GroupProtocol.CONSUMER) { consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get a non-empty assignment")
+      }
+
+      admin
+        .deleteTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+        .all()
+        .get()
+
+      TestUtils.waitUntilTrue(() => {
+        try {
+          admin
+            .describeTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+            .topicNameValues()
+            .get(Topic.GROUP_METADATA_TOPIC_NAME)
+            .get(JTestUtils.DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS)
+          false
+        } catch {
+          case e: ExecutionException =>
+            e.getCause.isInstanceOf[UnknownTopicOrPartitionException]
+        }
+      }, msg = s"${Topic.GROUP_METADATA_TOPIC_NAME} was not deleted")
+
+      withConsumer(groupId = "group", groupProtocol = GroupProtocol.CONSUMER) { consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get a non-empty assignment")
+      }
+    }
+  }
+
   private def rollAndCompactConsumerOffsets(): Unit = {
     val tp = new TopicPartition("__consumer_offsets", 0)
     val broker = cluster.brokers.asScala.head._2
```
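
Outside the test harness, the same path can be exercised by deleting the internal offsets topic with the Admin client while a consumer group is active. A minimal reproduction sketch, assuming a broker listening on localhost:9092:

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.Admin

// Deleting __consumer_offsets takes the deletion path fixed by this patch;
// before the fix, the coordinator shards stayed loaded after this call.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
val admin = Admin.create(props)
try {
  admin.deleteTopics(Collections.singleton("__consumer_offsets")).all().get()
} finally {
  admin.close()
}
```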

BrokerMetadataPublisherTest.scala

```diff
@@ -28,9 +28,11 @@ import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic}
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
-import org.apache.kafka.common.metadata.FeatureLevelRecord
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.coordinator.group.GroupCoordinator
@@ -183,6 +185,70 @@ class BrokerMetadataPublisherTest {
     }
   }

+  @Test
+  def testGroupCoordinatorTopicDeletion(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0))
+    val metadataCache = new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_1)
+    val logManager = mock(classOf[LogManager])
+    val replicaManager = mock(classOf[ReplicaManager])
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val faultHandler = mock(classOf[FaultHandler])
+
+    val metadataPublisher = new BrokerMetadataPublisher(
+      config,
+      metadataCache,
+      logManager,
+      replicaManager,
+      groupCoordinator,
+      mock(classOf[TransactionCoordinator]),
+      mock(classOf[ShareCoordinator]),
+      mock(classOf[DynamicConfigPublisher]),
+      mock(classOf[DynamicClientQuotaPublisher]),
+      mock(classOf[DynamicTopicClusterQuotaPublisher]),
+      mock(classOf[ScramPublisher]),
+      mock(classOf[DelegationTokenPublisher]),
+      mock(classOf[AclPublisher]),
+      faultHandler,
+      faultHandler,
+      mock(classOf[SharePartitionManager])
+    )
+
+    val topicId = Uuid.randomUuid()
+    var delta = new MetadataDelta(MetadataImage.EMPTY)
+    delta.replay(new TopicRecord()
+      .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+      .setTopicId(topicId)
+    )
+    delta.replay(new PartitionRecord()
+      .setTopicId(topicId)
+      .setPartitionId(0)
+      .setLeader(config.brokerId)
+    )
+    delta.replay(new PartitionRecord()
+      .setTopicId(topicId)
+      .setPartitionId(1)
+      .setLeader(config.brokerId)
+    )
+
+    val image = delta.apply(MetadataProvenance.EMPTY)
+    delta = new MetadataDelta(image)
+    delta.replay(new RemoveTopicRecord()
+      .setTopicId(topicId)
+    )
+
+    metadataPublisher.onMetadataUpdate(delta, delta.apply(MetadataProvenance.EMPTY),
+      LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build())
+
+    verify(groupCoordinator).onResignation(0, OptionalInt.empty())
+    verify(groupCoordinator).onResignation(1, OptionalInt.empty())
+  }
+
   @Test
   def testNewImagePushedToGroupCoordinator(): Unit = {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0))
```