KAFKA-17299: add unit tests for previous fix (#17919)

https://github.com/apache/kafka/pull/17899 fixed the issue, but did not
add any unit tests.
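
For context, the behavior under test is the buffered-record bookkeeping in PartitionGroup.nextRecord(): a single poll() can skip over records with invalid timestamps or deserialization errors, so the counter has to drop by however many records actually left the queue, not by one. A minimal sketch of that accounting, simplified from the PartitionGroup hunk below (totalBuffered, queue, and wallClockTime stand in for the surrounding fields and locals):

    // sketch only: decrement by the number of records actually drained,
    // since poll() may discard invalid/corrupted records before returning one
    final int oldSize = queue.size();
    final StampedRecord record = queue.poll(wallClockTime);
    if (record != null) {
        totalBuffered -= oldSize - queue.size();
    }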

Reviewers: Bill Bejeck <bill@confluent.io>
Matthias J. Sax 2024-11-25 12:03:57 -08:00
parent 9b68b6eb9b
commit b1108cd0ba
3 changed files with 155 additions and 6 deletions

PartitionGroup.java

@@ -248,10 +248,11 @@ class PartitionGroup extends AbstractPartitionGroup {
         if (queue != null) {
             // get the first record from this queue.
+            final int oldSize = queue.size();
             record = queue.poll(wallClockTime);
             if (record != null) {
-                --totalBuffered;
+                totalBuffered -= oldSize - queue.size();
                 if (queue.isEmpty()) {
                     // if a certain queue has been drained, reset the flag
PartitionGroupTest.java

@@ -19,9 +19,11 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -43,6 +45,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.UUID;
@@ -558,6 +561,88 @@ public class PartitionGroupTest {
assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), nullValue()); // all available records removed
}
@Test
public void shouldUpdateBufferSizeCorrectlyForSkippedRecords() {
final PartitionGroup group = new PartitionGroup(
logContext,
mkMap(mkEntry(partition1, queue1)),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
enforcedProcessingSensor,
maxTaskIdleMs
);
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue),
new ConsumerRecord<>(
"topic",
1,
-1, // offset as invalid timestamp
-1, // invalid timestamp
TimestampType.CREATE_TIME,
0,
0,
recordKey,
recordValue,
new RecordHeaders(),
Optional.empty()
),
new ConsumerRecord<>(
"topic",
1,
11,
0,
TimestampType.CREATE_TIME,
0,
0,
new byte[0], // corrupted key
recordValue,
new RecordHeaders(),
Optional.empty()
),
new ConsumerRecord<>(
"topic",
1,
-1, // offset as invalid timestamp
-1, // invalid timestamp
TimestampType.CREATE_TIME,
0,
0,
recordKey,
recordValue,
new RecordHeaders(),
Optional.empty()
),
new ConsumerRecord<>(
"topic",
1,
13,
0,
TimestampType.CREATE_TIME,
0,
0,
recordKey,
new byte[0], // corrupted value
new RecordHeaders(),
Optional.empty()
),
new ConsumerRecord<>("topic", 1, 20L, recordKey, recordValue)
);
group.addRawRecords(partition1, list1);
assertEquals(7, group.numBuffered());
group.nextRecord(new RecordInfo(), time.milliseconds());
assertEquals(6, group.numBuffered());
// drain corrupted records
group.nextRecord(new RecordInfo(), time.milliseconds());
assertEquals(1, group.numBuffered());
group.nextRecord(new RecordInfo(), time.milliseconds());
assertEquals(0, group.numBuffered());
}
@Test
public void shouldNeverWaitIfIdlingIsDisabled() {
final PartitionGroup group = new PartitionGroup(

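The StreamTaskTest changes below exercise the same skip paths end to end: the task is configured with LogAndSkipOnInvalidTimestamp (drops records with negative timestamps) and LogAndContinueExceptionHandler (drops records that fail deserialization). As a rough sketch of the equivalent application-level wiring, using the standard StreamsConfig keys (assumed here, not part of the diff; the test itself goes through its own createConfig() helper):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
    import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

    // hypothetical config sketch: skip invalid-timestamp and undeserializable records
    final Properties props = new Properties();
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class);
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);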
StreamTaskTest.java

@@ -145,8 +145,8 @@ public class StreamTaskTest {
     private final LogContext logContext = new LogContext("[test] ");
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
-    private final TopicPartition partition1 = new TopicPartition(topic1, 1);
-    private final TopicPartition partition2 = new TopicPartition(topic2, 1);
+    private final TopicPartition partition1 = new TopicPartition(topic1, 0);
+    private final TopicPartition partition2 = new TopicPartition(topic2, 0);
     private final Set<TopicPartition> partitions = mkSet(partition1, partition2);
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
@@ -1042,6 +1042,69 @@ public class StreamTaskTest {
assertEquals(0, consumer.paused().size());
}
@Test
public void shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs() {
when(stateManager.taskId()).thenReturn(taskId);
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
task = createStatelessTask(createConfig(
StreamsConfig.AT_LEAST_ONCE,
"-1",
LogAndContinueExceptionHandler.class,
LogAndSkipOnInvalidTimestamp.class
));
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, asList(
getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
getConsumerRecordWithInvalidTimestamp(30),
getConsumerRecordWithInvalidTimestamp(40),
getConsumerRecordWithInvalidTimestamp(50)
));
assertTrue(consumer.paused().contains(partition1));
assertTrue(task.process(0L));
task.resumePollingForPartitionsWithAvailableSpace();
assertTrue(consumer.paused().contains(partition1));
assertTrue(task.process(0L));
task.resumePollingForPartitionsWithAvailableSpace();
assertEquals(0, consumer.paused().size());
assertTrue(task.process(0L)); // drain head record (ie, last invalid record)
assertFalse(task.process(0L));
assertFalse(task.hasRecordsQueued());
// repeat test for deserialization error
task.resumePollingForPartitionsWithAvailableSpace();
task.addRecords(partition1, asList(
getConsumerRecordWithOffsetAsTimestamp(partition1, 110),
getConsumerRecordWithOffsetAsTimestamp(partition1, 120),
getCorruptedConsumerRecordWithOffsetAsTimestamp(130),
getCorruptedConsumerRecordWithOffsetAsTimestamp(140),
getCorruptedConsumerRecordWithOffsetAsTimestamp(150)
));
assertTrue(consumer.paused().contains(partition1));
assertTrue(task.process(0L));
task.resumePollingForPartitionsWithAvailableSpace();
assertTrue(consumer.paused().contains(partition1));
assertTrue(task.process(0L));
task.resumePollingForPartitionsWithAvailableSpace();
assertEquals(0, consumer.paused().size());
assertTrue(task.process(0L)); // drain head record (ie, last corrupted record)
assertFalse(task.process(0L));
assertFalse(task.hasRecordsQueued());
}
@Test
public void shouldPunctuateOnceStreamTimeAfterGap() {
task = createStatelessTask(createConfig());
@@ -2943,7 +3006,7 @@ public class StreamTaskTest {
     private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(final Integer key, final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
@@ -2959,7 +3022,7 @@ public class StreamTaskTest {
     private ConsumerRecord<byte[], byte[]> getConsumerRecordWithInvalidTimestamp(final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             -1L, // invalid (negative) timestamp
             TimestampType.CREATE_TIME,
@@ -2975,7 +3038,7 @@ public class StreamTaskTest {
     private ConsumerRecord<byte[], byte[]> getCorruptedConsumerRecordWithOffsetAsTimestamp(final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,