KAFKA-19204: Allow persister retry of initializing topics. (#19603)

* Currently, in the share group heartbeat flow, if we see a topic-partition (TP) subscribed
for the first time, we move that TP to the initializing state in the group coordinator (GC)
and let the GC send a persister request to initialize share-group state for that TP.
* However, if the coordinator runtime request for the share group heartbeat
times out (for example, due to a restarting or otherwise unhealthy broker), the future completes
exceptionally and the persister request is never sent.
* We are then left in a bad state: the TP is in the initializing state in the GC
but has not been initialized in the persister. Subsequent heartbeats for the same share
partitions do not help either, since we do not allow retrying the persister
request for initializing TPs.
* This PR remedies the situation by allowing the persister initialize request to be
retried for TPs that are already in the initializing state (see the sketch after this list).
* A temporary fix that increased the offset commit timeout in system tests had previously
been added to work around the issue; this PR reverts that change as well.
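
The sketch below illustrates only the retry decision described above. It is a minimal,
self-contained example with simplified, hypothetical names (TpState, shouldSendPersisterInit),
not the actual group coordinator code, which tracks this state per topic-partition inside
GroupMetadataManager.

    // Minimal sketch of the retry decision, assuming simplified names; the real code
    // tracks this state per topic-partition inside the group coordinator.
    public class ShareGroupInitRetrySketch {

        enum TpState { UNINITIALIZED, INITIALIZING, INITIALIZED }

        static TpState tpState = TpState.UNINITIALIZED;

        // Should a persister initialize request be sent for this TP on the current heartbeat?
        static boolean shouldSendPersisterInit(boolean allowRetryOfInitializing) {
            switch (tpState) {
                case UNINITIALIZED:
                    tpState = TpState.INITIALIZING;   // first subscription: always send
                    return true;
                case INITIALIZING:
                    // Before this PR: effectively false, so a TP whose persister call failed
                    // could never be retried. After this PR: true. (The real flow moves the TP
                    // to INITIALIZED only once the persister response is replayed.)
                    return allowRetryOfInitializing;
                default:
                    return false;                     // already persister-initialized
            }
        }

        public static void main(String[] args) {
            // Heartbeat 1: TP moves to INITIALIZING; assume the persister call then times out.
            System.out.println(shouldSendPersisterInit(true)); // true
            // Heartbeat 2: with the fix, the initialize request is sent again (a retry).
            System.out.println(shouldSendPersisterInit(true)); // true
        }
    }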

Reviewers: Andrew Schofield <aschofield@confluent.io>
Author: Sushant Mahajan, 2025-05-02 18:55:29 +05:30 (committed by GitHub)
Parent: bc7e57242d
Commit: e68781414e
6 changed files with 18 additions and 22 deletions

@@ -2105,7 +2105,8 @@ public class ShareConsumerTest {
         // all messages which can be read are read, some would be redelivered
         TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
-        assertTrue(prodState.count().get() < complexCons1.recordsRead());
+        assertTrue(prodState.count().get() < complexCons1.recordsRead(),
+            String.format("Producer (%d) and share consumer (%d) record count mismatch.", prodState.count().get(), complexCons1.recordsRead()));
         shutdownExecutorService(service);

@@ -2684,7 +2684,12 @@ public class GroupMetadataManager {
         Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
         ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId);
-        Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : mergeShareGroupInitMaps(info.initializedTopics(), info.initializingTopics());
+        // We are only considering initialized TPs here. This is because it could happen
+        // that some topics have been moved to initializing but the corresponding persister request
+        // could not be made/failed (invoked by the group coordinator). Then there would be no way to try
+        // the persister call. This way we get the opportunity to retry.
+        Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : info.initializedTopics();
         subscriptionMetadata.forEach((topicName, topicMetadata) -> {
             Set<Integer> alreadyInitializedPartSet = alreadyInitialized.getOrDefault(topicMetadata.id(), Set.of());
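
For illustration, here is a hedged sketch of how the alreadyInitialized map computed in the hunk
above might feed the per-topic delta that ends up in the persister initialize request. The
computeDelta helper and the use of java.util.UUID in place of Kafka's Uuid are simplifications,
not the actual GroupMetadataManager code.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    // Sketch only: subscribed partitions minus confirmed-initialized partitions are what the
    // GC asks the persister to initialize. Because alreadyInitialized no longer includes
    // "initializing" topics, a previously failed persister call is retried on the next heartbeat.
    public class InitializeDeltaSketch {

        static Map<UUID, Set<Integer>> computeDelta(Map<UUID, Set<Integer>> subscribed,
                                                    Map<UUID, Set<Integer>> alreadyInitialized) {
            Map<UUID, Set<Integer>> delta = new HashMap<>();
            subscribed.forEach((topicId, partitions) -> {
                Set<Integer> remaining = new HashSet<>(partitions);
                remaining.removeAll(alreadyInitialized.getOrDefault(topicId, Set.of()));
                if (!remaining.isEmpty()) {
                    delta.put(topicId, remaining);   // still needs persister initialization
                }
            });
            return delta;
        }

        public static void main(String[] args) {
            UUID t1 = UUID.randomUUID();
            // t1-0 and t1-1 are only "initializing" (a previous persister call failed), so they
            // are absent from alreadyInitialized and show up in the delta again: t1 -> [0, 1].
            System.out.println(computeDelta(Map.of(t1, Set.of(0, 1)), Map.of()));
        }
    }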

@@ -21367,7 +21367,7 @@ public class GroupMetadataManagerTest {
     }
     @Test
-    public void testShareGroupHeartbeatNoPersisterRequestWithInitializing() {
+    public void testShareGroupHeartbeatPersisterRequestWithInitializing() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()

@@ -21411,7 +21411,7 @@
             .setMemberEpoch(0)
             .setSubscribedTopicNames(List.of(t1Name)));
-        assertFalse(result.records().contains(
+        assertTrue(result.records().contains(
             newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of(
                 mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1))
             )),

@@ -21422,10 +21422,10 @@
         verifyShareGroupHeartbeatInitializeRequest(
             result.response().getValue(),
-            Map.of(),
+            Map.of(t1Uuid, Set.of(0, 1)),
             groupId,
-            0,
-            false
+            1,
+            true
         );
     }

@@ -21630,6 +21630,7 @@
         // Since t1 is initializing and t2 is initialized due to replay above.
         assertEquals(
             Map.of(
+                t1Id, Set.of(0, 1),
                 t3Id, Set.of(0, 1, 2)
             ),
             context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of(

@@ -300,10 +300,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.use_share_groups = use_share_groups
         self.use_streams_groups = use_streams_groups
-        # Set offsets_commit_timeout based on context
-        if context.injected_args is not None:
-            self.offsets_commit_timeout = context.injected_args.get('offsets_commit_timeout')
         # Set consumer_group_migration_policy based on context and arguments.
         if consumer_group_migration_policy is None:
             arg_name = 'consumer_group_migration_policy'

@@ -757,8 +753,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
                                       security_config=self.security_config, num_nodes=self.num_nodes,
                                       listener_security_config=self.listener_security_config,
-                                      use_share_groups=self.use_share_groups,
-                                      offsets_commit_timeout=self.offsets_commit_timeout)
+                                      use_share_groups=self.use_share_groups)
         configs = dict( l.rstrip().split('=', 1) for l in config_template.split('\n')
                         if not l.startswith("#") and "=" in l )

@@ -131,7 +131,3 @@ group.initial.rebalance.delay.ms=100
 share.coordinator.state.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
 share.coordinator.state.topic.min.isr=1
 {% endif %}
-{% if offsets_commit_timeout is defined and offsets_commit_timeout is not none %}
-offsets.commit.timeout.ms={{ offsets_commit_timeout }}
-{% endif %}

@@ -208,17 +208,15 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
         clean_shutdown=[True, False],
         metadata_quorum=[quorum.isolated_kraft],
         num_failed_brokers=[1, 2],
-        use_share_groups=[True],
-        offsets_commit_timeout=[20000]
+        use_share_groups=[True]
     )
     @matrix(
         clean_shutdown=[True, False],
         metadata_quorum=[quorum.combined_kraft],
         num_failed_brokers=[1],
-        use_share_groups=[True],
-        offsets_commit_timeout=[20000]
+        use_share_groups=[True]
     )
-    def test_broker_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1, use_share_groups=True, offsets_commit_timeout=20000):
+    def test_broker_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1, use_share_groups=True):
         producer = self.setup_producer(self.TOPIC2["name"])
         consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest")