KAFKA-5490; Skip empty record batches in the consumer

The actual fix for KAFKA-5490 is in
https://github.com/apache/kafka/pull/3406.

This is just the consumer change that will allow the cleaner
to use empty record batches without breaking 0.11.0.0
consumers (assuming that KAFKA-5490 does not make the cut).
This is a safe change even if we decide to go with a different option
for KAFKA-5490 and I'd like to include it in RC2.
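
To illustrate what "skipping" means in practice (a minimal standalone sketch, not the actual Fetcher code; the Batch class below is hypothetical): a message format v2 batch keeps its base and last offsets in the header even after the cleaner has removed every record from it, so a consumer that encounters such a batch should return no records for it and simply advance its position to lastOffset + 1, which is what the new FetcherTest case below asserts.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class EmptyBatchPositionSketch {

    // Hypothetical stand-in for a v2 record batch: the offsets survive even if the records don't.
    static final class Batch {
        final long baseOffset;
        final long lastOffset;
        final List<Long> recordOffsets;

        Batch(long baseOffset, long lastOffset, List<Long> recordOffsets) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
            this.recordOffsets = recordOffsets;
        }
    }

    public static void main(String[] args) {
        List<Batch> fetched = Arrays.asList(
                new Batch(0, 36, Arrays.asList(35L, 36L)),
                new Batch(37, 54, Collections.<Long>emptyList()), // cleaned-out (empty) batch
                new Batch(55, 55, Arrays.asList(55L)));

        long position = 0;
        for (Batch batch : fetched) {
            for (long offset : batch.recordOffsets)
                position = offset + 1;                            // normal per-record advancement
            position = Math.max(position, batch.lastOffset + 1);  // move past empty batches too
        }
        System.out.println(position); // 56: the consumer never re-fetches offsets 37-54
    }
}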

Author: Jason Gustafson <jason@confluent.io>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3408 from ijuma/kafka-5490-consumer-should-skip-empty-batches
Jason Gustafson 2017-06-22 16:54:28 +01:00 committed by Ismael Juma
parent 1744a9b4c2
commit fc58ac594f
4 changed files with 102 additions and 17 deletions

Fetcher.java

@@ -1061,21 +1061,21 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     }
 
                     records = currentBatch.streamingIterator(decompressionBufferSupplier);
-                }
-
-                Record record = records.next();
-                lastRecord = record;
-                // skip any records out of range
-                if (record.offset() >= nextFetchOffset) {
-                    // we only do validation when the message should not be skipped.
-                    maybeEnsureValid(record);
+                } else {
+                    Record record = records.next();
+                    lastRecord = record;
+                    // skip any records out of range
+                    if (record.offset() >= nextFetchOffset) {
+                        // we only do validation when the message should not be skipped.
+                        maybeEnsureValid(record);
 
-                    // control records are not returned to the user
-                    if (!currentBatch.isControlBatch()) {
-                        return record;
-                    } else {
-                        // Increment the next fetch offset when we skip a control batch.
-                        nextFetchOffset = record.offset() + 1;
+                        // control records are not returned to the user
+                        if (!currentBatch.isControlBatch()) {
+                            return record;
+                        } else {
+                            // Increment the next fetch offset when we skip a control batch.
+                            nextFetchOffset = record.offset() + 1;
+                        }
                     }
                 }
             }
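
The restructuring above matters because, in the old shape, the record-handling code ran unconditionally right after the batch-advancing block: a batch whose iterator yields nothing would have next() called on an exhausted iterator. With the else branch, control instead falls back to the top of the while loop, where the empty iterator simply causes the next batch (if any) to be picked up. A rough standalone sketch of that control-flow difference, using plain java.util iterators rather than the Kafka classes:

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class SkipEmptyBatchSketch {

    // Old shape: advance to the next batch if needed, then consume a record unconditionally.
    static String oldStyle(Iterator<List<String>> batches) {
        Iterator<String> records = Collections.emptyIterator();
        if (!records.hasNext() && batches.hasNext())
            records = batches.next().iterator();
        return records.next(); // NoSuchElementException if that batch was empty
    }

    // New shape: records are consumed only in the else branch, so an empty batch just
    // sends the loop back around to pick up the next batch.
    static String newStyle(Iterator<List<String>> batches) {
        Iterator<String> records = Collections.emptyIterator();
        while (true) {
            if (!records.hasNext()) {
                if (!batches.hasNext())
                    return null;                          // no more batches
                records = batches.next().iterator();      // may be empty; re-checked next pass
            } else {
                return records.next();
            }
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Collections.<String>emptyList(),          // an empty batch, e.g. fully compacted
                Arrays.asList("first-record"));

        System.out.println(newStyle(batches.iterator())); // prints first-record
        try {
            oldStyle(batches.iterator());
        } catch (NoSuchElementException e) {
            System.out.println("old shape fails on the empty batch");
        }
    }
}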

DefaultRecordBatch.java

@@ -30,6 +30,7 @@ import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -187,9 +188,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
         int baseSequence = baseSequence();
         if (baseSequence == RecordBatch.NO_SEQUENCE)
             return RecordBatch.NO_SEQUENCE;
-
-        int delta = lastOffsetDelta();
-        return incrementSequence(baseSequence, delta);
+        return incrementSequence(baseSequence, lastOffsetDelta());
     }
 
     @Override
@@ -297,6 +296,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
 
     @Override
     public Iterator<Record> iterator() {
+        if (count() == 0)
+            return Collections.emptyIterator();
+
         if (!isCompressed())
             return uncompressedIterator();
 
@@ -393,6 +395,24 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
         return attributes;
     }
 
+    public static void writeEmptyHeader(ByteBuffer buffer,
+                                        byte magic,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int baseSequence,
+                                        long baseOffset,
+                                        long lastOffset,
+                                        int partitionLeaderEpoch,
+                                        TimestampType timestampType,
+                                        long timestamp,
+                                        boolean isTransactional,
+                                        boolean isControlRecord) {
+        int offsetDelta = (int) (lastOffset - baseOffset);
+        writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
+                CompressionType.NONE, timestampType, timestamp, timestamp, producerId, producerEpoch, baseSequence,
+                isTransactional, isControlRecord, partitionLeaderEpoch, 0);
+    }
+
     static void writeHeader(ByteBuffer buffer,
                             long baseOffset,
                             int lastOffsetDelta,
@@ -431,6 +451,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
         buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords);
         long crc = Crc32C.compute(buffer, ATTRIBUTES_OFFSET, sizeInBytes - ATTRIBUTES_OFFSET);
         buffer.putInt(position + CRC_OFFSET, (int) crc);
+        buffer.position(position + RECORD_BATCH_OVERHEAD);
     }
 
     @Override

FetcherTest.java

@@ -1668,6 +1668,35 @@ public class FetcherTest {
         assertEquals(4L, subscriptions.position(tp1).longValue());
     }
 
+    @Test
+    public void testUpdatePositionOnEmptyBatch() {
+        long producerId = 1;
+        short producerEpoch = 0;
+        int sequence = 1;
+        long baseOffset = 37;
+        long lastOffset = 54;
+        int partitionLeaderEpoch = 7;
+        ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch,
+                sequence, baseOffset, lastOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME,
+                System.currentTimeMillis(), false, false);
+        buffer.flip();
+        MemoryRecords recordsWithEmptyBatch = MemoryRecords.readableRecords(buffer);
+
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 0);
+
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fetchResponse(tp1, recordsWithEmptyBatch, Errors.NONE, 100L, 0));
+        consumerClient.poll(0);
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> allFetchedRecords = fetcher.fetchedRecords();
+        assertTrue(allFetchedRecords.isEmpty());
+        // The next offset should point to the next batch
+        assertEquals(lastOffset + 1, subscriptions.position(tp1).longValue());
+    }
+
     @Test
     public void testReadCommittedWithCompactedTopic() {
         Fetcher<String, String> fetcher = createFetcher(subscriptions, new Metrics(), new StringDeserializer(),

DefaultRecordBatchTest.java

@@ -34,6 +34,41 @@ import static org.junit.Assert.assertTrue;
 
 public class DefaultRecordBatchTest {
 
+    @Test
+    public void testWriteEmptyHeader() {
+        long producerId = 23423L;
+        short producerEpoch = 145;
+        int baseSequence = 983;
+        long baseOffset = 15L;
+        long lastOffset = 37;
+        int partitionLeaderEpoch = 15;
+        long timestamp = System.currentTimeMillis();
+
+        for (TimestampType timestampType : Arrays.asList(TimestampType.CREATE_TIME, TimestampType.LOG_APPEND_TIME)) {
+            for (boolean isTransactional : Arrays.asList(true, false)) {
+                for (boolean isControlBatch : Arrays.asList(true, false)) {
+                    ByteBuffer buffer = ByteBuffer.allocate(2048);
+                    DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId,
+                            producerEpoch, baseSequence, baseOffset, lastOffset, partitionLeaderEpoch, timestampType,
+                            timestamp, isTransactional, isControlBatch);
+                    buffer.flip();
+                    DefaultRecordBatch batch = new DefaultRecordBatch(buffer);
+                    assertEquals(producerId, batch.producerId());
+                    assertEquals(producerEpoch, batch.producerEpoch());
+                    assertEquals(baseSequence, batch.baseSequence());
+                    assertEquals(baseSequence + ((int) (lastOffset - baseOffset)), batch.lastSequence());
+                    assertEquals(baseOffset, batch.baseOffset());
+                    assertEquals(lastOffset, batch.lastOffset());
+                    assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
+                    assertEquals(isTransactional, batch.isTransactional());
+                    assertEquals(timestampType, batch.timestampType());
+                    assertEquals(timestamp, batch.maxTimestamp());
+                    assertEquals(isControlBatch, batch.isControlBatch());
+                }
+            }
+        }
+    }
+
     @Test
     public void buildDefaultRecordBatch() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);