From 1ebca7817b537b6503941276a1d654ff5a5ae615 Mon Sep 17 00:00:00 2001
From: Lan Ding
Date: Mon, 29 Sep 2025 23:26:49 +0800
Subject: [PATCH] KAFKA-19539: Kafka Streams should also purge internal topics
 based on user commit requests (#20234)

Repartition topic records should be purged up to the currently committed
offset once the `repartition.purge.interval.ms` duration has passed.

Reviewers: Matthias J. Sax
---
 .../processor/internals/StreamThread.java     | 12 +++----
 .../processor/internals/StreamThreadTest.java | 33 +++++++++++++++++++
 2 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 649a1ec666c..91511da5ee0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1837,12 +1837,6 @@ public class StreamThread extends Thread implements ProcessingThread {
                     .collect(Collectors.toSet())
             );
 
-            if ((now - lastPurgeMs) > purgeTimeMs) {
-                // try to purge the committed records for repartition topics if possible
-                taskManager.maybePurgeCommittedRecords();
-                lastPurgeMs = now;
-            }
-
             if (committed == -1) {
                 log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
             } else {
@@ -1853,6 +1847,12 @@ public class StreamThread extends Thread implements ProcessingThread {
             committed = taskManager.maybeCommitActiveTasksPerUserRequested();
         }
 
+        if ((now - lastPurgeMs) > purgeTimeMs) {
+            // try to purge the committed records for repartition topics if possible
+            taskManager.maybePurgeCommittedRecords();
+            lastPurgeMs = now;
+        }
+
         return committed;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8fcc44993a5..e53474db74a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -624,6 +624,39 @@ public class StreamThreadTest {
         verify(taskManager).maybePurgeCommittedRecords();
     }
 
+    @ParameterizedTest
+    @MethodSource("data")
+    public void shouldAlsoPurgeBeforeTheCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
+        final long purgeInterval = 1000L;
+        final long commitInterval = Long.MAX_VALUE;
+        final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
+        props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval));
+
+        final StreamsConfig config = new StreamsConfig(props);
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final TaskManager taskManager = mock(TaskManager.class);
+
+        final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
+
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        mockTime.sleep(purgeInterval + 1);
+
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        verify(taskManager, times(2)).maybePurgeCommittedRecords();
+    }
+
     @ParameterizedTest
     @MethodSource("data")
     public void shouldNotProcessWhenPartitionRevoked(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
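For context, a minimal configuration sketch (not part of the patch; the application id and bootstrap server are hypothetical) of the two settings the new test exercises: even with interval-based commits effectively disabled via commit.interval.ms, repartition topic records are still purged up to the committed offset once repartition.purge.interval.ms has elapsed, now also after commits requested by the user.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class RepartitionPurgeConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Hypothetical application id and bootstrap server, for illustration only.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-purge-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Interval-based commits effectively disabled, mirroring the new test ...
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(Long.MAX_VALUE));
        // ... while repartition topics are still purged once this interval elapses,
        // including after user-requested commits (KAFKA-19539).
        props.put(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, "1000");
        final StreamsConfig config = new StreamsConfig(props); // validates the settings
        System.out.println(config.getLong(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG));
    }
}

The patch achieves this by moving the purge check in StreamThread.maybeCommit() below the user-requested commit branch, so the purge interval is evaluated regardless of which commit path ran.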