KAFKA-17635: Ensure only committed offsets are returned for purging (#17686)

Kafka Streams actively purges records from repartition topics. Prior to this PR, Kafka Streams would retrieve the offset from the consumedOffsets map, but there are a couple of edge cases where consumedOffsets can get ahead of the committedOffsets map. In these cases, Kafka Streams could potentially purge a repartition record before its offset has been committed.

Updated the existing StreamTask test to cover this case, with and without a committed offset.

Reviewers: Matthias Sax <mjsax@apache.org>
This commit is contained in:
Bill Bejeck 2024-11-06 17:45:27 -05:00 committed by GitHub
parent 8cbd2edfe7
commit d170b52362
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 18 additions and 5 deletions

View File

@ -1073,10 +1073,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
@Override
public Map<TopicPartition, Long> purgeableOffsets() {
final Map<TopicPartition, Long> purgeableConsumedOffsets = new HashMap<>();
for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
for (final Map.Entry<TopicPartition, Long> entry : committedOffsets.entrySet()) {
final TopicPartition tp = entry.getKey();
if (topology.isRepartitionTopic(tp.topic())) {
purgeableConsumedOffsets.put(tp, entry.getValue() + 1);
// committedOffsets map is initialized at -1 so no purging until there's a committed offset
if (entry.getValue() > -1) {
purgeableConsumedOffsets.put(tp, entry.getValue());
}
}
}

View File

@ -77,6 +77,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ -1845,8 +1847,9 @@ public class StreamTaskTest {
verify(stateManager).close();
}
@Test
public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldMaybeReturnOffsetsForRepartitionTopicsForPurging(final boolean doCommit) {
when(stateManager.taskId()).thenReturn(taskId);
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
final TopicPartition repartition = new TopicPartition("repartition", 1);
@ -1907,10 +1910,17 @@ public class StreamTaskTest {
assertTrue(task.process(0L));
task.prepareCommit();
if (doCommit) {
task.updateCommittedOffsets(repartition, 10L);
}
final Map<TopicPartition, Long> map = task.purgeableOffsets();
assertThat(map, equalTo(singletonMap(repartition, 11L)));
if (doCommit) {
assertThat(map, equalTo(singletonMap(repartition, 10L)));
} else {
assertThat(map, equalTo(Collections.emptyMap()));
}
}
@Test