From e68781414e9bcbc1d7cd5c247433a13f8d0e2e6e Mon Sep 17 00:00:00 2001
From: Sushant Mahajan
Date: Fri, 2 May 2025 18:55:29 +0530
Subject: [PATCH] KAFKA-19204: Allow persister retry of initializing topics. (#19603)

* Currently in the share group heartbeat flow, if we see a TP subscribed for
  the first time, we move that TP to the initializing state in the group
  coordinator (GC) and have the GC send a persister request to initialize the
  share-group state for that TP.
* However, if the coordinator runtime request for the share group heartbeat
  times out (for example, due to a restarting or unhealthy broker), the future
  completes exceptionally and the persister request is never sent.
* We are then stuck in a bad state: the TP is initializing in the GC but was
  never initialized in the persister. Future heartbeats for the same share
  partitions do not help either, since we do not allow retrying the persister
  request for initializing TPs.
* This PR remedies the situation by allowing such retries.
* A temporary fix that increased offset commit timeouts in system tests had
  been added to work around this issue; this PR reverts that change as well.

Reviewers: Andrew Schofield
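The core change, condensed from the GroupMetadataManager diff below (the null
check on `info` is elided here for brevity):

    // Before: TPs in the initializing state counted as "already initialized",
    // so a TP whose persister request was lost could never be retried.
    Map<Uuid, Set<Integer>> alreadyInitialized =
        mergeShareGroupInitMaps(info.initializedTopics(), info.initializingTopics());

    // After: only fully initialized TPs are excluded from the change map, so an
    // initializing TP is sent to the persister again on the next heartbeat.
    Map<Uuid, Set<Integer>> alreadyInitialized = info.initializedTopics();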
---
 .../kafka/clients/consumer/ShareConsumerTest.java     |  3 ++-
 .../kafka/coordinator/group/GroupMetadataManager.java |  7 ++++++-
 .../coordinator/group/GroupMetadataManagerTest.java   | 11 ++++++-----
 tests/kafkatest/services/kafka/kafka.py               |  7 +------
 .../services/kafka/templates/kafka.properties         |  4 ----
 tests/kafkatest/tests/client/share_consumer_test.py   |  8 +++-----
 6 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index edb6d56215e..bdeeafec68f 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2105,7 +2105,8 @@ public class ShareConsumerTest {

         // all messages which can be read are read, some would be redelivered
         TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
-        assertTrue(prodState.count().get() < complexCons1.recordsRead());
+        assertTrue(prodState.count().get() < complexCons1.recordsRead(),
+            String.format("Producer (%d) and share consumer (%d) record count mismatch.", prodState.count().get(), complexCons1.recordsRead()));

         shutdownExecutorService(service);
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index fb2c7c4c6af..c3387859bc7 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2684,7 +2684,12 @@ public class GroupMetadataManager {
         Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
         ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId);
-        Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : mergeShareGroupInitMaps(info.initializedTopics(), info.initializingTopics());
+
+        // We consider only initialized TPs here because some topics may have moved to the
+        // initializing state while the corresponding persister request (invoked by the group
+        // coordinator) failed or was never sent, leaving no way to retry the persister call.
+        // Excluding initializing TPs from this map gives us the opportunity to retry.
+        Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : info.initializedTopics();

         subscriptionMetadata.forEach((topicName, topicMetadata) -> {
             Set<Integer> alreadyInitializedPartSet = alreadyInitialized.getOrDefault(topicMetadata.id(), Set.of());
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 81b127af16d..065dbd6aa98 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -21367,7 +21367,7 @@ public class GroupMetadataManagerTest {
     }

     @Test
-    public void testShareGroupHeartbeatNoPersisterRequestWithInitializing() {
+    public void testShareGroupHeartbeatPersisterRequestWithInitializing() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
@@ -21411,7 +21411,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(0)
                 .setSubscribedTopicNames(List.of(t1Name)));

-        assertFalse(result.records().contains(
+        assertTrue(result.records().contains(
             newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of(
                 mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1))
             )),
@@ -21422,10 +21422,10 @@ public class GroupMetadataManagerTest {

         verifyShareGroupHeartbeatInitializeRequest(
             result.response().getValue(),
-            Map.of(),
+            Map.of(t1Uuid, Set.of(0, 1)),
             groupId,
-            0,
-            false
+            1,
+            true
         );
     }

@@ -21630,6 +21630,7 @@ public class GroupMetadataManagerTest {
         // Since t1 is initializing and t2 is initialized due to replay above.
         assertEquals(
             Map.of(
+                t1Id, Set.of(0, 1),
                 t3Id, Set.of(0, 1, 2)
             ),
             context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of(
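The renamed test pins down the new retry behavior. Its expectation, condensed
from the hunk above with the changed arguments annotated (the annotations are
editorial and inferred from the changed values, not from the helper's
definition):

    verifyShareGroupHeartbeatInitializeRequest(
        result.response().getValue(),
        Map.of(t1Uuid, Set.of(0, 1)),  // was Map.of(): an initialize request for t1 partitions 0 and 1 is now expected
        groupId,
        1,                             // was 0
        true                           // was false: a persister request is sent although t1 is already initializing
    );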
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index c89eead4b74..ca19ca8bd11 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -300,10 +300,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.use_share_groups = use_share_groups
         self.use_streams_groups = use_streams_groups

-        # Set offsets_commit_timeout based on context
-        if context.injected_args is not None:
-            self.offsets_commit_timeout = context.injected_args.get('offsets_commit_timeout')
-
         # Set consumer_group_migration_policy based on context and arguments.
         if consumer_group_migration_policy is None:
             arg_name = 'consumer_group_migration_policy'
@@ -757,8 +753,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
                                       security_config=self.security_config, num_nodes=self.num_nodes,
                                       listener_security_config=self.listener_security_config,
-                                      use_share_groups=self.use_share_groups,
-                                      offsets_commit_timeout=self.offsets_commit_timeout)
+                                      use_share_groups=self.use_share_groups)

         configs = dict( l.rstrip().split('=', 1) for l in config_template.split('\n')
                         if not l.startswith("#") and "=" in l )
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index e0b85459590..861c63014c5 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -130,8 +130,4 @@ group.initial.rebalance.delay.ms=100
 {% if use_share_groups is not none and use_share_groups %}
 share.coordinator.state.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
 share.coordinator.state.topic.min.isr=1
-{% endif %}
-
-{% if offsets_commit_timeout is defined and offsets_commit_timeout is not none %}
-offsets.commit.timeout.ms={{ offsets_commit_timeout }}
 {% endif %}
\ No newline at end of file
diff --git a/tests/kafkatest/tests/client/share_consumer_test.py b/tests/kafkatest/tests/client/share_consumer_test.py
index 8367b1f4097..f47ac1e771b 100644
--- a/tests/kafkatest/tests/client/share_consumer_test.py
+++ b/tests/kafkatest/tests/client/share_consumer_test.py
@@ -208,17 +208,15 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
         clean_shutdown=[True, False],
         metadata_quorum=[quorum.isolated_kraft],
         num_failed_brokers=[1, 2],
-        use_share_groups=[True],
-        offsets_commit_timeout=[20000]
+        use_share_groups=[True]
     )
     @matrix(
         clean_shutdown=[True, False],
         metadata_quorum=[quorum.combined_kraft],
         num_failed_brokers=[1],
-        use_share_groups=[True],
-        offsets_commit_timeout=[20000]
+        use_share_groups=[True]
     )
-    def test_broker_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1, use_share_groups=True, offsets_commit_timeout=20000):
+    def test_broker_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1, use_share_groups=True):
         producer = self.setup_producer(self.TOPIC2["name"])
         consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest")