diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index d0a73a1de95..92772c686af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -1073,10 +1073,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @Override public Map purgeableOffsets() { final Map purgeableConsumedOffsets = new HashMap<>(); - for (final Map.Entry entry : consumedOffsets.entrySet()) { + for (final Map.Entry entry : committedOffsets.entrySet()) { final TopicPartition tp = entry.getKey(); if (topology.isRepartitionTopic(tp.topic())) { - purgeableConsumedOffsets.put(tp, entry.getValue() + 1); + // committedOffsets map is initialized at -1 so no purging until there's a committed offset + if (entry.getValue() > -1) { + purgeableConsumedOffsets.put(tp, entry.getValue()); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 9cda2b9ee7c..9e5ea0b909d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -77,6 +77,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -1845,8 +1847,9 @@ public class StreamTaskTest { verify(stateManager).close(); } - @Test - public void shouldReturnOffsetsForRepartitionTopicsForPurging() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldMaybeReturnOffsetsForRepartitionTopicsForPurging(final boolean doCommit) { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); final TopicPartition repartition = new TopicPartition("repartition", 1); @@ -1907,10 +1910,17 @@ public class StreamTaskTest { assertTrue(task.process(0L)); task.prepareCommit(); + if (doCommit) { + task.updateCommittedOffsets(repartition, 10L); + } final Map map = task.purgeableOffsets(); - assertThat(map, equalTo(singletonMap(repartition, 11L))); + if (doCommit) { + assertThat(map, equalTo(singletonMap(repartition, 10L))); + } else { + assertThat(map, equalTo(Collections.emptyMap())); + } } @Test