mirror of https://github.com/apache/kafka.git
KAFKA-17872: Update consumed offsets on records with invalid timestamp (#17710)
TimestampExtractor allows dropping records by returning a timestamp of -1. In this case, we still need to update the consumed offsets so that we can commit progress.

Reviewers: Bill Bejeck <bill@confluent.io>
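For illustration only (not part of this commit): a custom TimestampExtractor can signal that a record should be skipped by returning a negative timestamp. A minimal sketch, assuming a hypothetical extractor class; after this fix the skipped record's offset is still tracked so the task can commit past it.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Hypothetical extractor: returns -1 for records without a usable timestamp,
    // which tells Kafka Streams to drop the record. With this change the record
    // queue still remembers the dropped record's offset, so progress can be committed.
    public class DropOnMissingTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
            final long ts = record.timestamp();
            return ts >= 0 ? ts : -1L;
        }
    }

The built-in LogAndSkipOnInvalidTimestamp extractor (used by the new tests below) leads to the same drop behavior when a record carries a negative embedded timestamp.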
parent fb2d647116
commit 35c17ac0ae
@@ -228,6 +228,7 @@ public class RecordQueue {
                     deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
                 );
                 droppedRecordsSensor.record();
+                lastCorruptedRecord = raw;
                 continue;
             }
             headRecord = new StampedRecord(deserialized, timestamp);
@@ -27,7 +27,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
@@ -75,8 +75,7 @@ public class RecordQueueTest {
     private final StreamsMetricsImpl streamsMetrics =
         new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST, new MockTime());
 
-    @SuppressWarnings("rawtypes")
-    final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
+    final InternalMockProcessorContext<Integer, Integer> context = new InternalMockProcessorContext<>(
         StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
         new MockRecordCollector(),
         metrics
@@ -89,19 +88,28 @@ public class RecordQueueTest {
         timestampExtractor,
         new LogAndFailExceptionHandler(),
         context,
-        new LogContext());
+        new LogContext()
+    );
     private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(
         new TopicPartition("topic", 1),
         mockSourceNodeWithMetrics,
         timestampExtractor,
         new LogAndContinueExceptionHandler(),
         context,
-        new LogContext());
+        new LogContext()
+    );
+    private final RecordQueue queueThatSkipsInvalidTimestamps = new RecordQueue(
+        new TopicPartition("topic", 1),
+        mockSourceNodeWithMetrics,
+        new LogAndSkipOnInvalidTimestamp(),
+        new LogAndFailExceptionHandler(),
+        context,
+        new LogContext()
+    );
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
-    @SuppressWarnings("unchecked")
     @Before
     public void before() {
         mockSourceNodeWithMetrics.init(context);
@@ -328,7 +336,7 @@ public class RecordQueueTest {
 
     @Test
     public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
-        final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
+        final byte[] key = new LongSerializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
             new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue,
                 new RecordHeaders(), Optional.empty()));
@@ -342,7 +350,7 @@ public class RecordQueueTest {
 
     @Test
     public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
-        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+        final byte[] value = new LongSerializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
             new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value,
                 new RecordHeaders(), Optional.empty()));
@@ -356,7 +364,7 @@ public class RecordQueueTest {
 
     @Test
     public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
-        final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
+        final byte[] key = new LongSerializer().serialize("foo", 1L);
         final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 1, 1, 0L,
             TimestampType.CREATE_TIME, 0, 0, key, recordValue,
             new RecordHeaders(), Optional.empty());
@@ -369,7 +377,7 @@ public class RecordQueueTest {
 
     @Test
     public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
-        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+        final byte[] value = new LongSerializer().serialize("foo", 1L);
         final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 1, 1, 0L,
             TimestampType.CREATE_TIME, 0, 0, recordKey, value,
             new RecordHeaders(), Optional.empty());
@@ -392,7 +400,7 @@ public class RecordQueueTest {
             mockSourceNodeWithMetrics,
             new FailOnInvalidTimestamp(),
             new LogAndContinueExceptionHandler(),
-            new InternalMockProcessorContext(),
+            new InternalMockProcessorContext<>(),
             new LogContext());
 
         final StreamsException exception = assertThrows(
@@ -409,20 +417,25 @@ public class RecordQueueTest {
 
     @Test
     public void shouldDropOnNegativeTimestamp() {
-        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
-            new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
-                new RecordHeaders(), Optional.empty()));
+        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+            "topic",
+            1,
+            1,
+            -1L, // negative timestamp
+            TimestampType.CREATE_TIME,
+            0,
+            0,
+            recordKey,
+            recordValue,
+            new RecordHeaders(),
+            Optional.empty()
+        );
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(record);
 
-        final RecordQueue queue = new RecordQueue(
-            new TopicPartition("topic", 1),
-            mockSourceNodeWithMetrics,
-            new LogAndSkipOnInvalidTimestamp(),
-            new LogAndContinueExceptionHandler(),
-            new InternalMockProcessorContext(),
-            new LogContext());
-        queue.addRawRecords(records);
+        queueThatSkipsInvalidTimestamps.addRawRecords(records);
 
-        assertEquals(0, queue.size());
+        assertEquals(1, queueThatSkipsInvalidTimestamps.size());
+        assertEquals(new CorruptedRecord(record), queueThatSkipsInvalidTimestamps.poll(0));
     }
 
     @Test
@@ -37,14 +37,15 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
@@ -53,10 +54,13 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -81,7 +85,6 @@ import org.mockito.quality.Strictness;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashSet;
@@ -145,8 +148,8 @@ public class StreamTaskTest {
     private final TopicPartition partition1 = new TopicPartition(topic1, 1);
     private final TopicPartition partition2 = new TopicPartition(topic2, 1);
     private final Set<TopicPartition> partitions = mkSet(partition1, partition2);
-    private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
-    private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
+    private final Serializer<Integer> intSerializer = new IntegerSerializer();
+    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
 
     private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
@@ -235,13 +238,37 @@ public class StreamTaskTest {
     }
 
     private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue) {
-        return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName());
+        return createConfig(
+            eosConfig,
+            enforcedProcessingValue,
+            LogAndFailExceptionHandler.class,
+            FailOnInvalidTimestamp.class
+        );
+    }
+
+    private static StreamsConfig createConfig(final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler) {
+        return createConfig(
+            AT_LEAST_ONCE,
+            "0", // max.task.idle.ms
+            deserializationExceptionHandler,
+            FailOnInvalidTimestamp.class
+        );
+    }
+
+    private static StreamsConfig createConfigWithTsExtractor(final Class<? extends TimestampExtractor> timestampExtractor) {
+        return createConfig(
+            AT_LEAST_ONCE,
+            "0", // max.task.idle.ms
+            LogAndFailExceptionHandler.class,
+            timestampExtractor
+        );
     }
 
     private static StreamsConfig createConfig(
         final String eosConfig,
         final String enforcedProcessingValue,
-        final String deserializationExceptionHandler) {
+        final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler,
+        final Class<? extends TimestampExtractor> timestampExtractor) {
         final String canonicalPath;
         try {
             canonicalPath = BASE_DIR.getCanonicalPath();
@@ -257,7 +284,8 @@ public class StreamTaskTest {
             mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
             mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
             mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue),
-            mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler)
+            mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler.getName()),
+            mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, timestampExtractor.getName())
         )));
     }
 
@@ -391,37 +419,39 @@ public class StreamTaskTest {
         task.addPartitionsForOffsetReset(Collections.singleton(partition1));
 
         final AtomicReference<AssertionError> shouldNotSeek = new AtomicReference<>();
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+        try (final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
             public void seek(final TopicPartition partition, final long offset) {
                 final AssertionError error = shouldNotSeek.get();
                 if (error != null) {
                     throw error;
                 }
                 super.seek(partition, offset);
             }
-        };
-        consumer.assign(asList(partition1, partition2));
-        consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L)));
+        }) {
 
-        consumer.seek(partition1, 5L);
-        consumer.seek(partition2, 15L);
+            consumer.assign(asList(partition1, partition2));
+            consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L)));
 
-        shouldNotSeek.set(new AssertionError("Should not seek"));
+            consumer.seek(partition1, 5L);
+            consumer.seek(partition2, 15L);
 
-        final java.util.function.Consumer<Set<TopicPartition>> resetter =
-            EasyMock.mock(java.util.function.Consumer.class);
-        resetter.accept(Collections.singleton(partition1));
-        EasyMock.expectLastCall();
-        EasyMock.replay(resetter);
+            shouldNotSeek.set(new AssertionError("Should not seek"));
 
-        task.initializeIfNeeded();
-        task.completeRestoration(resetter);
+            final java.util.function.Consumer<Set<TopicPartition>> resetter =
+                EasyMock.mock(java.util.function.Consumer.class);
+            resetter.accept(Collections.singleton(partition1));
+            EasyMock.expectLastCall();
+            EasyMock.replay(resetter);
 
-        // because we mocked the `resetter` positions don't change
-        assertThat(consumer.position(partition1), equalTo(5L));
-        assertThat(consumer.position(partition2), equalTo(15L));
-        EasyMock.verify(resetter);
+            task.initializeIfNeeded();
+            task.completeRestoration(resetter);
+
+            // because we mocked the `resetter` positions don't change
+            assertThat(consumer.position(partition1), equalTo(5L));
+            assertThat(consumer.position(partition2), equalTo(15L));
+            EasyMock.verify(resetter);
+        }
     }
 
     @Test
@@ -909,10 +939,9 @@ public class StreamTaskTest {
         reporter.contextChange(metricsContext);
 
         metrics.addReporter(reporter);
-        final String threadIdTag = THREAD_ID_TAG;
         assertTrue(reporter.containsMbean(String.format(
             "kafka.streams:type=stream-task-metrics,%s=%s,task-id=%s",
-            threadIdTag,
+            THREAD_ID_TAG,
             threadId,
             task.id()
         )));
@@ -1415,18 +1444,15 @@ public class StreamTaskTest {
 
         assertFalse(task.process(0L));
 
-        final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
-
-        task.addRecords(partition1, singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+        task.addRecords(partition1, singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
 
         assertFalse(task.process(0L));
         assertThat("task is idling", task.timeCurrentIdlingStarted().isPresent());
 
-        task.addRecords(partition2, singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
+        task.addRecords(partition2, singleton(getConsumerRecordWithOffsetAsTimestamp(partition2, 0)));
 
         assertTrue(task.process(0L));
         assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent());
-
     }
 
     @Test
@@ -1827,7 +1853,7 @@ public class StreamTaskTest {
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2484,7 +2510,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             createConfig("100"),
             stateManager,
@@ -2548,13 +2574,98 @@ public class StreamTaskTest {
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsHaveInvalidTimestamp() {
+        task = createStatelessTask(createConfigWithTsExtractor(LogAndSkipOnInvalidTimestamp.class));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1L;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithInvalidTimestamp(++offset),
+            getConsumerRecordWithInvalidTimestamp(++offset)
+        );
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        task.resumePollingForPartitionsWithAvailableSpace();
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+        task.updateLags();
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(
+            task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1,
+                    new TopicPartitionMetadata(RecordQueue.UNKNOWN, new ProcessorMetadata()).encode()))))
+        );
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsInvalidTimestamp() {
+        task = createStatelessTask(createConfigWithTsExtractor(LogAndSkipOnInvalidTimestamp.class));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1L;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithInvalidTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset)
+        );
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        task.resumePollingForPartitionsWithAvailableSpace();
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+        task.updateLags();
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(
+            task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode()))))
+        );
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfInvalidTimestampeRecordFollowsValid() {
+        task = createStatelessTask(createConfigWithTsExtractor(LogAndSkipOnInvalidTimestamp.class));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getConsumerRecordWithInvalidTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        task.resumePollingForPartitionsWithAvailableSpace();
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+        task.updateLags();
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(
+            task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
+        );
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(
+            task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
+        );
+    }
+
     @Test
     public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
-        task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
-            "0",
-            LogAndContinueExceptionHandler.class.getName()
-        ));
+        task = createStatelessTask(createConfig(LogAndContinueExceptionHandler.class));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
 
@@ -2581,19 +2692,16 @@ public class StreamTaskTest {
 
     @Test
     public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
-        task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
-            "0",
-            LogAndContinueExceptionHandler.class.getName()
-        ));
+        task = createStatelessTask(createConfig(LogAndContinueExceptionHandler.class));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
 
-        long offset = -1;
+        long offset = -1L;
 
         final List<ConsumerRecord<byte[], byte[]>> records = asList(
             getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
-            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset)
+        );
         consumer.addRecord(records.get(0));
         consumer.addRecord(records.get(1));
         task.resumePollingForPartitionsWithAvailableSpace();
@@ -2611,18 +2719,16 @@ public class StreamTaskTest {
 
     @Test
     public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
-        task = createStatelessTask(createConfig(
-            AT_LEAST_ONCE,
-            "0",
-            LogAndContinueExceptionHandler.class.getName()));
+        task = createStatelessTask(createConfig(LogAndContinueExceptionHandler.class));
        task.initializeIfNeeded();
        task.completeRestoration(noOpResetter -> { });
 
-        long offset = -1;
+        long offset = -1L;
 
         final List<ConsumerRecord<byte[], byte[]>> records = asList(
             getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
-            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset)
+        );
         consumer.addRecord(records.get(0));
         consumer.addRecord(records.get(1));
         task.resumePollingForPartitionsWithAvailableSpace();
@@ -2690,7 +2796,7 @@ public class StreamTaskTest {
             singletonList(stateStore),
             Collections.singletonMap(storeName, topic1));
 
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2732,7 +2838,7 @@ public class StreamTaskTest {
             }
         };
 
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2766,7 +2872,7 @@ public class StreamTaskTest {
             emptyMap()
         );
 
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2805,7 +2911,7 @@ public class StreamTaskTest {
             singletonList(stateStore),
             logged ? Collections.singletonMap(storeName, storeName + "-changelog") : Collections.emptyMap());
 
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2846,7 +2952,7 @@ public class StreamTaskTest {
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2888,7 +2994,7 @@ public class StreamTaskTest {
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2928,7 +3034,7 @@ public class StreamTaskTest {
 
         final StreamsConfig config = createConfig();
 
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -2963,7 +3069,7 @@ public class StreamTaskTest {
         );
 
         final StreamsConfig config = createConfig(eosConfig, "0");
-        final InternalProcessorContext context = new ProcessorContextImpl(
+        final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
             taskId,
             config,
             stateManager,
@@ -3040,6 +3146,22 @@ public class StreamTaskTest {
         );
     }
 
+    private ConsumerRecord<byte[], byte[]> getConsumerRecordWithInvalidTimestamp(final long offset) {
+        return new ConsumerRecord<>(
+            topic1,
+            1,
+            offset,
+            -1L, // invalid (negative) timestamp
+            TimestampType.CREATE_TIME,
+            0,
+            0,
+            recordKey,
+            recordValue,
+            new RecordHeaders(),
+            Optional.empty()
+        );
+    }
+
     private ConsumerRecord<byte[], byte[]> getCorruptedConsumerRecordWithOffsetAsTimestamp(final long offset) {
         return new ConsumerRecord<>(
             topic1,