diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 4852ba97932..ea88500ba0f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -248,10 +248,11 @@ class PartitionGroup extends AbstractPartitionGroup {
 
         if (queue != null) {
             // get the first record from this queue.
+            final int oldSize = queue.size();
             record = queue.poll(wallClockTime);
 
             if (record != null) {
-                --totalBuffered;
+                totalBuffered -= oldSize - queue.size();
 
                 if (queue.isEmpty()) {
                     // if a certain queue has been drained, reset the flag
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index ecdf5b7fff0..64b4918e56a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -43,6 +45,7 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.UUID;
@@ -558,6 +561,88 @@ public class PartitionGroupTest {
         assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), nullValue()); // all available records removed
     }
 
+    @Test
+    public void shouldUpdateBufferSizeCorrectlyForSkippedRecords() {
+        final PartitionGroup group = new PartitionGroup(
+            logContext,
+            mkMap(mkEntry(partition1, queue1)),
+            tp -> OptionalLong.of(0L),
+            getValueSensor(metrics, lastLatenessValue),
+            enforcedProcessingSensor,
+            maxTaskIdleMs
+        );
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                -1, // offset as invalid timestamp
+                -1, // invalid timestamp
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                11,
+                0,
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                new byte[0], // corrupted key
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                -1, // offset as invalid timestamp
+                -1, // invalid timestamp
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                13,
+                0,
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                new byte[0], // corrupted value
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>("topic", 1, 20L, recordKey, recordValue)
+        );
+
+        group.addRawRecords(partition1, list1);
+        assertEquals(7, group.numBuffered());
+
+        group.nextRecord(new RecordInfo(), time.milliseconds());
+        assertEquals(6, group.numBuffered());
+
+        // drain corrupted records
+        group.nextRecord(new RecordInfo(), time.milliseconds());
+        assertEquals(1, group.numBuffered());
+
+        group.nextRecord(new RecordInfo(), time.milliseconds());
+        assertEquals(0, group.numBuffered());
+    }
+
     @Test
     public void shouldNeverWaitIfIdlingIsDisabled() {
         final PartitionGroup group = new PartitionGroup(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 44882a8a46a..7444f97215f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -145,8 +145,8 @@ public class StreamTaskTest {
     private final LogContext logContext = new LogContext("[test] ");
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
-    private final TopicPartition partition1 = new TopicPartition(topic1, 1);
-    private final TopicPartition partition2 = new TopicPartition(topic2, 1);
+    private final TopicPartition partition1 = new TopicPartition(topic1, 0);
+    private final TopicPartition partition2 = new TopicPartition(topic2, 0);
     private final Set<TopicPartition> partitions = mkSet(partition1, partition2);
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
@@ -1042,6 +1042,69 @@ public class StreamTaskTest {
         assertEquals(0, consumer.paused().size());
     }
 
+    @Test
+    public void shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(
+            StreamsConfig.AT_LEAST_ONCE,
+            "-1",
+            LogAndContinueExceptionHandler.class,
+            LogAndSkipOnInvalidTimestamp.class
+        ));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        task.addRecords(partition1, asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+            getConsumerRecordWithInvalidTimestamp(30),
+            getConsumerRecordWithInvalidTimestamp(40),
+            getConsumerRecordWithInvalidTimestamp(50)
+        ));
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertEquals(0, consumer.paused().size());
+
+        assertTrue(task.process(0L)); // drain head record (ie, last invalid record)
+        assertFalse(task.process(0L));
+        assertFalse(task.hasRecordsQueued());
+
+
+        // repeat test for deserialization error
+        task.resumePollingForPartitionsWithAvailableSpace();
+        task.addRecords(partition1, asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 110),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 120),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(130),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(140),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(150)
+        ));
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertEquals(0, consumer.paused().size());
+
+        assertTrue(task.process(0L)); // drain head record (ie, last corrupted record)
+        assertFalse(task.process(0L));
+        assertFalse(task.hasRecordsQueued());
+    }
+
     @Test
     public void shouldPunctuateOnceStreamTimeAfterGap() {
         task = createStatelessTask(createConfig());
@@ -2943,7 +3006,7 @@ public class StreamTaskTest {
     private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(final Integer key, final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
@@ -2959,7 +3022,7 @@ public class StreamTaskTest {
     private ConsumerRecord<byte[], byte[]> getConsumerRecordWithInvalidTimestamp(final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             -1L, // invalid (negative) timestamp
             TimestampType.CREATE_TIME,
@@ -2975,7 +3038,7 @@ public class StreamTaskTest {
     private ConsumerRecord<byte[], byte[]> getCorruptedConsumerRecordWithOffsetAsTimestamp(final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
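
For readers skimming the patch: the one-line accounting change in PartitionGroup#nextRecord matters because RecordQueue#poll can silently drop corrupted records and records with invalid timestamps while advancing to the next valid head, so a single poll may remove several records from the queue; decrementing totalBuffered by exactly one therefore leaves a stale count, which is what the new PartitionGroupTest asserts (seven buffered records drain in three polls). The sketch below is illustrative toy code, not part of the PR — ToyQueue and every other name in it are hypothetical — and simply reproduces the under-counting alongside the size-delta fix.

import java.util.ArrayDeque;
import java.util.Deque;

public class BufferAccountingSketch {

    // Toy stand-in for RecordQueue: negative ints play the role of records
    // that deserialization or timestamp extraction rejects.
    static final class ToyQueue {
        private final Deque<Integer> records = new ArrayDeque<>();

        void add(final int record) {
            records.add(record);
        }

        int size() {
            return records.size();
        }

        // Hands out the current head, then silently drops invalid records
        // while advancing to the next valid head (the skipping behavior the
        // PR's tests exercise).
        Integer poll() {
            final Integer result = records.poll();
            while (!records.isEmpty() && records.peek() < 0) {
                records.poll(); // skipped record: leaves the buffer unreturned
            }
            return result;
        }
    }

    public static void main(final String[] args) {
        final ToyQueue queue = new ToyQueue();
        queue.add(1);
        queue.add(-1); // invalid, will be skipped
        queue.add(-1); // invalid, will be skipped
        queue.add(2);

        final int totalBuffered = queue.size(); // 4

        final int oldSize = queue.size();
        queue.poll(); // returns 1 and drops both invalid records

        final int buggy = totalBuffered - 1;                        // old "--totalBuffered"
        final int fixed = totalBuffered - (oldSize - queue.size()); // patched accounting

        System.out.println("actually buffered: " + queue.size()); // 1
        System.out.println("buggy counter:     " + buggy);        // 3 (stale)
        System.out.println("fixed counter:     " + fixed);        // 1 (correct)
    }
}

The StreamTaskTest additions appear to check the downstream effect of the same accounting: once the buffer count reflects skipped records, resumePollingForPartitionsWithAvailableSpace can unpause a partition as soon as the skipped records have actually left the queue.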