From 0bc91be14591c742e19e3572871080386b904849 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 9 Nov 2024 17:21:16 -0800 Subject: [PATCH] KAFKA-17872: Update consumed offsets on records with invalid timestamp (#17710) TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress. Reviewers: Bill Bejeck --- .../processor/internals/RecordQueue.java | 1 + .../processor/internals/RecordQueueTest.java | 59 ++-- .../processor/internals/StreamTaskTest.java | 323 ++++++++++++------ 3 files changed, 264 insertions(+), 119 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index ea03b2f8a0e..d38d7b625ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -240,6 +240,7 @@ public class RecordQueue { deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName() ); droppedRecordsSensor.record(); + lastCorruptedRecord = raw; continue; } headRecord = new StampedRecord(deserialized, timestamp); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 1e7c12167c7..1dd19fb9cf7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; @@ -74,8 +74,7 @@ public class RecordQueueTest { private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime()); - @SuppressWarnings("rawtypes") - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( + final InternalMockProcessorContext context = new InternalMockProcessorContext<>( StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), new MockRecordCollector(), metrics @@ -88,19 +87,28 @@ public class RecordQueueTest { timestampExtractor, new LogAndFailExceptionHandler(), context, - new LogContext()); + new LogContext() + ); private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue( new TopicPartition("topic", 1), mockSourceNodeWithMetrics, timestampExtractor, new LogAndContinueExceptionHandler(), context, - new LogContext()); + new LogContext() + ); + private final RecordQueue queueThatSkipsInvalidTimestamps = new RecordQueue( + new TopicPartition("topic", 1), + mockSourceNodeWithMetrics, + new LogAndSkipOnInvalidTimestamp(), + new LogAndFailExceptionHandler(), + context, + new LogContext() + ); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = 
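For context on the contract this fix relies on: a TimestampExtractor signals "drop this record" by returning a negative timestamp, and before this patch such dropped records did not advance the consumed offset, so a task could be unable to commit progress (and could re-consume them after a restart). A minimal sketch of such an extractor follows; the class name and fallback logic are illustrative only and are not part of this patch.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Illustrative extractor: records without a usable timestamp are dropped by
// returning -1; with this patch their offsets are still tracked and committed,
// so the task can make progress even if every record is dropped.
public class DropOnMissingTimestamp implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        final long timestamp = record.timestamp();
        return timestamp >= 0 ? timestamp : -1L; // a negative value tells Streams to skip the record
    }
}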
intSerializer.serialize(null, 1); - @SuppressWarnings("unchecked") @BeforeEach public void before() { mockSourceNodeWithMetrics.init(context); @@ -340,7 +348,7 @@ public class RecordQueueTest { @Test public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() { - final byte[] key = Serdes.Long().serializer().serialize("foo", 1L); + final byte[] key = new LongSerializer().serialize("foo", 1L); final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue, new RecordHeaders(), Optional.empty())); @@ -354,7 +362,7 @@ public class RecordQueueTest { @Test public void shouldThrowStreamsExceptionWhenValueDeserializationFails() { - final byte[] value = Serdes.Long().serializer().serialize("foo", 1L); + final byte[] value = new LongSerializer().serialize("foo", 1L); final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value, new RecordHeaders(), Optional.empty())); @@ -368,7 +376,7 @@ public class RecordQueueTest { @Test public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() { - final byte[] key = Serdes.Long().serializer().serialize("foo", 1L); + final byte[] key = new LongSerializer().serialize("foo", 1L); final ConsumerRecord record = new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue, new RecordHeaders(), Optional.empty()); @@ -381,7 +389,7 @@ public class RecordQueueTest { @Test public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() { - final byte[] value = Serdes.Long().serializer().serialize("foo", 1L); + final byte[] value = new LongSerializer().serialize("foo", 1L); final ConsumerRecord record = new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value, new RecordHeaders(), Optional.empty()); @@ -404,7 +412,7 @@ public class RecordQueueTest { mockSourceNodeWithMetrics, new FailOnInvalidTimestamp(), new LogAndContinueExceptionHandler(), - new InternalMockProcessorContext(), + new InternalMockProcessorContext<>(), new LogContext()); final StreamsException exception = assertThrows( @@ -421,20 +429,25 @@ public class RecordQueueTest { @Test public void shouldDropOnNegativeTimestamp() { - final List> records = Collections.singletonList( - new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, - new RecordHeaders(), Optional.empty())); + final ConsumerRecord record = new ConsumerRecord<>( + "topic", + 1, + 1, + -1L, // negative timestamp + TimestampType.CREATE_TIME, + 0, + 0, + recordKey, + recordValue, + new RecordHeaders(), + Optional.empty() + ); + final List> records = Collections.singletonList(record); - final RecordQueue queue = new RecordQueue( - new TopicPartition("topic", 1), - mockSourceNodeWithMetrics, - new LogAndSkipOnInvalidTimestamp(), - new LogAndContinueExceptionHandler(), - new InternalMockProcessorContext(), - new LogContext()); - queue.addRawRecords(records); + queueThatSkipsInvalidTimestamps.addRawRecords(records); - assertEquals(0, queue.size()); + assertEquals(1, queueThatSkipsInvalidTimestamps.size()); + assertEquals(new CorruptedRecord(record), queueThatSkipsInvalidTimestamps.poll(0)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 36471dcd02b..b36f5e41e47 100644 --- 
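The updated shouldDropOnNegativeTimestamp test above captures the behavior change: the skipped record now surfaces once from the queue as a CorruptedRecord instead of disappearing, so the task can still record its offset. For readers who want to see the two built-in timestamp policies used in these tests side by side, a small self-contained comparison (the record construction and printed values are illustrative; the short ConsumerRecord constructor leaves the timestamp at -1):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

public class InvalidTimestampPolicies {
    public static void main(final String[] args) {
        // The short ConsumerRecord constructor sets no timestamp, i.e. -1.
        final ConsumerRecord<Object, Object> record = new ConsumerRecord<>("topic", 1, 1L, "key", "value");

        // Skipping policy: logs a warning and returns the negative timestamp,
        // which makes RecordQueue drop the record (and, with this patch, still
        // track its offset for the next commit).
        System.out.println(new LogAndSkipOnInvalidTimestamp().extract(record, 0L)); // prints -1

        // Failing policy: rejects the record with a StreamsException instead.
        try {
            new FailOnInvalidTimestamp().extract(record, 0L);
        } catch (final StreamsException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}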
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -38,14 +38,15 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyConfig; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; @@ -59,10 +60,13 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.internals.FailedProcessingException; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -87,7 +91,6 @@ import org.mockito.quality.Strictness; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collections; import java.util.HashSet; @@ -157,12 +160,12 @@ public class StreamTaskTest { private final TopicPartition partition1 = new TopicPartition(topic1, 1); private final TopicPartition partition2 = new TopicPartition(topic2, 1); private final Set partitions = new HashSet<>(List.of(partition1, partition2)); - private final Serializer intSerializer = Serdes.Integer().serializer(); - private final Deserializer intDeserializer = Serdes.Integer().deserializer(); + private final Serializer intSerializer = new IntegerSerializer(); + private final Deserializer intDeserializer = new IntegerDeserializer(); private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer); private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer); - private final MockSourceNode source3 = new MockSourceNode(intDeserializer, intDeserializer) { + private final MockSourceNode source3 = new MockSourceNode<>(intDeserializer, intDeserializer) { @Override public void process(final Record record) { throw new RuntimeException("KABOOM!"); @@ -173,7 +176,7 @@ public class StreamTaskTest { throw new RuntimeException("KABOOM!"); } }; - private final MockSourceNode timeoutSource = new MockSourceNode(intDeserializer, intDeserializer) { + private final MockSourceNode 
timeoutSource = new MockSourceNode<>(intDeserializer, intDeserializer) { @Override public void process(final Record record) { throw new TimeoutException("Kaboom!"); @@ -248,18 +251,54 @@ public class StreamTaskTest { } private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue) { - return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName()); + return createConfig( + eosConfig, + enforcedProcessingValue, + LogAndFailExceptionHandler.class, + LogAndFailProcessingExceptionHandler.class, + FailOnInvalidTimestamp.class + ); } - private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue, final String deserializationExceptionHandler) { - return createConfig(eosConfig, enforcedProcessingValue, deserializationExceptionHandler, LogAndFailProcessingExceptionHandler.class.getName()); + private static StreamsConfig createConfig(final Class deserializationExceptionHandler) { + return createConfig( + AT_LEAST_ONCE, + "0", // max.task.idle.ms + deserializationExceptionHandler, + LogAndFailProcessingExceptionHandler.class, + FailOnInvalidTimestamp.class + ); + } + + private static StreamsConfig createConfigWithTsExtractor(final Class timestampExtractor) { + return createConfig( + AT_LEAST_ONCE, + "0", // max.task.idle.ms + LogAndFailExceptionHandler.class, + LogAndFailProcessingExceptionHandler.class, + timestampExtractor + ); + } + + private static StreamsConfig createConfig( + final String enforcedProcessingValue, + final Class processingExceptionHandler + ) { + return createConfig( + AT_LEAST_ONCE, + enforcedProcessingValue, + LogAndFailExceptionHandler.class, + processingExceptionHandler, + FailOnInvalidTimestamp.class + ); } private static StreamsConfig createConfig( final String eosConfig, final String enforcedProcessingValue, - final String deserializationExceptionHandler, - final String processingExceptionHandler) { + final Class deserializationExceptionHandler, + final Class processingExceptionHandler, + final Class timestampExtractor) { final String canonicalPath; try { canonicalPath = BASE_DIR.getCanonicalPath(); @@ -275,8 +314,9 @@ public class StreamTaskTest { mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig), mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue), - mkEntry(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler), - mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler) + mkEntry(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler.getName()), + mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler.getName()), + mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, timestampExtractor.getName()) ))); } @@ -400,36 +440,38 @@ public class StreamTaskTest { task.addPartitionsForOffsetReset(Collections.singleton(partition1)); final AtomicReference shouldNotSeek = new AtomicReference<>(); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public void seek(final TopicPartition partition, final long offset) { - final AssertionError error = shouldNotSeek.get(); - if (error != null) { - throw error; + try (final MockConsumer consumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST) { + @Override + public void seek(final TopicPartition partition, final long offset) { + final AssertionError error = shouldNotSeek.get(); + if (error != null) { + throw error; + } + super.seek(partition, offset); } - super.seek(partition, offset); - } - }; - consumer.assign(asList(partition1, partition2)); - consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L))); + }) { - consumer.seek(partition1, 5L); - consumer.seek(partition2, 15L); + consumer.assign(asList(partition1, partition2)); + consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L))); - shouldNotSeek.set(new AssertionError("Should not seek")); + consumer.seek(partition1, 5L); + consumer.seek(partition2, 15L); - // We need to keep a separate reference to the arguments of Consumer#accept - // because the underlying data-structure is emptied and on verification time - // it is reported as empty. - final Set partitionsAtCall = new HashSet<>(); + shouldNotSeek.set(new AssertionError("Should not seek")); - task.initializeIfNeeded(); - task.completeRestoration(partitionsAtCall::addAll); + // We need to keep a separate reference to the arguments of Consumer#accept + // because the underlying data-structure is emptied and on verification time + // it is reported as empty. + final Set partitionsAtCall = new HashSet<>(); - // because we mocked the `resetter` positions don't change - assertThat(consumer.position(partition1), equalTo(5L)); - assertThat(consumer.position(partition2), equalTo(15L)); - assertThat(partitionsAtCall, equalTo(Collections.singleton(partition1))); + task.initializeIfNeeded(); + task.completeRestoration(partitionsAtCall::addAll); + + // because we mocked the `resetter` positions don't change + assertThat(consumer.position(partition1), equalTo(5L)); + assertThat(consumer.position(partition2), equalTo(15L)); + assertThat(partitionsAtCall, equalTo(Collections.singleton(partition1))); + } } @Test @@ -915,10 +957,9 @@ public class StreamTaskTest { reporter.contextChange(metricsContext); metrics.addReporter(reporter); - final String threadIdTag = THREAD_ID_TAG; assertTrue(reporter.containsMbean(String.format( "kafka.streams:type=stream-task-metrics,%s=%s,task-id=%s", - threadIdTag, + THREAD_ID_TAG, threadId, task.id() ))); @@ -1451,18 +1492,15 @@ public class StreamTaskTest { assertFalse(task.process(0L)); - final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array(); - - task.addRecords(partition1, singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes))); + task.addRecords(partition1, singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 0))); assertFalse(task.process(0L)); assertThat("task is idling", task.timeCurrentIdlingStarted().isPresent()); - task.addRecords(partition2, singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes))); + task.addRecords(partition2, singleton(getConsumerRecordWithOffsetAsTimestamp(partition2, 0))); assertTrue(task.process(0L)); assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent()); - } @Test @@ -1863,7 +1901,7 @@ public class StreamTaskTest { consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L))); final StreamsConfig config = createConfig(); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, config, stateManager, @@ -2156,7 +2194,6 @@ public class StreamTaskTest { when(stateManager.taskId()).thenReturn(taskId); 
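The createConfig refactor above passes handler and extractor classes around as Class objects and only converts them to class names when the config map is built, which is less error-prone than threading raw strings through every overload. Outside the test harness, the equivalent user-facing configuration would look roughly as follows (application id and bootstrap servers are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

public class StreamsConfigSketch {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "invalid-timestamp-demo"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder
        // Skip, rather than fail on, records whose extracted timestamp is negative;
        // with KAFKA-17872 their offsets are still committed.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  LogAndSkipOnInvalidTimestamp.class.getName());
        // Same Class-to-name pattern for the deserialization exception handler.
        props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class.getName());
        return props;
    }
}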
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); final long offset = 543L; - final long consumedOffset = 345L; when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset)); when(stateManager.changelogOffsets()) @@ -2480,7 +2517,7 @@ public class StreamTaskTest { @Test public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, createConfig("100"), stateManager, @@ -2547,21 +2584,118 @@ public class StreamTaskTest { } @Test - public void shouldUpdateOffsetIfAllRecordsAreCorrupted() { + public void shouldUpdateOffsetIfAllRecordsHaveInvalidTimestamp() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createStatelessTask(createConfig( - AT_LEAST_ONCE, - "0", - LogAndContinueExceptionHandler.class.getName() - )); + task = createStatelessTask(createConfigWithTsExtractor(LogAndSkipOnInvalidTimestamp.class)); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + + long offset = -1L; + + final List> records = asList( + getConsumerRecordWithInvalidTimestamp(++offset), + getConsumerRecordWithInvalidTimestamp(++offset) + ); + consumer.addRecord(records.get(0)); + consumer.addRecord(records.get(1)); + task.resumePollingForPartitionsWithAvailableSpace(); + consumer.poll(Duration.ZERO); + task.addRecords(partition1, records); + task.updateNextOffsets(partition1, new OffsetAndMetadata(offset + 1, Optional.empty(), "")); + task.updateLags(); + + assertTrue(task.process(offset)); + assertTrue(task.commitNeeded()); + assertThat( + task.prepareCommit(), + equalTo(mkMap(mkEntry(partition1, + new OffsetAndMetadata(offset + 1, + new TopicPartitionMetadata(RecordQueue.UNKNOWN, new ProcessorMetadata()).encode())))) + ); + } + + @Test + public void shouldUpdateOffsetIfValidRecordFollowsInvalidTimestamp() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); + task = createStatelessTask(createConfigWithTsExtractor(LogAndSkipOnInvalidTimestamp.class)); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + + long offset = -1L; + + final List> records = asList( + getConsumerRecordWithInvalidTimestamp(++offset), + getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset) + ); + consumer.addRecord(records.get(0)); + consumer.addRecord(records.get(1)); + task.resumePollingForPartitionsWithAvailableSpace(); + consumer.poll(Duration.ZERO); + task.addRecords(partition1, records); + task.updateNextOffsets(partition1, new OffsetAndMetadata(offset + 1, Optional.empty(), "")); + task.updateLags(); + + assertTrue(task.process(offset)); + assertTrue(task.commitNeeded()); + assertThat( + task.prepareCommit(), + equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode())))) + ); + } + + @Test + public void shouldUpdateOffsetIfInvalidTimestampeRecordFollowsValid() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); + task = createStatelessTask(createConfigWithTsExtractor(LogAndSkipOnInvalidTimestamp.class)); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); long offset = -1; + + final List> records = asList( + 
getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset), + getConsumerRecordWithInvalidTimestamp(++offset)); + consumer.addRecord(records.get(0)); + consumer.addRecord(records.get(1)); + task.resumePollingForPartitionsWithAvailableSpace(); + consumer.poll(Duration.ZERO); + task.addRecords(partition1, records); + task.updateNextOffsets(partition1, new OffsetAndMetadata(offset + 1, Optional.empty(), "")); + + task.updateLags(); + + assertTrue(task.process(offset)); + assertTrue(task.commitNeeded()); + assertThat( + task.prepareCommit(), + equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) + ); + + assertTrue(task.process(offset)); + assertTrue(task.commitNeeded()); + assertThat( + task.prepareCommit(), + equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) + ); + } + + @Test + public void shouldUpdateOffsetIfAllRecordsAreCorrupted() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); + task = createStatelessTask(createConfig(LogAndContinueExceptionHandler.class)); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + + long offset = -1L; + final List> records = asList( getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset), - getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset)); + getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset) + ); consumer.addRecord(records.get(0)); consumer.addRecord(records.get(1)); task.resumePollingForPartitionsWithAvailableSpace(); @@ -2584,19 +2718,16 @@ public class StreamTaskTest { public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createStatelessTask(createConfig( - AT_LEAST_ONCE, - "0", - LogAndContinueExceptionHandler.class.getName() - )); + task = createStatelessTask(createConfig(LogAndContinueExceptionHandler.class)); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); - long offset = -1; + long offset = -1L; final List> records = asList( getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset), - getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset)); + getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset) + ); consumer.addRecord(records.get(0)); consumer.addRecord(records.get(1)); task.resumePollingForPartitionsWithAvailableSpace(); @@ -2617,18 +2748,16 @@ public class StreamTaskTest { public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createStatelessTask(createConfig( - AT_LEAST_ONCE, - "0", - LogAndContinueExceptionHandler.class.getName())); + task = createStatelessTask(createConfig(LogAndContinueExceptionHandler.class)); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); - long offset = -1; + long offset = -1L; final List> records = asList( getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset), - getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset)); + getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset) + ); consumer.addRecord(records.get(0)); consumer.addRecord(records.get(1)); task.resumePollingForPartitionsWithAvailableSpace(); @@ -2682,10 +2811,8 @@ public class StreamTaskTest { when(stateManager.taskId()).thenReturn(taskId); 
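All of the new assertions above encode the same convention: the offset handed to prepareCommit() is the next offset to read, i.e. the last consumed offset plus one, whether that last record was processed normally, skipped because of an invalid timestamp, or dropped as corrupted. A tiny standalone illustration of that convention, using the same OffsetAndMetadata constructor as the tests (topic, partition, and offsets are arbitrary):

import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOffsetConvention {
    public static void main(final String[] args) {
        final TopicPartition partition = new TopicPartition("topic", 1);
        final long lastConsumedOffset = 1L; // the second record, even if it was skipped or corrupted

        // The committed offset is always "last consumed + 1": the position to resume from.
        final Map<TopicPartition, OffsetAndMetadata> toCommit = Map.of(
            partition,
            new OffsetAndMetadata(lastConsumedOffset + 1, Optional.empty(), "")
        );

        System.out.println(toCommit); // committed position for topic-1 is offset 2
    }
}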
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); task = createStatelessTask(createConfig( - AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), - LogAndContinueProcessingExceptionHandler.class.getName() + LogAndContinueProcessingExceptionHandler.class )); final StreamsException streamsException = assertThrows( @@ -2704,10 +2831,8 @@ public class StreamTaskTest { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); task = createStatelessTask(createConfig( - AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), - LogAndContinueProcessingExceptionHandler.class.getName() + LogAndContinueProcessingExceptionHandler.class )); final Set tasksIds = new HashSet<>(); @@ -2734,10 +2859,8 @@ public class StreamTaskTest { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); task = createStatelessTask(createConfig( - AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), - LogAndContinueProcessingExceptionHandler.class.getName() + LogAndContinueProcessingExceptionHandler.class )); final TaskMigratedException expectedException = new TaskMigratedException("TaskMigratedException", new RuntimeException("Task migrated cause")); @@ -2757,10 +2880,8 @@ public class StreamTaskTest { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); task = createStatelessTask(createConfig( - AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), - LogAndContinueProcessingExceptionHandler.class.getName() + LogAndContinueProcessingExceptionHandler.class )); task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { @@ -2773,10 +2894,8 @@ public class StreamTaskTest { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); task = createStatelessTask(createConfig( - AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), - LogAndFailProcessingExceptionHandler.class.getName() + LogAndFailProcessingExceptionHandler.class )); final StreamsException streamsException = assertThrows( @@ -2795,10 +2914,8 @@ public class StreamTaskTest { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); task = createStatelessTask(createConfig( - AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), - NullProcessingExceptionHandler.class.getName() + NullProcessingExceptionHandler.class )); final StreamsException streamsException = assertThrows( @@ -2818,10 +2935,8 @@ public class StreamTaskTest { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); task = createStatelessTask(createConfig( - AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), - CrashingProcessingExceptionHandler.class.getName() + CrashingProcessingExceptionHandler.class )); final FailedProcessingException streamsException = assertThrows( @@ -2879,7 +2994,7 @@ public class StreamTaskTest { singletonList(stateStore), Collections.singletonMap(storeName, topic1)); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, config, stateManager, @@ -2921,7 +3036,7 @@ public class StreamTaskTest { } }; - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, config, 
stateManager, @@ -2955,7 +3070,7 @@ public class StreamTaskTest { emptyMap() ); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, config, stateManager, @@ -2994,7 +3109,7 @@ public class StreamTaskTest { singletonList(stateStore), logged ? Collections.singletonMap(storeName, storeName + "-changelog") : Collections.emptyMap()); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, config, stateManager, @@ -3029,7 +3144,7 @@ public class StreamTaskTest { source1.addChild(processorStreamTime); source1.addChild(processorSystemTime); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, config, stateManager, @@ -3066,7 +3181,7 @@ public class StreamTaskTest { source1.addChild(processorSystemTime); source2.addChild(processorSystemTime); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, config, stateManager, @@ -3102,7 +3217,7 @@ public class StreamTaskTest { final StreamsConfig config = createConfig(); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, config, stateManager, @@ -3135,7 +3250,7 @@ public class StreamTaskTest { ); final StreamsConfig config = createConfig(eosConfig, "0"); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext context = new ProcessorContextImpl( taskId, config, stateManager, @@ -3212,6 +3327,22 @@ public class StreamTaskTest { ); } + private ConsumerRecord getConsumerRecordWithInvalidTimestamp(final long offset) { + return new ConsumerRecord<>( + topic1, + 1, + offset, + -1L, // invalid (negative) timestamp + TimestampType.CREATE_TIME, + 0, + 0, + recordKey, + recordValue, + new RecordHeaders(), + Optional.empty() + ); + } + private ConsumerRecord getConsumerRecordWithOffsetAsTimestampWithLeaderEpoch(final TopicPartition topicPartition, final long offset, final int leaderEpoch) {
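The two record-builder helpers in this test file now differ mainly in the timestamp they stamp on the record. If further timestamp variants become necessary, they could be collapsed into a single parameterized helper along these lines (a sketch only, not part of the patch; the zero serialized sizes and the topic1/recordKey/recordValue fields mirror the existing helpers):

    private ConsumerRecord<byte[], byte[]> getConsumerRecordWithTimestamp(final long offset, final long timestamp) {
        return new ConsumerRecord<>(
            topic1,
            1,
            offset,
            timestamp,               // pass -1L to simulate an invalid timestamp
            TimestampType.CREATE_TIME,
            0,
            0,
            recordKey,
            recordValue,
            new RecordHeaders(),
            Optional.empty()
        );
    }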