KAFKA-5316; LogCleaner should account for larger record sets after cleaning

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #3142 from hachikuji/KAFKA-5316
Jason Gustafson 2017-05-28 09:57:59 -07:00
parent b50387eb7c
commit dfa3c8a92d
22 changed files with 492 additions and 214 deletions

View File

@ -105,7 +105,7 @@
</module>
<module name="ClassDataAbstractionCoupling">
<!-- default is 7 -->
<property name="max" value="17"/>
<property name="max" value="20"/>
</module>
<module name="BooleanExpressionComplexity">
<!-- default is 3 -->

View File

@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients.producer.internals;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
@ -27,23 +24,24 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ProduceResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
@ -119,9 +117,9 @@ public final class ProducerBatch {
}
/**
 * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
 * @return true if the record has been successfully appended, false otherwise.
 */
private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
return false;
@ -196,15 +194,13 @@ public final class ProducerBatch {
assert thunkIter.hasNext();
Thunk thunk = thunkIter.next();
if (batch == null) {
batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
record, splitBatchSize, this.createdMs);
batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
}
// A newly created batch can always host the first message.
if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
batches.add(batch);
batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
record, splitBatchSize, this.createdMs);
batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
}
}
@ -217,30 +213,13 @@ public final class ProducerBatch {
return batches;
}
private ProducerBatch createBatchOffAccumulatorForRecord(TopicPartition tp,
CompressionType compressionType,
Record record,
int batchSize,
long createdMs) {
int initialSize = Math.max(Records.LOG_OVERHEAD + AbstractRecords.sizeInBytesUpperBound(magic(),
record.key(),
record.value(),
record.headers()),
batchSize);
return createBatchOffAccumulator(tp, compressionType, initialSize, createdMs);
}
// package private for testing purpose.
static ProducerBatch createBatchOffAccumulator(TopicPartition tp,
CompressionType compressionType,
int batchSize,
long createdMs) {
ByteBuffer buffer = ByteBuffer.allocate(batchSize);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
compressionType,
TimestampType.CREATE_TIME,
batchSize);
return new ProducerBatch(tp, builder, createdMs, true);
private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) {
int initialSize = Math.max(AbstractRecords.sizeInBytesUpperBound(magic(),
record.key(), record.value(), record.headers()), batchSize);
ByteBuffer buffer = ByteBuffer.allocate(initialSize);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(),
TimestampType.CREATE_TIME, 0L, recordsBuilder.isTransactional());
return new ProducerBatch(topicPartition, builder, this.createdMs, true);
}
/**

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
@ -478,6 +479,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + LegacyRecord.CRC_OFFSET, crc);
}
@Override
public void writeTo(ByteBufferOutputStream outputStream) {
outputStream.write(buffer.duplicate());
}
@Override
public boolean equals(Object o) {
if (this == o)

View File

@ -19,10 +19,10 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Crc32C;
import org.apache.kafka.common.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
@ -206,6 +206,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
buffer.put(this.buffer.duplicate());
}
@Override
public void writeTo(ByteBufferOutputStream outputStream) {
outputStream.write(this.buffer.duplicate());
}
@Override
public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
@ -444,13 +449,6 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
return size;
}
/**
* Get an upper bound on the size of a batch with only a single record using a given key and value.
*/
static int batchSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
return batchSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
}
/**
* Get an upper bound on the size of a batch with only a single record using a given key and value.
*/

View File

@ -35,22 +35,18 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
private int position;
private final int end;
private final FileChannel channel;
private final int maxRecordSize;
private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(LOG_OVERHEAD);
/**
* Create a new log input stream over the FileChannel
* @param channel Underlying FileChannel
* @param maxRecordSize Maximum size of records
* @param start Position in the file channel to start from
* @param end Position in the file channel not to read past
*/
FileLogInputStream(FileChannel channel,
int maxRecordSize,
int start,
int end) {
this.channel = channel;
this.maxRecordSize = maxRecordSize;
this.position = start;
this.end = end;
}
@ -71,9 +67,6 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
if (size < LegacyRecord.RECORD_OVERHEAD_V0)
throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
if (size > maxRecordSize)
throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxRecordSize));
if (position + LOG_OVERHEAD + size > end)
return null;

View File

@ -339,35 +339,22 @@ public class FileRecords extends AbstractRecords implements Closeable {
return batches;
}
/**
* Get an iterator over the record batches, enforcing a maximum record size
* @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
* @return An iterator over the batches
*/
public Iterable<FileChannelRecordBatch> batches(int maxRecordSize) {
return batches(maxRecordSize, start);
}
private Iterable<FileChannelRecordBatch> batchesFrom(int start) {
return batches(Integer.MAX_VALUE, start);
}
private Iterable<FileChannelRecordBatch> batches(final int maxRecordSize, final int start) {
private Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
return new Iterable<FileChannelRecordBatch>() {
@Override
public Iterator<FileChannelRecordBatch> iterator() {
return batchIterator(maxRecordSize, start);
return batchIterator(start);
}
};
}
private Iterator<FileChannelRecordBatch> batchIterator(int maxRecordSize, int start) {
private Iterator<FileChannelRecordBatch> batchIterator(int start) {
final int end;
if (isSlice)
end = this.end;
else
end = this.sizeInBytes();
FileLogInputStream inputStream = new FileLogInputStream(channel, maxRecordSize, start, end);
FileLogInputStream inputStream = new FileLogInputStream(channel, start, end);
return new RecordBatchIterator<>(inputStream);
}

View File

@ -16,6 +16,11 @@
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
@ -31,8 +36,8 @@ import java.util.Objects;
* or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType, long)} variants.
*/
public class MemoryRecords extends AbstractRecords {
public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class);
public static final MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
private final ByteBuffer buffer;
@ -110,16 +115,21 @@ public class MemoryRecords extends AbstractRecords {
/**
* Filter the records into the provided ByteBuffer.
* @param partition The partition that is filtered (used only for logging)
* @param filter The filter function
* @param destinationBuffer The byte buffer to write the filtered records to
* @return A FilterResult with a summary of the output (for metrics)
* @param maxRecordBatchSize The maximum record batch size. Note this is not a hard limit: if a batch
* exceeds this after filtering, we log a warning, but the batch will still be
* created.
* @return A FilterResult with a summary of the output (for metrics) and potentially an overflow buffer
*/
public FilterResult filterTo(RecordFilter filter, ByteBuffer destinationBuffer) {
return filterTo(batches(), filter, destinationBuffer);
public FilterResult filterTo(TopicPartition partition, RecordFilter filter, ByteBuffer destinationBuffer,
int maxRecordBatchSize) {
return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize);
}
private static FilterResult filterTo(Iterable<MutableRecordBatch> batches, RecordFilter filter,
ByteBuffer destinationBuffer) {
private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize) {
long maxTimestamp = RecordBatch.NO_TIMESTAMP;
long maxOffset = -1L;
long shallowOffsetOfMaxTimestamp = -1L;
@ -128,6 +138,8 @@ public class MemoryRecords extends AbstractRecords {
int messagesRetained = 0;
int bytesRetained = 0;
ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
for (MutableRecordBatch batch : batches) {
bytesRead += batch.sizeInBytes();
@ -140,7 +152,7 @@ public class MemoryRecords extends AbstractRecords {
// recopy the messages to the destination buffer.
byte batchMagic = batch.magic();
boolean writeOriginalEntry = true;
boolean writeOriginalBatch = true;
List<Record> retainedRecords = new ArrayList<>();
for (Record record : batch) {
@ -150,20 +162,19 @@ public class MemoryRecords extends AbstractRecords {
// Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
// the corrupted batch with correct data.
if (!record.hasMagic(batchMagic))
writeOriginalEntry = false;
writeOriginalBatch = false;
if (record.offset() > maxOffset)
maxOffset = record.offset();
retainedRecords.add(record);
} else {
writeOriginalEntry = false;
writeOriginalBatch = false;
}
}
if (writeOriginalEntry) {
// There are no messages compacted out and no message format conversion, write the original message set back
batch.writeTo(destinationBuffer);
if (writeOriginalBatch) {
batch.writeTo(bufferOutputStream);
messagesRetained += retainedRecords.size();
bytesRetained += batch.sizeInBytes();
if (batch.maxTimestamp() > maxTimestamp) {
@ -171,29 +182,18 @@ public class MemoryRecords extends AbstractRecords {
shallowOffsetOfMaxTimestamp = batch.lastOffset();
}
} else if (!retainedRecords.isEmpty()) {
ByteBuffer slice = destinationBuffer.slice();
TimestampType timestampType = batch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
long baseOffset = batchMagic >= RecordBatch.MAGIC_VALUE_V2 ?
batch.baseOffset() : retainedRecords.get(0).offset();
MemoryRecordsBuilder builder = builder(slice, batch.magic(), batch.compressionType(), timestampType,
baseOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(),
batch.isTransactional(), batch.partitionLeaderEpoch());
for (Record record : retainedRecords)
builder.append(record);
if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2)
// we must preserve the last offset from the initial batch in order to ensure that the
// last sequence number from the batch remains even after compaction. Otherwise, the producer
// could incorrectly see an out of sequence error.
builder.overrideLastOffset(batch.lastOffset());
MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
MemoryRecords records = builder.build();
destinationBuffer.position(destinationBuffer.position() + slice.position());
int filteredBatchSize = records.sizeInBytes();
messagesRetained += retainedRecords.size();
bytesRetained += records.sizeInBytes();
bytesRetained += filteredBatchSize;
if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
"(new size is {}). Consumers from version 0.10.1 and earlier may need to " +
"increase their fetch sizes.",
partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
MemoryRecordsBuilder.RecordsInfo info = builder.info();
if (info.maxTimestamp > maxTimestamp) {
@ -201,9 +201,44 @@ public class MemoryRecords extends AbstractRecords {
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
}
}
// If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
// avoid the need for additional allocations.
ByteBuffer outputBuffer = bufferOutputStream.buffer();
if (outputBuffer != destinationBuffer)
return new FilterResult(outputBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
}
return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
}
private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
List<Record> retainedRecords,
ByteBufferOutputStream bufferOutputStream) {
byte magic = originalBatch.magic();
TimestampType timestampType = originalBatch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ?
originalBatch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
long baseOffset = magic >= RecordBatch.MAGIC_VALUE_V2 ?
originalBatch.baseOffset() : retainedRecords.get(0).offset();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit());
for (Record record : retainedRecords)
builder.append(record);
if (magic >= RecordBatch.MAGIC_VALUE_V2)
// we must preserve the last offset from the initial batch in order to ensure that the
// last sequence number from the batch remains even after compaction. Otherwise, the producer
// could incorrectly see an out of sequence error.
builder.overrideLastOffset(originalBatch.lastOffset());
return builder;
}
/**
@ -271,6 +306,7 @@ public class MemoryRecords extends AbstractRecords {
}
public static class FilterResult {
public final ByteBuffer output;
public final int messagesRead;
public final int bytesRead;
public final int messagesRetained;
@ -279,13 +315,15 @@ public class MemoryRecords extends AbstractRecords {
public final long maxTimestamp;
public final long shallowOffsetOfMaxTimestamp;
public FilterResult(int messagesRead,
public FilterResult(ByteBuffer output,
int messagesRead,
int bytesRead,
int messagesRetained,
int bytesRetained,
long maxOffset,
long maxTimestamp,
long shallowOffsetOfMaxTimestamp) {
this.output = output;
this.messagesRead = messagesRead;
this.bytesRead = bytesRead;
this.messagesRetained = messagesRetained;

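The filterTo changes above are the heart of KAFKA-5316: the destination buffer is wrapped in a growable ByteBufferOutputStream, and FilterResult now reports which buffer was actually written. Below is a minimal caller-side sketch (not part of the commit) assuming the filterTo and FilterResult signatures shown in this file; the class name, sample records, and buffer sizes are hypothetical.

import java.nio.ByteBuffer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class FilterToOverflowSketch {
    public static void main(String[] args) {
        // Build a small record set whose only keyed record is bigger than the destination buffer.
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        builder.append(10L, "key".getBytes(), new byte[512]);
        builder.append(11L, null, "unkeyed".getBytes());
        MemoryRecords records = builder.build();

        // Deliberately undersized destination buffer to force the overflow path.
        ByteBuffer destination = ByteBuffer.allocate(64);
        MemoryRecords.FilterResult result = records.filterTo(
                new TopicPartition("foo", 0),
                new MemoryRecords.RecordFilter() {
                    @Override
                    protected boolean shouldRetain(RecordBatch batch, Record record) {
                        return record.hasKey(); // keep only keyed records
                    }
                },
                destination,
                Integer.MAX_VALUE);

        // If the retained batch did not fit, filterTo grew the buffer internally, so the
        // caller must always read from result.output rather than the buffer it passed in.
        ByteBuffer output = result.output;
        output.flip();
        MemoryRecords retained = MemoryRecords.readableRecords(output);
        System.out.println("retained " + result.messagesRetained + " record(s), "
                + retained.sizeInBytes() + " bytes, grew buffer: " + (output != destination));
    }
}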
View File

@ -49,7 +49,7 @@ public class MemoryRecordsBuilder {
// so it's not safe to hold a direct reference to the underlying ByteBuffer.
private final ByteBufferOutputStream bufferStream;
private final byte magic;
private final int initPos;
private final int initialPosition;
private final long baseOffset;
private final long logAppendTime;
private final boolean isTransactional;
@ -75,25 +75,7 @@ public class MemoryRecordsBuilder {
private MemoryRecords builtRecords;
private boolean aborted = false;
/**
* Construct a new builder.
*
* @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary
* to fit the records appended)
* @param magic The magic value to use
* @param compressionType The compression codec to use
* @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
* @param baseOffset The initial offset to use for
* @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
* @param producerId The producer ID associated with the producer writing this record set
* @param producerEpoch The epoch of the producer
* @param baseSequence The sequence number of the first record in this set
* @param isTransactional Whether or not the records are part of a transaction
* @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded
* when compression is used since size estimates are rough, and in the case that the first
* record added exceeds the size).
*/
public MemoryRecordsBuilder(ByteBuffer buffer,
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
@ -120,7 +102,6 @@ public class MemoryRecordsBuilder {
this.compressionType = compressionType;
this.baseOffset = baseOffset;
this.logAppendTime = logAppendTime;
this.initPos = buffer.position();
this.numRecords = 0;
this.writtenUncompressed = 0;
this.actualCompressionRatio = 1;
@ -132,22 +113,62 @@ public class MemoryRecordsBuilder {
this.isControlBatch = isControlBatch;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialCapacity = buffer.capacity();
this.initialPosition = bufferStream.position();
this.initialCapacity = bufferStream.capacity();
if (magic > RecordBatch.MAGIC_VALUE_V1) {
buffer.position(initPos + DefaultRecordBatch.RECORDS_OFFSET);
bufferStream.position(initialPosition + DefaultRecordBatch.RECORDS_OFFSET);
} else if (compressionType != CompressionType.NONE) {
// for compressed records, leave space for the header and the shallow message metadata
// and move the starting position to the value payload offset
buffer.position(initPos + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic));
bufferStream.position(initialPosition + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic));
}
// create the stream
bufferStream = new ByteBufferOutputStream(buffer);
appendStream = new DataOutputStream(compressionType.wrapForOutput(bufferStream, magic,
this.bufferStream = bufferStream;
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic,
COMPRESSION_DEFAULT_BUFFER_SIZE));
}
/**
* Construct a new builder.
*
* @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary
* to fit the records appended)
* @param magic The magic value to use
* @param compressionType The compression codec to use
* @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
* @param baseOffset The initial offset to use for
* @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
* @param producerId The producer ID associated with the producer writing this record set
* @param producerEpoch The epoch of the producer
* @param baseSequence The sequence number of the first record in this set
* @param isTransactional Whether or not the records are part of a transaction
* @param isControlBatch Whether or not this is a control batch (e.g. for transaction markers)
* @param partitionLeaderEpoch The epoch of the partition leader appending the record set to the log
* @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded
* when compression is used since size estimates are rough, and in the case that the first
* record added exceeds the size).
*/
public MemoryRecordsBuilder(ByteBuffer buffer,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {
this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime,
producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch,
writeLimit);
}
public ByteBuffer buffer() {
return bufferStream.buffer();
}
@ -168,6 +189,10 @@ public class MemoryRecordsBuilder {
return isControlBatch;
}
public boolean isTransactional() {
return isTransactional;
}
/**
* Close this builder and return the resulting buffer.
* @return The built log buffer
@ -249,7 +274,7 @@ public class MemoryRecordsBuilder {
public void abort() {
closeForRecordAppends();
buffer().position(initPos);
buffer().position(initialPosition);
aborted = true;
}
@ -260,6 +285,27 @@ public class MemoryRecordsBuilder {
if (builtRecords != null)
return;
validateProducerState();
closeForRecordAppends();
if (numRecords == 0L) {
buffer().position(initialPosition);
builtRecords = MemoryRecords.EMPTY;
} else {
if (magic > RecordBatch.MAGIC_VALUE_V1)
this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.writtenUncompressed;
else if (compressionType != CompressionType.NONE)
this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.writtenUncompressed;
ByteBuffer buffer = buffer().duplicate();
buffer.flip();
buffer.position(initialPosition);
builtRecords = MemoryRecords.readableRecords(buffer.slice());
}
}
private void validateProducerState() {
if (isTransactional && producerId == RecordBatch.NO_PRODUCER_ID)
throw new IllegalArgumentException("Cannot write transactional messages without a valid producer ID");
@ -273,23 +319,6 @@ public class MemoryRecordsBuilder {
if (magic < RecordBatch.MAGIC_VALUE_V2)
throw new IllegalArgumentException("Idempotent messages are not supported for magic " + magic);
}
closeForRecordAppends();
if (numRecords == 0L) {
buffer().position(initPos);
builtRecords = MemoryRecords.EMPTY;
} else {
if (magic > RecordBatch.MAGIC_VALUE_V1)
this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.writtenUncompressed;
else if (compressionType != CompressionType.NONE)
this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.writtenUncompressed;
ByteBuffer buffer = buffer().duplicate();
buffer.flip();
buffer.position(initPos);
builtRecords = MemoryRecords.readableRecords(buffer.slice());
}
}
/**
@ -300,8 +329,8 @@ public class MemoryRecordsBuilder {
ensureOpenForRecordBatchWrite();
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
buffer.position(initPos);
int size = pos - initPos;
buffer.position(initialPosition);
int size = pos - initialPosition;
int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
int offsetDelta = (int) (lastOffset - baseOffset);
@ -331,9 +360,9 @@ public class MemoryRecordsBuilder {
ensureOpenForRecordBatchWrite();
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
buffer.position(initPos);
buffer.position(initialPosition);
int wrapperSize = pos - initPos - Records.LOG_OVERHEAD;
int wrapperSize = pos - initialPosition - Records.LOG_OVERHEAD;
int writtenCompressed = wrapperSize - LegacyRecord.recordOverhead(magic);
AbstractLegacyRecordBatch.writeHeader(buffer, lastOffset, wrapperSize);
@ -544,7 +573,7 @@ public class MemoryRecordsBuilder {
* @param record the record to add
*/
public void append(Record record) {
appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value(), record.headers());
appendWithOffset(record.offset(), isControlBatch, record.timestamp(), record.key(), record.value(), record.headers());
}
/**
@ -736,4 +765,5 @@ public class MemoryRecordsBuilder {
public short producerEpoch() {
return this.producerEpoch;
}
}

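The new constructor above lets MemoryRecordsBuilder write through a ByteBufferOutputStream, so the backing buffer can grow past its initial capacity; this is what allows buildRetainedRecordsInto in MemoryRecords to emit a batch larger than the cleaner's destination buffer. The following is a minimal sketch (not part of the commit) of constructing a builder this way; the RecordBatch.NO_* constants, CURRENT_MAGIC_VALUE, and the class name are assumptions drawn from the surrounding Kafka code, not from this diff.

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

public class GrowableBuilderSketch {
    public static void main(String[] args) {
        // Start from a deliberately small buffer; the stream, not the buffer, now owns allocation.
        ByteBufferOutputStream stream = new ByteBufferOutputStream(ByteBuffer.allocate(128));

        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(stream,
                RecordBatch.CURRENT_MAGIC_VALUE,           // assumed constant
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                0L,                                        // baseOffset
                RecordBatch.NO_TIMESTAMP,                  // logAppendTime (CREATE_TIME is used)
                RecordBatch.NO_PRODUCER_ID,                // assumed constants for a
                RecordBatch.NO_PRODUCER_EPOCH,             // non-idempotent, non-transactional
                RecordBatch.NO_SEQUENCE,                   // batch
                false,                                     // isTransactional
                false,                                     // isControlBatch
                RecordBatch.NO_PARTITION_LEADER_EPOCH,
                stream.limit());                           // writeLimit, as in buildRetainedRecordsInto

        // Appending more than 128 bytes no longer overflows: ByteBufferOutputStream reallocates.
        builder.append(System.currentTimeMillis(), "key".getBytes(), new byte[1024]);
        MemoryRecords records = builder.build();

        // buffer() returns the (possibly reallocated) buffer held by the stream.
        System.out.println("built " + records.sizeInBytes() + " bytes; buffer capacity is now "
                + builder.buffer().capacity());
    }
}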
View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
/**
* A mutable record batch is one that can be modified in place (without copying). This is used by the broker
* to override certain fields in the batch before appending it to the log.
@ -42,4 +44,11 @@ public interface MutableRecordBatch extends RecordBatch {
* @param epoch The partition leader epoch to use
*/
void setPartitionLeaderEpoch(int epoch);
/**
* Write this record batch into an output stream.
* @param outputStream The buffer to write the batch to
*/
void writeTo(ByteBufferOutputStream outputStream);
}

View File

@ -20,16 +20,18 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* A byte buffer backed output outputStream
* A ByteBuffer-backed OutputStream
*/
public class ByteBufferOutputStream extends OutputStream {
private static final float REALLOCATION_FACTOR = 1.1f;
private ByteBuffer buffer;
private int initialPosition;
public ByteBufferOutputStream(ByteBuffer buffer) {
this.buffer = buffer;
this.initialPosition = buffer.position();
}
public void write(int b) {
@ -40,18 +42,55 @@ public class ByteBufferOutputStream extends OutputStream {
public void write(byte[] bytes, int off, int len) {
if (buffer.remaining() < len)
expandBuffer(buffer.capacity() + len);
expandBuffer(buffer.position() + len);
buffer.put(bytes, off, len);
}
public void write(ByteBuffer buffer) {
if (buffer.hasArray())
write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
else {
int pos = buffer.position();
for (int i = pos, limit = buffer.remaining() + pos; i < limit; i++)
write(buffer.get(i));
}
}
public ByteBuffer buffer() {
return buffer;
}
public int position() {
return buffer.position();
}
public int capacity() {
return buffer.capacity();
}
public int limit() {
return buffer.limit();
}
public void position(int position) {
if (position > buffer.limit())
expandBuffer(position);
buffer.position(position);
}
private void expandBuffer(int size) {
int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
ByteBuffer temp = ByteBuffer.allocate(expandSize);
temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
if (buffer.hasArray()) {
temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
} else {
int limit = buffer.position();
for (int i = 0; i < limit; i++)
temp.put(buffer.get(i));
}
// reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed
buffer.position(initialPosition);
buffer = temp;
}

View File

@ -153,8 +153,7 @@ public class MockClient implements KafkaClient {
short version = nodeApiVersions.usableVersion(request.apiKey(), builder.desiredVersion());
AbstractRequest abstractRequest = request.requestBuilder().build(version);
if (!futureResp.requestMatcher.matches(abstractRequest))
throw new IllegalStateException("Next in line response did not match expected request, request: "
+ abstractRequest);
throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest);
ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody);
responses.add(resp);
@ -195,6 +194,18 @@ public class MockClient implements KafkaClient {
respond(response, false);
}
public void respond(RequestMatcher matcher, AbstractResponse response) {
ClientRequest nextRequest = requests.peek();
if (nextRequest == null)
throw new IllegalStateException("No current requests queued");
AbstractRequest request = nextRequest.requestBuilder().build();
if (!matcher.matches(request))
throw new IllegalStateException("Request matcher did not match next-in-line request " + request);
respond(response);
}
public void respond(AbstractResponse response, boolean disconnected) {
ClientRequest request = requests.remove();
short version = request.requestBuilder().desiredOrLatestVersion();

View File

@ -32,8 +32,11 @@ import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@ -563,7 +566,10 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10, 100L, metrics, time,
new ApiVersions(), null);
// Create a big batch
ProducerBatch batch = ProducerBatch.createBatchOffAccumulator(tp1, CompressionType.NONE, 4096, now);
ByteBuffer buffer = ByteBuffer.allocate(4096);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
ProducerBatch batch = new ProducerBatch(tp1, builder, now, true);
byte[] value = new byte[1024];
final AtomicInteger acked = new AtomicInteger(0);
Callback cb = new Callback() {

View File

@ -38,14 +38,14 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.MockTime;
@ -61,6 +61,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -554,28 +555,18 @@ public class SenderTest {
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
Metrics m = new Metrics();
TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0);
txnManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(123456L, (short) 0));
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
txnManager.setProducerIdAndEpoch(producerIdAndEpoch);
accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
new ApiVersions(), txnManager);
try {
Sender sender = new Sender(client,
metadata,
this.accumulator,
true,
MAX_REQUEST_SIZE,
ACKS_ALL,
maxRetries,
m,
time,
REQUEST_TIMEOUT,
1000L,
txnManager,
new ApiVersions());
Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
// Send the first message.
TopicPartition tp2 = new TopicPartition(topic, 1);
final TopicPartition tp2 = new TopicPartition(topic, 1);
Future<RecordMetadata> f1 =
accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
Future<RecordMetadata> f2 =
@ -607,7 +598,8 @@ public class SenderTest {
assertTrue("Client ready status should be true", client.isReady(node, 0L));
responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
client.respond(new ProduceResponse(responseMap));
client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 0, false), new ProduceResponse(responseMap));
sender.run(time.milliseconds()); // receive
assertTrue("The future should have been done.", f1.isDone());
assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue());
@ -621,7 +613,8 @@ public class SenderTest {
assertTrue("Client ready status should be true", client.isReady(node, 0L));
responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
client.respond(new ProduceResponse(responseMap));
client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 1, false), new ProduceResponse(responseMap));
sender.run(time.milliseconds()); // receive
assertTrue("The future should have been done.", f2.isDone());
assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp2).longValue());
@ -635,6 +628,36 @@ public class SenderTest {
}
}
private MockClient.RequestMatcher produceRequestMatcher(final TopicPartition tp,
final ProducerIdAndEpoch producerIdAndEpoch,
final int sequence,
final boolean isTransactional) {
return new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
if (!(body instanceof ProduceRequest))
return false;
ProduceRequest request = (ProduceRequest) body;
Map<TopicPartition, MemoryRecords> recordsMap = request.partitionRecordsOrFail();
MemoryRecords records = recordsMap.get(tp);
if (records == null)
return false;
List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
if (batches.isEmpty() || batches.size() > 1)
return false;
MutableRecordBatch batch = batches.get(0);
return batch.baseOffset() == 0L &&
batch.baseSequence() == sequence &&
batch.producerId() == producerIdAndEpoch.producerId &&
batch.producerEpoch() == producerIdAndEpoch.epoch &&
batch.isTransactional() == isTransactional;
}
};
}
private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
assertTrue("Request should be completed", future.isDone());
try {

View File

@ -37,7 +37,7 @@ public class FileLogInputStreamTest {
new SimpleRecord("bar".getBytes())));
fileRecords.flush();
FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), Integer.MAX_VALUE, 0,
FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
fileRecords.sizeInBytes());
FileLogInputStream.FileChannelRecordBatch batch = logInputStream.nextBatch();

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@ -30,6 +31,7 @@ import java.util.List;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -202,7 +204,8 @@ public class MemoryRecordsTest {
builder.append(12L, null, "c".getBytes());
ByteBuffer filtered = ByteBuffer.allocate(2048);
builder.build().filterTo(new RetainNonNullKeysFilter(), filtered);
builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered,
Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@ -278,7 +281,7 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.readableRecords(buffer).filterTo(new MemoryRecords.RecordFilter() {
MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
@Override
protected boolean shouldDiscard(RecordBatch batch) {
// discard the second and fourth batches
@ -289,7 +292,7 @@ public class MemoryRecordsTest {
protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
return true;
}
}, filtered);
}, filtered, Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@ -316,7 +319,8 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
filtered, Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@ -383,7 +387,8 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
filtered, Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@ -439,6 +444,55 @@ public class MemoryRecordsTest {
}
}
@Test
public void testFilterToWithUndersizedBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
builder.append(11L, "1".getBytes(), new byte[128]);
builder.append(12L, "2".getBytes(), "c".getBytes());
builder.append(13L, null, "d".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 4L);
builder.append(14L, null, "e".getBytes());
builder.append(15L, "5".getBytes(), "f".getBytes());
builder.append(16L, "6".getBytes(), "g".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 7L);
builder.append(17L, "7".getBytes(), new byte[128]);
builder.close();
buffer.flip();
ByteBuffer output = ByteBuffer.allocate(64);
List<Record> records = new ArrayList<>();
while (buffer.hasRemaining()) {
output.rewind();
MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
.filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE);
buffer.position(buffer.position() + result.bytesRead);
result.output.flip();
if (output != result.output)
assertEquals(0, output.position());
MemoryRecords filtered = MemoryRecords.readableRecords(result.output);
records.addAll(TestUtils.toList(filtered.records()));
}
assertEquals(5, records.size());
for (Record record : records)
assertNotNull(record.key());
}
@Test
public void testFilterTo() {
ByteBuffer buffer = ByteBuffer.allocate(2048);
@ -464,7 +518,8 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(
new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
filtered.flip();
@ -576,7 +631,8 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
filtered, Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class ByteBufferOutputStreamTest {
@Test
public void testExpandByteBufferOnPositionIncrease() throws Exception {
testExpandByteBufferOnPositionIncrease(ByteBuffer.allocate(16));
}
@Test
public void testExpandDirectByteBufferOnPositionIncrease() throws Exception {
testExpandByteBufferOnPositionIncrease(ByteBuffer.allocateDirect(16));
}
private void testExpandByteBufferOnPositionIncrease(ByteBuffer initialBuffer) throws Exception {
ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
output.write("hello".getBytes());
output.position(32);
assertEquals(32, output.position());
assertEquals(0, initialBuffer.position());
ByteBuffer buffer = output.buffer();
assertEquals(32, buffer.limit());
buffer.position(0);
buffer.limit(5);
byte[] bytes = new byte[5];
buffer.get(bytes);
assertArrayEquals("hello".getBytes(), bytes);
}
@Test
public void testExpandByteBufferOnWrite() throws Exception {
testExpandByteBufferOnWrite(ByteBuffer.allocate(16));
}
@Test
public void testExpandDirectByteBufferOnWrite() throws Exception {
testExpandByteBufferOnWrite(ByteBuffer.allocateDirect(16));
}
private void testExpandByteBufferOnWrite(ByteBuffer initialBuffer) throws Exception {
ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
output.write("hello".getBytes());
output.write(new byte[27]);
assertEquals(32, output.position());
assertEquals(0, initialBuffer.position());
ByteBuffer buffer = output.buffer();
assertEquals(32, buffer.limit());
buffer.position(0);
buffer.limit(5);
byte[] bytes = new byte[5];
buffer.get(bytes);
assertArrayEquals("hello".getBytes(), bytes);
}
@Test
public void testWriteByteBuffer() {
testWriteByteBuffer(ByteBuffer.allocate(16));
}
@Test
public void testWriteDirectByteBuffer() {
testWriteByteBuffer(ByteBuffer.allocateDirect(16));
}
private void testWriteByteBuffer(ByteBuffer input) {
long value = 234239230L;
input.putLong(value);
input.flip();
ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(32));
output.write(input);
assertEquals(8, output.position());
assertEquals(value, output.buffer().getLong(0));
}
}

View File

@ -318,7 +318,7 @@ class Log(@volatile var dir: File,
loadProducersFromLog(stateManager, fetchDataInfo.records)
}
stateManager.updateMapEndOffset(segment.baseOffset)
val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache)
val bytesTruncated = segment.recover(stateManager, leaderEpochCache)
// once we have recovered the segment's data, take a snapshot to ensure that we won't
// need to reload the same segment again while recovering another segment.

View File

@ -516,22 +516,23 @@ private[log] class Cleaner(val id: Int,
source.log.readInto(readBuffer, position)
val records = MemoryRecords.readableRecords(readBuffer)
throttler.maybeThrottle(records.sizeInBytes)
val result = records.filterTo(logCleanerFilter, writeBuffer)
val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize)
stats.readMessages(result.messagesRead, result.bytesRead)
stats.recopyMessages(result.messagesRetained, result.bytesRetained)
position += result.bytesRead
// if any messages are to be retained, write them out
if (writeBuffer.position > 0) {
writeBuffer.flip()
val retained = MemoryRecords.readableRecords(writeBuffer)
val outputBuffer = result.output
if (outputBuffer.position > 0) {
outputBuffer.flip()
val retained = MemoryRecords.readableRecords(outputBuffer)
dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
largestOffset = result.maxOffset,
largestTimestamp = result.maxTimestamp,
shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
records = retained)
throttler.maybeThrottle(writeBuffer.limit)
throttler.maybeThrottle(outputBuffer.limit)
}
// if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again

View File

@ -242,19 +242,16 @@ class LogSegment(val log: FileRecords,
index.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)
/**
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
* from the end of the log and index.
*
* @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
* is corrupt.
* @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
* the transaction index.
* @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
* @return The number of bytes truncated from the log
*/
@nonthreadsafe
def recover(maxMessageSize: Int,
producerStateManager: ProducerStateManager,
leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
index.truncate()
index.resize(index.maxIndexSize)
timeIndex.truncate()
@ -264,7 +261,7 @@ class LogSegment(val log: FileRecords,
var lastIndexEntry = 0
maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
try {
for (batch <- log.batches(maxMessageSize).asScala) {
for (batch <- log.batches.asScala) {
batch.ensureValid()
// The max timestamp is exposed at the batch level, so no need to iterate the records
@ -296,6 +293,9 @@ class LogSegment(val log: FileRecords,
.format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
val truncated = log.sizeInBytes - validBytes
if (truncated > 0)
logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
log.truncateTo(validBytes)
index.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.

View File

@ -337,8 +337,8 @@ object DumpLogSegments {
val messageSet = FileRecords.open(file, false)
var validBytes = 0L
var lastOffset = -1L
val batches = messageSet.batches(maxMessageSize).asScala
for (batch <- batches) {
for (batch <- messageSet.batches.asScala) {
if (isDeepIteration) {
for (record <- batch.asScala) {
if (lastOffset == -1)

View File

@ -264,7 +264,7 @@ class LogSegmentTest {
seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
val indexFile = seg.index.file
TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
seg.recover(new ProducerStateManager(topicPartition, logDir))
for(i <- 0 until 100)
assertEquals(i, seg.read(i, Some(i + 1), 1024).records.records.iterator.next().offset)
}
@ -303,7 +303,7 @@ class LogSegmentTest {
shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
var stateManager = new ProducerStateManager(topicPartition, logDir)
segment.recover(64 * 1024, stateManager)
segment.recover(stateManager)
assertEquals(108L, stateManager.mapEndOffset)
@ -318,7 +318,7 @@ class LogSegmentTest {
// recover again, but this time assuming the transaction from pid2 began on a previous segment
stateManager = new ProducerStateManager(topicPartition, logDir)
stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
segment.recover(64 * 1024, stateManager)
segment.recover(stateManager)
assertEquals(108L, stateManager.mapEndOffset)
abortedTxns = segment.txnIndex.allAbortedTxns
@ -352,7 +352,7 @@ class LogSegmentTest {
seg.append(i, i, i * 10, i, records(i, i.toString))
val timeIndexFile = seg.timeIndex.file
TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
seg.recover(new ProducerStateManager(topicPartition, logDir))
for(i <- 0 until 100) {
assertEquals(i, seg.findOffsetByTimestamp(i * 10).get.offset)
if (i < 99)
@ -376,7 +376,7 @@ class LogSegmentTest {
val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)
val position = recordPosition.position + TestUtils.random.nextInt(15)
TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
seg.recover(new ProducerStateManager(topicPartition, logDir))
assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList,
seg.log.batches.asScala.map(_.lastOffset).toList)
seg.delete()

View File

@ -30,6 +30,7 @@ import org.junit.{After, Before, Test}
import kafka.utils._
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@ -221,9 +222,9 @@ class LogTest {
records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
records.filterTo(new RecordFilter {
records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
}, filtered)
}, filtered, Int.MaxValue)
filtered.flip()
val filteredRecords = MemoryRecords.readableRecords(filtered)
@ -265,9 +266,9 @@ class LogTest {
records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
records.filterTo(new RecordFilter {
records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
}, filtered)
}, filtered, Int.MaxValue)
filtered.flip()
val filteredRecords = MemoryRecords.readableRecords(filtered)