KAFKA-17872: Update consumed offsets on records with invalid timestamp (#17710)

TimestampExtractor allows dropping records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allow us to commit progress (see the sketch below for how an extractor drops a record this way).

Reviewers: Bill Bejeck <bill@confluent.io>
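
For reference, a minimal sketch (not part of this commit; the class name is hypothetical) of a custom TimestampExtractor that drops records by returning -1, which is the case this fix now tracks in the consumed offsets:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor: a record without a usable timestamp is dropped by
// returning -1; with this fix the task still advances its consumed offset for it.
public class DropInvalidTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        final long timestamp = record.timestamp();
        // Returning a negative value tells Kafka Streams to skip this record.
        return timestamp >= 0 ? timestamp : -1L;
    }
}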
Matthias J. Sax 2024-11-09 17:21:16 -08:00 committed by GitHub
parent 4b80591df2
commit 0bc91be145
3 changed files with 264 additions and 119 deletions

RecordQueue.java

@@ -240,6 +240,7 @@ public class RecordQueue {
                     deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
                 );
                 droppedRecordsSensor.record();
+                lastCorruptedRecord = raw;
                 continue;
             }
             headRecord = new StampedRecord(deserialized, timestamp);

RecordQueueTest.java

@@ -27,7 +27,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
@@ -74,8 +74,7 @@ public class RecordQueueTest {
     private final StreamsMetricsImpl streamsMetrics =
         new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());
-    @SuppressWarnings("rawtypes")
-    final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
+    final InternalMockProcessorContext<Integer, Integer> context = new InternalMockProcessorContext<>(
         StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
         new MockRecordCollector(),
         metrics
@@ -88,19 +87,28 @@ public class RecordQueueTest {
         timestampExtractor,
         new LogAndFailExceptionHandler(),
         context,
-        new LogContext());
+        new LogContext()
+    );
     private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(
         new TopicPartition("topic", 1),
         mockSourceNodeWithMetrics,
         timestampExtractor,
         new LogAndContinueExceptionHandler(),
         context,
-        new LogContext());
+        new LogContext()
+    );
+    private final RecordQueue queueThatSkipsInvalidTimestamps = new RecordQueue(
+        new TopicPartition("topic", 1),
+        mockSourceNodeWithMetrics,
+        new LogAndSkipOnInvalidTimestamp(),
+        new LogAndFailExceptionHandler(),
+        context,
+        new LogContext()
+    );
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

-    @SuppressWarnings("unchecked")
     @BeforeEach
     public void before() {
         mockSourceNodeWithMetrics.init(context);
@@ -340,7 +348,7 @@ public class RecordQueueTest {
     @Test
     public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
-        final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
+        final byte[] key = new LongSerializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
             new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue,
                 new RecordHeaders(), Optional.empty()));
@@ -354,7 +362,7 @@ public class RecordQueueTest {
     @Test
     public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
-        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+        final byte[] value = new LongSerializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
             new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value,
                 new RecordHeaders(), Optional.empty()));
@@ -368,7 +376,7 @@ public class RecordQueueTest {
     @Test
     public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
-        final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
+        final byte[] key = new LongSerializer().serialize("foo", 1L);
         final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 1, 1, 0L,
             TimestampType.CREATE_TIME, 0, 0, key, recordValue,
             new RecordHeaders(), Optional.empty());
@@ -381,7 +389,7 @@ public class RecordQueueTest {
     @Test
     public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
-        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+        final byte[] value = new LongSerializer().serialize("foo", 1L);
         final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 1, 1, 0L,
             TimestampType.CREATE_TIME, 0, 0, recordKey, value,
             new RecordHeaders(), Optional.empty());
@@ -404,7 +412,7 @@ public class RecordQueueTest {
             mockSourceNodeWithMetrics,
             new FailOnInvalidTimestamp(),
             new LogAndContinueExceptionHandler(),
-            new InternalMockProcessorContext(),
+            new InternalMockProcessorContext<>(),
             new LogContext());

         final StreamsException exception = assertThrows(
@@ -421,20 +429,25 @@ public class RecordQueueTest {
     @Test
     public void shouldDropOnNegativeTimestamp() {
-        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
-            new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
-                new RecordHeaders(), Optional.empty()));
-
-        final RecordQueue queue = new RecordQueue(
-            new TopicPartition("topic", 1),
-            mockSourceNodeWithMetrics,
-            new LogAndSkipOnInvalidTimestamp(),
-            new LogAndContinueExceptionHandler(),
-            new InternalMockProcessorContext(),
-            new LogContext());
-        queue.addRawRecords(records);
-
-        assertEquals(0, queue.size());
+        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+            "topic",
+            1,
+            1,
+            -1L, // negative timestamp
+            TimestampType.CREATE_TIME,
+            0,
+            0,
+            recordKey,
+            recordValue,
+            new RecordHeaders(),
+            Optional.empty()
+        );
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(record);
+
+        queueThatSkipsInvalidTimestamps.addRawRecords(records);
+
+        assertEquals(1, queueThatSkipsInvalidTimestamps.size());
+        assertEquals(new CorruptedRecord(record), queueThatSkipsInvalidTimestamps.poll(0));
     }

     @Test

StreamTaskTest.java

@@ -38,14 +38,15 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
@@ -59,10 +60,13 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.errors.internals.FailedProcessingException;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -87,7 +91,6 @@ import org.mockito.quality.Strictness;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashSet;
@@ -157,12 +160,12 @@ public class StreamTaskTest {
     private final TopicPartition partition1 = new TopicPartition(topic1, 1);
     private final TopicPartition partition2 = new TopicPartition(topic2, 1);
     private final Set<TopicPartition> partitions = new HashSet<>(List.of(partition1, partition2));
-    private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
-    private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
+    private final Serializer<Integer> intSerializer = new IntegerSerializer();
+    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
+    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<>(intDeserializer, intDeserializer) {
         @Override
         public void process(final Record<Integer, Integer> record) {
             throw new RuntimeException("KABOOM!");
@@ -173,7 +176,7 @@ public class StreamTaskTest {
             throw new RuntimeException("KABOOM!");
         }
     };
-    private final MockSourceNode<Integer, Integer> timeoutSource = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
+    private final MockSourceNode<Integer, Integer> timeoutSource = new MockSourceNode<>(intDeserializer, intDeserializer) {
         @Override
         public void process(final Record<Integer, Integer> record) {
             throw new TimeoutException("Kaboom!");
@@ -248,18 +251,54 @@ public class StreamTaskTest {
     }

     private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue) {
-        return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName());
+        return createConfig(
+            eosConfig,
+            enforcedProcessingValue,
+            LogAndFailExceptionHandler.class,
+            LogAndFailProcessingExceptionHandler.class,
+            FailOnInvalidTimestamp.class
+        );
     }

-    private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue, final String deserializationExceptionHandler) {
-        return createConfig(eosConfig, enforcedProcessingValue, deserializationExceptionHandler, LogAndFailProcessingExceptionHandler.class.getName());
+    private static StreamsConfig createConfig(final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler) {
+        return createConfig(
+            AT_LEAST_ONCE,
+            "0", // max.task.idle.ms
+            deserializationExceptionHandler,
+            LogAndFailProcessingExceptionHandler.class,
+            FailOnInvalidTimestamp.class
+        );
+    }
+
+    private static StreamsConfig createConfigWithTsExtractor(final Class<? extends TimestampExtractor> timestampExtractor) {
+        return createConfig(
+            AT_LEAST_ONCE,
+            "0", // max.task.idle.ms
+            LogAndFailExceptionHandler.class,
+            LogAndFailProcessingExceptionHandler.class,
+            timestampExtractor
+        );
+    }
+
+    private static StreamsConfig createConfig(
+        final String enforcedProcessingValue,
+        final Class<? extends ProcessingExceptionHandler> processingExceptionHandler
+    ) {
+        return createConfig(
+            AT_LEAST_ONCE,
+            enforcedProcessingValue,
+            LogAndFailExceptionHandler.class,
+            processingExceptionHandler,
+            FailOnInvalidTimestamp.class
+        );
     }

     private static StreamsConfig createConfig(
         final String eosConfig,
         final String enforcedProcessingValue,
-        final String deserializationExceptionHandler,
-        final String processingExceptionHandler) {
+        final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler,
+        final Class<? extends ProcessingExceptionHandler> processingExceptionHandler,
+        final Class<? extends TimestampExtractor> timestampExtractor) {
         final String canonicalPath;
         try {
             canonicalPath = BASE_DIR.getCanonicalPath();
@@ -275,8 +314,9 @@ public class StreamTaskTest {
             mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
             mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
             mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue),
-            mkEntry(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler),
-            mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler)
+            mkEntry(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler.getName()),
+            mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler.getName()),
+            mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, timestampExtractor.getName())
         )));
     }
@@ -400,36 +440,38 @@ public class StreamTaskTest {
         task.addPartitionsForOffsetReset(Collections.singleton(partition1));

         final AtomicReference<AssertionError> shouldNotSeek = new AtomicReference<>();
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+        try (final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST) {
             @Override
             public void seek(final TopicPartition partition, final long offset) {
                 final AssertionError error = shouldNotSeek.get();
                 if (error != null) {
                     throw error;
                 }
                 super.seek(partition, offset);
             }
-        };
-        consumer.assign(asList(partition1, partition2));
-        consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L)));
-        consumer.seek(partition1, 5L);
-        consumer.seek(partition2, 15L);
-        shouldNotSeek.set(new AssertionError("Should not seek"));
-
-        // We need to keep a separate reference to the arguments of Consumer#accept
-        // because the underlying data-structure is emptied and on verification time
-        // it is reported as empty.
-        final Set<TopicPartition> partitionsAtCall = new HashSet<>();
-
-        task.initializeIfNeeded();
-        task.completeRestoration(partitionsAtCall::addAll);
-
-        // because we mocked the `resetter` positions don't change
-        assertThat(consumer.position(partition1), equalTo(5L));
-        assertThat(consumer.position(partition2), equalTo(15L));
-        assertThat(partitionsAtCall, equalTo(Collections.singleton(partition1)));
+        }) {
+            consumer.assign(asList(partition1, partition2));
+            consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L)));
+            consumer.seek(partition1, 5L);
+            consumer.seek(partition2, 15L);
+            shouldNotSeek.set(new AssertionError("Should not seek"));
+
+            // We need to keep a separate reference to the arguments of Consumer#accept
+            // because the underlying data-structure is emptied and on verification time
+            // it is reported as empty.
+            final Set<TopicPartition> partitionsAtCall = new HashSet<>();
+
+            task.initializeIfNeeded();
+            task.completeRestoration(partitionsAtCall::addAll);
+
+            // because we mocked the `resetter` positions don't change
+            assertThat(consumer.position(partition1), equalTo(5L));
+            assertThat(consumer.position(partition2), equalTo(15L));
+            assertThat(partitionsAtCall, equalTo(Collections.singleton(partition1)));
+        }
     }

     @Test
@@ -915,10 +957,9 @@ public class StreamTaskTest {
         reporter.contextChange(metricsContext);
         metrics.addReporter(reporter);

-        final String threadIdTag = THREAD_ID_TAG;
         assertTrue(reporter.containsMbean(String.format(
             "kafka.streams:type=stream-task-metrics,%s=%s,task-id=%s",
-            threadIdTag,
+            THREAD_ID_TAG,
             threadId,
             task.id()
         )));
@@ -1451,18 +1492,15 @@ public class StreamTaskTest {
         assertFalse(task.process(0L));

-        final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
-        task.addRecords(partition1, singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+        task.addRecords(partition1, singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
         assertFalse(task.process(0L));
         assertThat("task is idling", task.timeCurrentIdlingStarted().isPresent());

-        task.addRecords(partition2, singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
+        task.addRecords(partition2, singleton(getConsumerRecordWithOffsetAsTimestamp(partition2, 0)));
         assertTrue(task.process(0L));
         assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent());
     }

     @Test
@@ -1863,7 +1901,7 @@ public class StreamTaskTest {
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));

         final StreamsConfig config = createConfig();
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2156,7 +2194,6 @@ public class StreamTaskTest {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
         final long offset = 543L;
-        final long consumedOffset = 345L;

         when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset));
         when(stateManager.changelogOffsets())
@@ -2480,7 +2517,7 @@ public class StreamTaskTest {
     @Test
     public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             createConfig("100"),
             stateManager,
@@ -2547,21 +2584,118 @@ public class StreamTaskTest {
     }

     @Test
-    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+    public void shouldUpdateOffsetIfAllRecordsHaveInvalidTimestamp() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
-            "0",
-            LogAndContinueExceptionHandler.class.getName()
-        ));
+        task = createStatelessTask(createConfigWithTsExtractor(LogAndSkipOnInvalidTimestamp.class));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1L;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithInvalidTimestamp(++offset),
+            getConsumerRecordWithInvalidTimestamp(++offset)
+        );
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        task.resumePollingForPartitionsWithAvailableSpace();
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+        task.updateNextOffsets(partition1, new OffsetAndMetadata(offset + 1, Optional.empty(), ""));
+        task.updateLags();
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(
+            task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1,
+                    new TopicPartitionMetadata(RecordQueue.UNKNOWN, new ProcessorMetadata()).encode()))))
+        );
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsInvalidTimestamp() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfigWithTsExtractor(LogAndSkipOnInvalidTimestamp.class));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1L;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithInvalidTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset)
+        );
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        task.resumePollingForPartitionsWithAvailableSpace();
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+        task.updateNextOffsets(partition1, new OffsetAndMetadata(offset + 1, Optional.empty(), ""));
+        task.updateLags();
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(
+            task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode()))))
+        );
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfInvalidTimestampeRecordFollowsValid() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfigWithTsExtractor(LogAndSkipOnInvalidTimestamp.class));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });

         long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getConsumerRecordWithInvalidTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        task.resumePollingForPartitionsWithAvailableSpace();
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+        task.updateNextOffsets(partition1, new OffsetAndMetadata(offset + 1, Optional.empty(), ""));
+        task.updateLags();
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(
+            task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
+        );
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(
+            task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
+        );
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(LogAndContinueExceptionHandler.class));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1L;
         final List<ConsumerRecord<byte[], byte[]>> records = asList(
             getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
-            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset)
+        );
         consumer.addRecord(records.get(0));
         consumer.addRecord(records.get(1));
         task.resumePollingForPartitionsWithAvailableSpace();
@@ -2584,19 +2718,16 @@ public class StreamTaskTest {
     public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
-            "0",
-            LogAndContinueExceptionHandler.class.getName()
-        ));
+        task = createStatelessTask(createConfig(LogAndContinueExceptionHandler.class));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });

-        long offset = -1;
+        long offset = -1L;
         final List<ConsumerRecord<byte[], byte[]>> records = asList(
             getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
-            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset)
+        );
         consumer.addRecord(records.get(0));
         consumer.addRecord(records.get(1));
         task.resumePollingForPartitionsWithAvailableSpace();
@@ -2617,18 +2748,16 @@ public class StreamTaskTest {
     public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
-            "0",
-            LogAndContinueExceptionHandler.class.getName()));
+        task = createStatelessTask(createConfig(LogAndContinueExceptionHandler.class));
        task.initializeIfNeeded();
        task.completeRestoration(noOpResetter -> { });

-        long offset = -1;
+        long offset = -1L;
         final List<ConsumerRecord<byte[], byte[]>> records = asList(
             getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
-            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset)
+        );
         consumer.addRecord(records.get(0));
         consumer.addRecord(records.get(1));
         task.resumePollingForPartitionsWithAvailableSpace();
@@ -2682,10 +2811,8 @@ public class StreamTaskTest {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
         task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
             "100",
-            LogAndFailExceptionHandler.class.getName(),
-            LogAndContinueProcessingExceptionHandler.class.getName()
+            LogAndContinueProcessingExceptionHandler.class
         ));

         final StreamsException streamsException = assertThrows(
@@ -2704,10 +2831,8 @@ public class StreamTaskTest {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
         task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
             "100",
-            LogAndFailExceptionHandler.class.getName(),
-            LogAndContinueProcessingExceptionHandler.class.getName()
+            LogAndContinueProcessingExceptionHandler.class
         ));

         final Set<TaskId> tasksIds = new HashSet<>();
@@ -2734,10 +2859,8 @@ public class StreamTaskTest {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
         task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
             "100",
-            LogAndFailExceptionHandler.class.getName(),
-            LogAndContinueProcessingExceptionHandler.class.getName()
+            LogAndContinueProcessingExceptionHandler.class
         ));

         final TaskMigratedException expectedException = new TaskMigratedException("TaskMigratedException", new RuntimeException("Task migrated cause"));
@@ -2757,10 +2880,8 @@ public class StreamTaskTest {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
         task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
             "100",
-            LogAndFailExceptionHandler.class.getName(),
-            LogAndContinueProcessingExceptionHandler.class.getName()
+            LogAndContinueProcessingExceptionHandler.class
         ));

         task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
@@ -2773,10 +2894,8 @@ public class StreamTaskTest {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
         task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
             "100",
-            LogAndFailExceptionHandler.class.getName(),
-            LogAndFailProcessingExceptionHandler.class.getName()
+            LogAndFailProcessingExceptionHandler.class
         ));

         final StreamsException streamsException = assertThrows(
@@ -2795,10 +2914,8 @@ public class StreamTaskTest {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
         task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
             "100",
-            LogAndFailExceptionHandler.class.getName(),
-            NullProcessingExceptionHandler.class.getName()
+            NullProcessingExceptionHandler.class
         ));

         final StreamsException streamsException = assertThrows(
@@ -2818,10 +2935,8 @@ public class StreamTaskTest {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
         task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
             "100",
-            LogAndFailExceptionHandler.class.getName(),
-            CrashingProcessingExceptionHandler.class.getName()
+            CrashingProcessingExceptionHandler.class
         ));

         final FailedProcessingException streamsException = assertThrows(
@@ -2879,7 +2994,7 @@ public class StreamTaskTest {
             singletonList(stateStore),
             Collections.singletonMap(storeName, topic1));

-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2921,7 +3036,7 @@ public class StreamTaskTest {
             }
         };

-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2955,7 +3070,7 @@ public class StreamTaskTest {
             emptyMap()
         );

-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2994,7 +3109,7 @@ public class StreamTaskTest {
             singletonList(stateStore),
             logged ? Collections.singletonMap(storeName, storeName + "-changelog") : Collections.emptyMap());

-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -3029,7 +3144,7 @@ public class StreamTaskTest {
         source1.addChild(processorStreamTime);
         source1.addChild(processorSystemTime);

-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -3066,7 +3181,7 @@ public class StreamTaskTest {
         source1.addChild(processorSystemTime);
         source2.addChild(processorSystemTime);

-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -3102,7 +3217,7 @@ public class StreamTaskTest {
         final StreamsConfig config = createConfig();

-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -3135,7 +3250,7 @@ public class StreamTaskTest {
         );
         final StreamsConfig config = createConfig(eosConfig, "0");

-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -3212,6 +3327,22 @@ public class StreamTaskTest {
         );
     }

+    private ConsumerRecord<byte[], byte[]> getConsumerRecordWithInvalidTimestamp(final long offset) {
+        return new ConsumerRecord<>(
+            topic1,
+            1,
+            offset,
+            -1L, // invalid (negative) timestamp
+            TimestampType.CREATE_TIME,
+            0,
+            0,
+            recordKey,
+            recordValue,
+            new RecordHeaders(),
+            Optional.empty()
+        );
+    }
+
     private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestampWithLeaderEpoch(final TopicPartition topicPartition,
                                                                                                  final long offset,
                                                                                                  final int leaderEpoch) {