KAFKA-19539: Kafka Streams should also purge internal topics based on user commit requests (#20234)

Repartition topic records should be purged up to the currently committed
offset once the `repartition.purge.interval.ms` duration has elapsed.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Lan Ding 2025-09-29 23:26:49 +08:00 committed by GitHub
parent 71c5a426b8
commit 1ebca7817b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 6 deletions

View File

@ -1837,12 +1837,6 @@ public class StreamThread extends Thread implements ProcessingThread {
.collect(Collectors.toSet())
);
if ((now - lastPurgeMs) > purgeTimeMs) {
// try to purge the committed records for repartition topics if possible
taskManager.maybePurgeCommittedRecords();
lastPurgeMs = now;
}
if (committed == -1) {
log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
} else {
@ -1853,6 +1847,12 @@ public class StreamThread extends Thread implements ProcessingThread {
committed = taskManager.maybeCommitActiveTasksPerUserRequested();
}
if ((now - lastPurgeMs) > purgeTimeMs) {
// try to purge the committed records for repartition topics if possible
taskManager.maybePurgeCommittedRecords();
lastPurgeMs = now;
}
return committed;
}

View File

@ -624,6 +624,39 @@ public class StreamThreadTest {
verify(taskManager).maybePurgeCommittedRecords();
}
@ParameterizedTest
@MethodSource("data")
public void shouldAlsoPurgeBeforeTheCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
    // Commit interval is set to Long.MAX_VALUE so interval-based commits can never
    // fire; any purge observed below must come from the purge-interval path alone.
    final long purgeIntervalMs = 1000L;
    final Properties streamsProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
    streamsProps.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
    streamsProps.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(Long.MAX_VALUE));
    streamsProps.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeIntervalMs));
    final StreamsConfig streamsConfig = new StreamsConfig(streamsProps);

    @SuppressWarnings("unchecked")
    final Consumer<byte[], byte[]> mockConsumer = mock(Consumer.class);
    final ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class);
    when(mockConsumer.groupMetadata()).thenReturn(groupMetadata);
    when(groupMetadata.groupInstanceId()).thenReturn(Optional.empty());
    final TaskManager mockTaskManager = mock(TaskManager.class);

    final TopologyMetadata metadata = new TopologyMetadata(internalTopologyBuilder, streamsConfig);
    metadata.buildAndRewriteTopology();
    thread = buildStreamThread(mockConsumer, mockTaskManager, streamsConfig, metadata);

    // First maybeCommit: a purge is expected on this call (verified via times(2) below).
    thread.setNow(mockTime.milliseconds());
    thread.maybeCommit();
    // Advance just past the purge interval; the second maybeCommit must purge again
    // even though the (infinite) commit interval has not elapsed.
    mockTime.sleep(purgeIntervalMs + 1);
    thread.setNow(mockTime.milliseconds());
    thread.maybeCommit();

    verify(mockTaskManager, times(2)).maybePurgeCommittedRecords();
}
@ParameterizedTest
@MethodSource("data")
public void shouldNotProcessWhenPartitionRevoked(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {