[KAFKA-8522] Streamline tombstone and transaction marker removal (#10914)

This PR fixes an issue where tombstones could persist indefinitely on low-throughput topics. Previously, the deleteHorizon was calculated from the segment's last-modified time.

With this PR, the deleteHorizon is tracked in the baseTimestamp of each RecordBatch. On the first cleaning pass that finds a record batch containing tombstones, the batch is recopied with the deleteHorizon flag set and a new baseTimestamp equal to the deleteHorizonMs. The records in the batch are rebuilt with relative timestamps computed against the recorded deleteHorizonMs, so their absolute timestamps are preserved. Later cleaning passes can then remove tombstones at the correct time, since each record batch now tracks its own deleteHorizon.
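
As a rough illustration of that timestamp bookkeeping (a standalone sketch, not code from this commit; the concrete values are invented): a record's absolute timestamp is always `baseTimestamp + delta`, so rewriting the baseTimestamp to the deleteHorizonMs only requires rewriting each delta, and the record timestamps seen by readers are unchanged.

```java
// Standalone illustration of the baseTimestamp/delta arithmetic; not Kafka code.
public class TimestampDeltaSketch {
    public static void main(String[] args) {
        long originalBaseTimestamp = 50L;            // first record's create time
        long deleteHorizonMs = 1_000L;               // assumed: cleaning time + delete.retention.ms
        long[] recordTimestamps = {50L, 100L, 150L};

        for (long ts : recordTimestamps) {
            long oldDelta = ts - originalBaseTimestamp; // delta stored before the batch is recopied
            long newDelta = ts - deleteHorizonMs;       // delta rewritten during cleaning (may be negative)
            // Reconstructing from either base gives back the same absolute timestamp.
            System.out.printf("ts=%d oldDelta=%d newDelta=%d reconstructed=%d%n",
                    ts, oldDelta, newDelta, deleteHorizonMs + newDelta);
        }
    }
}
```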

KIP-534: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds

Co-authored-by: Ted Yu <yuzhihong@gmail.com>
Co-authored-by: Richard Yu <yohan.richard.yu@gmail.com>
Matthew Wong 2021-09-16 09:17:15 -07:00 committed by GitHub
parent 7de8a93c7e
commit 6c80643009
18 changed files with 499 additions and 191 deletions

View File

@ -57,7 +57,7 @@
<suppress checks="ParameterNumber"
files="DefaultRecordBatch.java"/>
<suppress checks="ParameterNumber"
files="Sender.java"/>
files="MemoryRecordsBuilder.java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
@ -68,7 +68,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator).java"/>
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>

View File

@ -35,6 +35,7 @@ import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.OptionalLong;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
@ -213,6 +214,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
return false;
}
@Override
public OptionalLong deleteHorizonMs() {
return OptionalLong.empty();
}
/**
* Get an iterator for the nested entries contained within this batch. Note that
* if the batch is not compressed, then this method will return an iterator over the
@ -464,6 +470,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
return buffer.getLong(OFFSET_OFFSET);
}
@Override
public OptionalLong deleteHorizonMs() {
return OptionalLong.empty();
}
@Override
public LegacyRecord outerRecord() {
return record;
@ -553,6 +564,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
return loadFullBatch().baseOffset();
}
@Override
public OptionalLong deleteHorizonMs() {
return OptionalLong.empty();
}
@Override
public long lastOffset() {
return offset;

View File

@ -37,6 +37,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.OptionalLong;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
@ -90,11 +91,15 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
* by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
* the previous value prior to becoming empty.
*
* The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to
* the time for which tombstones / transaction markers need to be removed. If it is true, then the first timestamp is
* the delete horizon, otherwise, it is merely the first timestamp of the record batch.
*
* The current attributes are given below:
*
* -------------------------------------------------------------------------------------------------
* | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* -------------------------------------------------------------------------------------------------
* ---------------------------------------------------------------------------------------------------------------------------
* | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* ---------------------------------------------------------------------------------------------------------------------------
*/
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
static final int BASE_OFFSET_OFFSET = 0;
@ -111,9 +116,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
static final int ATTRIBUTE_LENGTH = 2;
public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
static final int LAST_OFFSET_DELTA_LENGTH = 4;
static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
static final int FIRST_TIMESTAMP_LENGTH = 8;
static final int MAX_TIMESTAMP_OFFSET = FIRST_TIMESTAMP_OFFSET + FIRST_TIMESTAMP_LENGTH;
static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
static final int BASE_TIMESTAMP_LENGTH = 8;
static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
static final int MAX_TIMESTAMP_LENGTH = 8;
static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
static final int PRODUCER_ID_LENGTH = 8;
@ -129,6 +134,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private static final byte COMPRESSION_CODEC_MASK = 0x07;
private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
private static final int CONTROL_FLAG_MASK = 0x20;
private static final byte DELETE_HORIZON_FLAG_MASK = 0x40;
private static final byte TIMESTAMP_TYPE_MASK = 0x08;
private static final int MAX_SKIP_BUFFER_SIZE = 2048;
@ -156,13 +162,12 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
}
/**
* Get the timestamp of the first record in this batch. It is always the create time of the record even if the
* timestamp type of the batch is log append time.
*
* @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
* Gets the base timestamp of the batch which is used to calculate the record timestamps from the deltas.
*
* @return The base timestamp
*/
public long firstTimestamp() {
return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
public long baseTimestamp() {
return buffer.getLong(BASE_TIMESTAMP_OFFSET);
}
@Override
@ -246,6 +251,18 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}
private boolean hasDeleteHorizonMs() {
return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
}
@Override
public OptionalLong deleteHorizonMs() {
if (hasDeleteHorizonMs())
return OptionalLong.of(buffer.getLong(BASE_TIMESTAMP_OFFSET));
else
return OptionalLong.empty();
}
@Override
public boolean isControlBatch() {
return (attributes() & CONTROL_FLAG_MASK) > 0;
@ -290,9 +307,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
buffer.position(RECORDS_OFFSET);
return new RecordIterator() {
@Override
protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
} catch (BufferUnderflowException e) {
throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
}
@ -364,7 +381,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
return;
byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), hasDeleteHorizonMs());
buffer.putShort(ATTRIBUTES_OFFSET, attributes);
buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
long crc = computeChecksum();
@ -411,7 +428,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
}
private static byte computeAttributes(CompressionType type, TimestampType timestampType,
boolean isTransactional, boolean isControl) {
boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
"format v2 and above");
@ -423,6 +440,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
attributes |= COMPRESSION_CODEC_MASK & type.id;
if (timestampType == TimestampType.LOG_APPEND_TIME)
attributes |= TIMESTAMP_TYPE_MASK;
if (isDeleteHorizonSet)
attributes |= DELETE_HORIZON_FLAG_MASK;
return attributes;
}
@ -440,8 +459,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
boolean isControlRecord) {
int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0);
}
public static void writeHeader(ByteBuffer buffer,
@ -458,6 +477,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
int sequence,
boolean isTransactional,
boolean isControlBatch,
boolean isDeleteHorizonSet,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
@ -465,7 +485,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);
short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);
int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
@ -473,7 +493,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
buffer.put(position + MAGIC_OFFSET, magic);
buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp);
buffer.putLong(position + BASE_TIMESTAMP_OFFSET, firstTimestamp);
buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
@ -555,7 +575,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private abstract class RecordIterator implements CloseableIterator<Record> {
private final Long logAppendTime;
private final long baseOffset;
private final long firstTimestamp;
private final long baseTimestamp;
private final int baseSequence;
private final int numRecords;
private int readRecords = 0;
@ -563,7 +583,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
RecordIterator() {
this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
this.baseOffset = baseOffset();
this.firstTimestamp = firstTimestamp();
this.baseTimestamp = baseTimestamp();
this.baseSequence = baseSequence();
int numRecords = count();
if (numRecords < 0)
@ -583,7 +603,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
throw new NoSuchElementException();
readRecords++;
Record rec = readNext(baseOffset, firstTimestamp, baseSequence, logAppendTime);
Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
if (readRecords == numRecords) {
// Validate that the actual size of the batch is equal to declared size
// by checking that after reading declared number of items, there no items left
@ -594,7 +614,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
return rec;
}
protected abstract Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime);
protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);
protected abstract boolean ensureNoneRemaining();
@ -616,9 +636,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException;
@Override
protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
return doReadRecord(baseOffset, firstTimestamp, baseSequence, logAppendTime);
return doReadRecord(baseOffset, baseTimestamp, baseSequence, logAppendTime);
} catch (EOFException e) {
throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
} catch (IOException e) {
@ -705,6 +725,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
return loadBatchHeader().isTransactional();
}
@Override
public OptionalLong deleteHorizonMs() {
return loadBatchHeader().deleteHorizonMs();
}
@Override
public boolean isControlBatch() {
return loadBatchHeader().isControlBatch();
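
For illustration (a minimal sketch, not part of the diff above; the helper class and method names are hypothetical): the new RecordBatch#deleteHorizonMs() accessor only returns a value once a batch has been recopied with the delete horizon attribute bit set, at which point the base timestamp field holds the horizon itself.

```java
import java.util.OptionalLong;
import org.apache.kafka.common.record.RecordBatch;

// Hypothetical helper, not Kafka code: decides whether tombstones in a batch are
// past their delete horizon. deleteHorizonMs() is OptionalLong.empty() until the
// cleaner has recopied the batch with the delete horizon attribute bit set.
public class DeleteHorizonCheck {
    public static boolean tombstonesRemovable(RecordBatch batch, long currentTimeMs) {
        OptionalLong horizon = batch.deleteHorizonMs();
        return horizon.isPresent() && horizon.getAsLong() <= currentTimeMs;
    }
}
```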

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
@ -153,15 +154,20 @@ public class MemoryRecords extends AbstractRecords {
return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier);
}
/**
* Note: This method is also used to convert the first timestamp of the batch (which is usually the timestamp of the first record)
* to the delete horizon of the tombstones or txn markers which are present in the batch.
*/
private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
BufferSupplier decompressionBufferSupplier) {
FilterResult filterResult = new FilterResult(destinationBuffer);
ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
for (MutableRecordBatch batch : batches) {
long maxOffset = -1L;
BatchRetention batchRetention = filter.checkBatchRetention(batch);
final BatchRetentionResult batchRetentionResult = filter.checkBatchRetention(batch);
final boolean containsMarkerForEmptyTxn = batchRetentionResult.containsMarkerForEmptyTxn;
final BatchRetention batchRetention = batchRetentionResult.batchRetention;
filterResult.bytesRead += batch.sizeInBytes();
if (batchRetention == BatchRetention.DELETE)
@ -171,38 +177,33 @@ public class MemoryRecords extends AbstractRecords {
// allow for the possibility that a previous version corrupted the log by writing a compressed record batch
// with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
// recopy the messages to the destination buffer.
byte batchMagic = batch.magic();
boolean writeOriginalBatch = true;
List<Record> retainedRecords = new ArrayList<>();
try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
while (iterator.hasNext()) {
Record record = iterator.next();
filterResult.messagesRead += 1;
if (filter.shouldRetainRecord(batch, record)) {
// Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
// the corrupted batch with correct data.
if (!record.hasMagic(batchMagic))
writeOriginalBatch = false;
if (record.offset() > maxOffset)
maxOffset = record.offset();
retainedRecords.add(record);
} else {
writeOriginalBatch = false;
}
}
}
final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
batchMagic, true, retainedRecords);
boolean containsTombstones = iterationResult.containsTombstones;
boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
long maxOffset = iterationResult.maxOffset;
if (!retainedRecords.isEmpty()) {
if (writeOriginalBatch) {
// we check if the delete horizon should be set to a new value
// in which case, we need to reset the base timestamp and overwrite the timestamp deltas
// if the batch does not contain tombstones, then we don't need to overwrite batch
boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
&& !batch.deleteHorizonMs().isPresent();
if (writeOriginalBatch && !needToSetDeleteHorizon) {
batch.writeTo(bufferOutputStream);
filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
} else {
MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
final MemoryRecordsBuilder builder;
long deleteHorizonMs;
if (needToSetDeleteHorizon)
deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
else
deleteHorizonMs = batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP);
builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
MemoryRecords records = builder.build();
int filteredBatchSize = records.sizeInBytes();
if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
@ -239,9 +240,59 @@ public class MemoryRecords extends AbstractRecords {
return filterResult;
}
private static BatchFilterResult filterBatch(RecordBatch batch,
BufferSupplier decompressionBufferSupplier,
FilterResult filterResult,
RecordFilter filter,
byte batchMagic,
boolean writeOriginalBatch,
List<Record> retainedRecords) {
long maxOffset = -1;
boolean containsTombstones = false;
try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
while (iterator.hasNext()) {
Record record = iterator.next();
filterResult.messagesRead += 1;
if (filter.shouldRetainRecord(batch, record)) {
// Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
// the corrupted batch with correct data.
if (!record.hasMagic(batchMagic))
writeOriginalBatch = false;
if (record.offset() > maxOffset)
maxOffset = record.offset();
retainedRecords.add(record);
if (!record.hasValue()) {
containsTombstones = true;
}
} else {
writeOriginalBatch = false;
}
}
return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset);
}
}
private static class BatchFilterResult {
private final boolean writeOriginalBatch;
private final boolean containsTombstones;
private final long maxOffset;
private BatchFilterResult(final boolean writeOriginalBatch,
final boolean containsTombstones,
final long maxOffset) {
this.writeOriginalBatch = writeOriginalBatch;
this.containsTombstones = containsTombstones;
this.maxOffset = maxOffset;
}
}
private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
List<Record> retainedRecords,
ByteBufferOutputStream bufferOutputStream) {
ByteBufferOutputStream bufferOutputStream,
final long deleteHorizonMs) {
byte magic = originalBatch.magic();
TimestampType timestampType = originalBatch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ?
@ -252,7 +303,7 @@ public class MemoryRecords extends AbstractRecords {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit());
originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), deleteHorizonMs);
for (Record record : retainedRecords)
builder.append(record);
@ -303,6 +354,24 @@ public class MemoryRecords extends AbstractRecords {
}
public static abstract class RecordFilter {
public final long currentTime;
public final long deleteRetentionMs;
public RecordFilter(final long currentTime, final long deleteRetentionMs) {
this.currentTime = currentTime;
this.deleteRetentionMs = deleteRetentionMs;
}
public static class BatchRetentionResult {
public final BatchRetention batchRetention;
public final boolean containsMarkerForEmptyTxn;
public BatchRetentionResult(final BatchRetention batchRetention,
final boolean containsMarkerForEmptyTxn) {
this.batchRetention = batchRetention;
this.containsMarkerForEmptyTxn = containsMarkerForEmptyTxn;
}
}
public enum BatchRetention {
DELETE, // Delete the batch without inspecting records
RETAIN_EMPTY, // Retain the batch even if it is empty
@ -313,7 +382,7 @@ public class MemoryRecords extends AbstractRecords {
* Check whether the full batch can be discarded (i.e. whether we even need to
* check the records individually).
*/
protected abstract BatchRetention checkBatchRetention(RecordBatch batch);
protected abstract BatchRetentionResult checkBatchRetention(RecordBatch batch);
/**
* Check whether a record should be retained in the log. Note that {@link #checkBatchRetention(RecordBatch)}
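
For context on the API change above, a usage sketch patterned on the updated tests (the filter class itself is hypothetical): RecordFilter implementations now pass (currentTime, deleteRetentionMs) to the superclass and return a BatchRetentionResult, which is what lets filterTo stamp a delete horizon of currentTime + deleteRetentionMs when it recopies a batch containing tombstones or transaction markers.

```java
import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

// Hypothetical filter, not Kafka code: keeps records that still have a value and
// lets filterTo drop batches that end up empty.
public class RetainNonNullValuesFilter extends RecordFilter {
    public RetainNonNullValuesFilter(long currentTime, long deleteRetentionMs) {
        super(currentTime, deleteRetentionMs);
    }

    @Override
    protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
        // Inspect records individually; the second flag marks control batches of empty transactions.
        return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
    }

    @Override
    protected boolean shouldRetainRecord(RecordBatch batch, Record record) {
        return record.hasValue(); // discard tombstones once the batch-level rules allow it
    }
}
```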

View File

@ -80,9 +80,10 @@ public class MemoryRecordsBuilder implements AutoCloseable {
private int numRecords = 0;
private float actualCompressionRatio = 1;
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
private long deleteHorizonMs;
private long offsetOfMaxTimestamp = -1;
private Long lastOffset = null;
private Long firstTimestamp = null;
private Long baseTimestamp = null;
private MemoryRecords builtRecords;
private boolean aborted = false;
@ -99,7 +100,8 @@ public class MemoryRecordsBuilder implements AutoCloseable {
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {
int writeLimit,
long deleteHorizonMs) {
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
if (magic < RecordBatch.MAGIC_VALUE_V2) {
@ -109,6 +111,8 @@ public class MemoryRecordsBuilder implements AutoCloseable {
throw new IllegalArgumentException("Control records are not supported for magic " + magic);
if (compressionType == CompressionType.ZSTD)
throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
if (deleteHorizonMs != RecordBatch.NO_TIMESTAMP)
throw new IllegalArgumentException("Delete horizon timestamp is not supported for magic " + magic);
}
this.magic = magic;
@ -125,6 +129,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
this.baseSequence = baseSequence;
this.isTransactional = isTransactional;
this.isControlBatch = isControlBatch;
this.deleteHorizonMs = deleteHorizonMs;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialPosition = bufferStream.position();
@ -133,6 +138,28 @@ public class MemoryRecordsBuilder implements AutoCloseable {
bufferStream.position(initialPosition + batchHeaderSizeInBytes);
this.bufferStream = bufferStream;
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
if (hasDeleteHorizonMs()) {
this.baseTimestamp = deleteHorizonMs;
}
}
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {
this(bufferStream, magic, compressionType, timestampType, baseOffset, logAppendTime, producerId,
producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit,
RecordBatch.NO_TIMESTAMP);
}
/**
@ -197,6 +224,10 @@ public class MemoryRecordsBuilder implements AutoCloseable {
return isTransactional;
}
public boolean hasDeleteHorizonMs() {
return magic >= RecordBatch.MAGIC_VALUE_V2 && deleteHorizonMs >= 0L;
}
/**
* Close this builder and return the resulting buffer.
* @return The built log buffer
@ -369,8 +400,8 @@ public class MemoryRecordsBuilder implements AutoCloseable {
maxTimestamp = this.maxTimestamp;
DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
partitionLeaderEpoch, numRecords);
baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
hasDeleteHorizonMs(), partitionLeaderEpoch, numRecords);
buffer.position(pos);
return writtenCompressed;
@ -416,8 +447,8 @@ public class MemoryRecordsBuilder implements AutoCloseable {
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
if (firstTimestamp == null)
firstTimestamp = timestamp;
if (baseTimestamp == null)
baseTimestamp = timestamp;
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
@ -624,12 +655,12 @@ public class MemoryRecordsBuilder implements AutoCloseable {
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
int offsetDelta = (int) (offset - baseOffset);
long timestamp = record.timestamp();
if (firstTimestamp == null)
firstTimestamp = timestamp;
if (baseTimestamp == null)
baseTimestamp = timestamp;
int sizeInBytes = DefaultRecord.writeTo(appendStream,
offsetDelta,
timestamp - firstTimestamp,
timestamp - baseTimestamp,
record.key(),
record.value(),
record.headers());
@ -683,7 +714,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - firstTimestamp;
long timestampDelta = timestamp - baseTimestamp;
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}
@ -788,7 +819,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
} else {
int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
long timestampDelta = firstTimestamp == null ? 0 : timestamp - firstTimestamp;
long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp;
recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
}
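
A sketch of the new constructor parameter, along the lines of the MemoryRecordsBuilderTest change further below (buffer size and record contents are arbitrary; not part of this diff): passing a non-negative deleteHorizonMs pre-seeds baseTimestamp with the horizon and causes the delete horizon flag to be written into the batch header.

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

public class DeleteHorizonBuilderSketch {
    public static void main(String[] args) {
        long deleteHorizonMs = 100L; // arbitrary horizon for illustration
        ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(2048));
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(out, RecordBatch.MAGIC_VALUE_V2,
                CompressionType.NONE, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
                RecordBatch.NO_PARTITION_LEADER_EPOCH, 0, deleteHorizonMs);
        builder.append(50L, "key".getBytes(), null); // tombstone; stored delta is 50 - 100 = -50
        MemoryRecords records = builder.build();
        // Every batch built this way reports the horizon through the new accessor.
        records.batches().forEach(batch -> System.out.println(batch.deleteHorizonMs())); // OptionalLong[100]
    }
}
```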

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.CloseableIterator;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.OptionalLong;
/**
* A record batch is a container for records. In old versions of the record format (versions 0 and 1),
@ -211,6 +212,12 @@ public interface RecordBatch extends Iterable<Record> {
*/
boolean isTransactional();
/**
* Get the delete horizon, returns OptionalLong.EMPTY if the first timestamp is not the delete horizon
* @return timestamp of the delete horizon
*/
OptionalLong deleteHorizonMs();
/**
* Get the partition leader epoch of this record batch.
* @return The leader epoch or -1 if it is unknown

View File

@ -3147,10 +3147,10 @@ public class FetcherTest {
new SimpleRecord(null, "value".getBytes()));
// Remove the last record to simulate compaction
MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter() {
MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
@Override
protected BatchRetention checkBatchRetention(RecordBatch batch) {
return BatchRetention.DELETE_EMPTY;
protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
}
@Override

View File

@ -67,7 +67,7 @@ public class DefaultRecordBatchTest {
assertEquals(isTransactional, batch.isTransactional());
assertEquals(timestampType, batch.timestampType());
assertEquals(timestamp, batch.maxTimestamp());
assertEquals(RecordBatch.NO_TIMESTAMP, batch.firstTimestamp());
assertEquals(RecordBatch.NO_TIMESTAMP, batch.baseTimestamp());
assertEquals(isControlBatch, batch.isControlBatch());
}
}

View File

@ -19,6 +19,9 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
@ -35,6 +38,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Supplier;
@ -755,6 +759,48 @@ public class MemoryRecordsBuilderTest {
assertTrue(iterations < 100, "Memory usage too high: " + memUsed);
}
@ParameterizedTest
@ArgumentsSource(V2MemoryRecordsBuilderArgumentsProvider.class)
public void testRecordTimestampsWithDeleteHorizon(Args args) {
long deleteHorizon = 100;
int payloadLen = 1024 * 1024;
ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
ByteBufferOutputStream byteBufferOutputStream = new ByteBufferOutputStream(buffer);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(byteBufferOutputStream, args.magic, args.compressionType,
TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH, 0, deleteHorizon);
builder.append(50L, "0".getBytes(), "0".getBytes());
builder.append(100L, "1".getBytes(), null);
builder.append(150L, "2".getBytes(), "2".getBytes());
MemoryRecords records = builder.build();
List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs());
CloseableIterator<Record> recordIterator = batches.get(0).streamingIterator(BufferSupplier.create());
Record record = recordIterator.next();
assertEquals(50L, record.timestamp());
record = recordIterator.next();
assertEquals(100L, record.timestamp());
record = recordIterator.next();
assertEquals(150L, record.timestamp());
recordIterator.close();
}
private static class V2MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
List<Arguments> values = new ArrayList<>();
for (int bufferOffset : Arrays.asList(0, 15))
for (CompressionType type: CompressionType.values()) {
values.add(Arguments.of(new Args(bufferOffset, type, MAGIC_VALUE_V2)));
}
return values.stream();
}
}
private void verifyRecordsProcessingStats(CompressionType compressionType, RecordConversionStats processingStats,
int numRecords, int numRecordsConverted, long finalBytes,
long preConvertedBytes) {

View File

@ -21,8 +21,10 @@ import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@ -37,6 +39,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Stream;
@ -101,6 +104,18 @@ public class MemoryRecordsTest {
}
}
private static class V2MemoryRecordsArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
List<Arguments> arguments = new ArrayList<>();
for (long firstOffset : asList(0L, 57L))
for (CompressionType type: CompressionType.values()) {
arguments.add(Arguments.of(new Args(RecordBatch.MAGIC_VALUE_V2, firstOffset, type)));
}
return arguments.stream();
}
}
private final long logAppendTime = System.currentTimeMillis();
@ParameterizedTest
@ -316,11 +331,11 @@ public class MemoryRecordsTest {
MemoryRecords records = builder.build();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
new MemoryRecords.RecordFilter() {
new MemoryRecords.RecordFilter(0, 0) {
@Override
protected BatchRetention checkBatchRetention(RecordBatch batch) {
protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
// retain all batches
return BatchRetention.RETAIN_EMPTY;
return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
}
@Override
@ -378,11 +393,11 @@ public class MemoryRecordsTest {
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords records = MemoryRecords.readableRecords(buffer);
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
new MemoryRecords.RecordFilter() {
new MemoryRecords.RecordFilter(0, 0) {
@Override
protected BatchRetention checkBatchRetention(RecordBatch batch) {
protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
// retain all batches
return BatchRetention.RETAIN_EMPTY;
return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
}
@Override
@ -426,10 +441,10 @@ public class MemoryRecordsTest {
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords records = MemoryRecords.readableRecords(buffer);
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
new MemoryRecords.RecordFilter() {
new MemoryRecords.RecordFilter(0, 0) {
@Override
protected BatchRetention checkBatchRetention(RecordBatch batch) {
return deleteRetention;
protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
return new BatchRetentionResult(deleteRetention, false);
}
@Override
@ -483,6 +498,53 @@ public class MemoryRecordsTest {
assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
}
/**
* This test is used to see if the base timestamp of the batch has been successfully
* converted to a delete horizon for the tombstones / transaction markers of the batch.
* It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
*/
@ParameterizedTest
@ArgumentsSource(V2MemoryRecordsArgumentsProvider.class)
public void testBaseTimestampToDeleteHorizonConversion(Args args) {
int partitionLeaderEpoch = 998;
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
builder.append(5L, "0".getBytes(), "0".getBytes());
builder.append(10L, "1".getBytes(), null);
builder.append(15L, "2".getBytes(), "2".getBytes());
ByteBuffer filtered = ByteBuffer.allocate(2048);
final long deleteHorizon = Integer.MAX_VALUE / 2;
final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
@Override
protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
return true;
}
@Override
protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
}
};
builder.build().filterTo(new TopicPartition("random", 0), recordFilter, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
assertEquals(1, batches.size());
assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs());
CloseableIterator<Record> recordIterator = batches.get(0).streamingIterator(BufferSupplier.create());
Record record = recordIterator.next();
assertEquals(5L, record.timestamp());
record = recordIterator.next();
assertEquals(10L, record.timestamp());
record = recordIterator.next();
assertEquals(15L, record.timestamp());
recordIterator.close();
}
@Test
public void testBuildLeaderChangeMessage() {
final int leaderId = 5;
@ -554,13 +616,13 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter(0, 0) {
@Override
protected BatchRetention checkBatchRetention(RecordBatch batch) {
protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
// discard the second and fourth batches
if (batch.lastOffset() == 2L || batch.lastOffset() == 6L)
return BatchRetention.DELETE;
return BatchRetention.DELETE_EMPTY;
return new BatchRetentionResult(BatchRetention.DELETE, false);
return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
}
@Override
@ -1012,9 +1074,13 @@ public class MemoryRecordsTest {
}
private static class RetainNonNullKeysFilter extends MemoryRecords.RecordFilter {
public RetainNonNullKeysFilter() {
super(0, 0);
}
@Override
protected BatchRetention checkBatchRetention(RecordBatch batch) {
return BatchRetention.DELETE_EMPTY;
protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
}
@Override

View File

@ -67,6 +67,8 @@ import scala.util.control.ControlThrowable
* The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
* basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
* Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
* This time is tracked by setting the base timestamp of a record batch with delete markers when the batch is recopied in the first cleaning that encounters
* it. The relative timestamps of the records in the batch are also modified when recopied in this cleaning according to the new base timestamp of the batch.
*
* Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following
* are the key points:
@ -342,7 +344,8 @@ class LogCleaner(initialConfig: CleanerConfig,
@throws(classOf[LogCleaningException])
private def cleanFilthiestLog(): Boolean = {
val preCleanStats = new PreCleanStats()
val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
val ltc = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats)
val cleaned = ltc match {
case None =>
false
case Some(cleanable) =>
@ -493,19 +496,20 @@ private[log] class Cleaner(val id: Int,
* @return The first offset not cleaned and the statistics for this round of cleaning
*/
private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
doClean(cleanable, time.milliseconds())
}
private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
info("Beginning cleaning of log %s".format(cleanable.log.name))
// figure out the timestamp below which it is safe to remove delete tombstones
// this position is defined to be a configurable time beneath the last modified time of the last clean segment
val deleteHorizonMs =
// this timestamp is only used on the older message formats older than MAGIC_VALUE_V2
val legacyDeleteHorizonMs =
cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
case None => 0L
case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
}
doClean(cleanable, deleteHorizonMs)
}
private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
info("Beginning cleaning of log %s.".format(cleanable.log.name))
}
val log = cleanable.log
val stats = new CleanerStats()
@ -522,13 +526,13 @@ private[log] class Cleaner(val id: Int,
val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
// group the segments and clean the groups
info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to upper bound deletion horizon %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs)))
val transactionMetadata = new CleanedTransactionMetadata
val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
log.config.maxIndexSize, cleanable.firstUncleanableOffset)
for (group <- groupedSegments)
cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs)
// record buffer utilization
stats.bufferUtilization = offsetMap.utilization
@ -544,17 +548,19 @@ private[log] class Cleaner(val id: Int,
* @param log The log being cleaned
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
* @param deleteHorizonMs The time to retain delete tombstones
* @param currentTime The current time in milliseconds
* @param stats Collector for cleaning statistics
* @param transactionMetadata State of ongoing transactions which is carried between the cleaning
* of the grouped segments
* @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2
*/
private[log] def cleanSegments(log: UnifiedLog,
segments: Seq[LogSegment],
map: OffsetMap,
deleteHorizonMs: Long,
currentTime: Long,
stats: CleanerStats,
transactionMetadata: CleanedTransactionMetadata): Unit = {
transactionMetadata: CleanedTransactionMetadata,
legacyDeleteHorizonMs: Long): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
@ -574,14 +580,15 @@ private[log] class Cleaner(val id: Int,
val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
transactionMetadata.addAbortedTransactions(abortedTransactions)
val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > legacyDeleteHorizonMs
info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
s"with deletion horizon $deleteHorizonMs, " +
s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
s"with an upper bound deletion horizon $legacyDeleteHorizonMs computed from " +
s"the segment last modified time of ${currentSegment.lastModified}," +
s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
try {
cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
transactionMetadata, lastOffsetOfActiveProducers, stats)
cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs,
log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime)
} catch {
case e: LogSegmentOffsetOverflowException =>
// Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
@ -622,26 +629,35 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
* @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
* @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
* @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding topic
* @param stats Collector for cleaning statistics
* @param currentTime The time at which the clean was initiated
*/
private[log] def cleanInto(topicPartition: TopicPartition,
sourceRecords: FileRecords,
dest: LogSegment,
map: OffsetMap,
retainDeletesAndTxnMarkers: Boolean,
retainLegacyDeletesAndTxnMarkers: Boolean,
deleteRetentionMs: Long,
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
lastRecordsOfActiveProducers: Map[Long, LastRecord],
stats: CleanerStats): Unit = {
val logCleanerFilter: RecordFilter = new RecordFilter {
stats: CleanerStats,
currentTime: Long): Unit = {
val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
var discardBatchRecords: Boolean = _
override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
// we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
// note that we will never delete a marker until all the records from that transaction are removed.
discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
if (batch.isControlBatch)
discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
else
discardBatchRecords = canDiscardBatch
def isBatchLastRecordOfProducer: Boolean = {
// We retain the batch in order to preserve the state of active producers. There are three cases:
@ -658,12 +674,14 @@ private[log] class Cleaner(val id: Int,
}
}
if (batch.hasProducerId && isBatchLastRecordOfProducer)
BatchRetention.RETAIN_EMPTY
else if (discardBatchRecords)
BatchRetention.DELETE
else
BatchRetention.DELETE_EMPTY
val batchRetention: BatchRetention =
if (batch.hasProducerId && isBatchLastRecordOfProducer)
BatchRetention.RETAIN_EMPTY
else if (discardBatchRecords)
BatchRetention.DELETE
else
BatchRetention.DELETE_EMPTY
new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch && batch.isControlBatch)
}
override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
@ -671,7 +689,7 @@ private[log] class Cleaner(val id: Int,
// The batch is only retained to preserve producer sequence information; the records can be removed
false
else
Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats)
Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime = currentTime)
}
}
@ -686,6 +704,7 @@ private[log] class Cleaner(val id: Int,
val records = MemoryRecords.readableRecords(readBuffer)
throttler.maybeThrottle(records.sizeInBytes)
val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)
stats.readMessages(result.messagesRead, result.bytesRead)
stats.recopyMessages(result.messagesRetained, result.bytesRetained)
@ -747,22 +766,19 @@ private[log] class Cleaner(val id: Int,
}
private def shouldDiscardBatch(batch: RecordBatch,
transactionMetadata: CleanedTransactionMetadata,
retainTxnMarkers: Boolean): Boolean = {
if (batch.isControlBatch) {
val canDiscardControlBatch = transactionMetadata.onControlBatchRead(batch)
canDiscardControlBatch && !retainTxnMarkers
} else {
val canDiscardBatch = transactionMetadata.onBatchRead(batch)
canDiscardBatch
}
transactionMetadata: CleanedTransactionMetadata): Boolean = {
if (batch.isControlBatch)
transactionMetadata.onControlBatchRead(batch)
else
transactionMetadata.onBatchRead(batch)
}
private def shouldRetainRecord(map: kafka.log.OffsetMap,
retainDeletes: Boolean,
retainDeletesForLegacyRecords: Boolean,
batch: RecordBatch,
record: Record,
stats: CleanerStats): Boolean = {
stats: CleanerStats,
currentTime: Long): Boolean = {
val pastLatestOffset = record.offset > map.latestOffset
if (pastLatestOffset)
return true
@ -776,7 +792,14 @@ private[log] class Cleaner(val id: Int,
* 2) The message doesn't has value but it can't be deleted now.
*/
val latestOffsetForKey = record.offset() >= foundOffset
val isRetainedValue = record.hasValue || retainDeletes
val legacyRecord = batch.magic() < RecordBatch.MAGIC_VALUE_V2
def shouldRetainDeletes = {
if (!legacyRecord)
!batch.deleteHorizonMs().isPresent || currentTime < batch.deleteHorizonMs().getAsLong
else
retainDeletesForLegacyRecords
}
val isRetainedValue = record.hasValue || shouldRetainDeletes
latestOffsetForKey && isRetainedValue
} else {
stats.invalidMessage()
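
To summarize the retention rule above, a small Java transcription for illustration only (the class is hypothetical; the real logic lives in Cleaner.shouldRetainRecord): a v2 tombstone or marker is retained until a delete horizon has been stamped on its batch and that horizon has passed, while pre-v2 batches still rely on the segment-modification-time based flag.

```java
import java.util.OptionalLong;
import org.apache.kafka.common.record.RecordBatch;

// Hypothetical transcription of the cleaner's tombstone-retention decision.
public class RetainDeletesSketch {
    public static boolean shouldRetainDelete(RecordBatch batch,
                                             boolean retainDeletesForLegacyRecords,
                                             long currentTimeMs) {
        if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
            OptionalLong horizon = batch.deleteHorizonMs();
            // Keep the tombstone until a horizon exists and has elapsed.
            return !horizon.isPresent() || currentTimeMs < horizon.getAsLong();
        }
        return retainDeletesForLegacyRecords; // pre-v2: based on segment lastModified vs. legacy horizon
    }
}
```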
@ -1111,8 +1134,6 @@ private[log] class CleanedTransactionMetadata {
case ControlRecordType.ABORT =>
ongoingAbortedTxns.remove(producerId) match {
// Retain the marker until all batches from the transaction have been removed.
// We may retain a record from an aborted transaction if it is the last entry
// written by a given producerId.
case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
cleanedIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
false

View File

@ -169,11 +169,11 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val now = time.milliseconds
this.timeOfLastRun = now
val lastClean = allCleanerCheckpoints
val dirtyLogs = logs.filter {
case (_, log) => log.config.compact // match logs that are marked as compacted
case (_, log) => log.config.compact
}.filterNot {
case (topicPartition, log) =>
// skip any logs already in-progress and uncleanable partitions
inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
}.map {
case (topicPartition, log) => // create a LogToClean instance for each
@ -198,9 +198,10 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val cleanableLogs = dirtyLogs.filter { ltc =>
(ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
}
if(cleanableLogs.isEmpty) {
if (cleanableLogs.isEmpty)
None
} else {
else {
preCleanStats.recordCleanablePartitions(cleanableLogs.size)
val filthiest = cleanableLogs.max
inProgress.put(filthiest.topicPartition, LogCleaningInProgress)

View File

@ -18,6 +18,7 @@
package kafka.log
import java.io.PrintWriter
import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import kafka.utils.{MockTime, TestUtils}
@ -186,7 +187,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
}
}
private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] = {
private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType, timestamp: Long,
startValue: Int, step: Int): Seq[(Int, Int)] = {
var valCounter = startValue
for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
val curValue = valCounter

View File

@ -195,7 +195,6 @@ class LogCleanerManagerTest extends Logging {
cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
assertEquals(tp1, filthiestLog.topicPartition)
assertEquals(tp1, filthiestLog.log.topicPartition)
}

View File

@ -52,6 +52,8 @@ class LogCleanerTest {
val logConfig = LogConfig(logProps)
val time = new MockTime()
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
val tombstoneRetentionMs = 86400000
val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
@AfterEach
def teardown(): Unit = {
@ -84,8 +86,8 @@ class LogCleanerTest {
val segments = log.logSegments.take(3).toSeq
val stats = new CleanerStats()
val expectedBytesRead = segments.map(_.size).sum
cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata)
val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
assertEquals(expectedBytesRead, stats.bytesRead)
}
@ -170,7 +172,7 @@ class LogCleanerTest {
val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
val stats = new CleanerStats()
cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata)
cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1)
// Validate based on the file name that log segment file is renamed exactly once for async deletion
assertEquals(expectedFileName, firstLogFile.file().getPath)
@ -365,7 +367,7 @@ class LogCleanerTest {
log.roll()
// cannot remove the marker in this pass because there are still valid records
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(1, 3, 2), LogTestUtils.keysInLog(log))
assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
@ -374,17 +376,17 @@ class LogCleanerTest {
log.roll()
// the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// delete horizon forced to 0 to verify marker is not removed early
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
// clean again with same timestamp to verify marker is not removed early
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// clean again with large delete horizon and verify the marker is removed
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
// clean again with max timestamp to verify the marker is removed
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
}
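
Editor's note: as a minimal, self-contained model of the timing these assertions exercise (illustrative names, not Kafka's internals): the first clean that encounters a tombstone or marker only stamps a horizon of currentTime + deleteRetentionMs on its batch, and a later clean may drop it only once its currentTime has passed that horizon.

// Illustrative model only: the first pass stamps a per-batch delete horizon, later passes honour it.
final case class BatchModel(var deleteHorizonMs: Option[Long] = None)

object CleaningModel {
  /** Returns true if the tombstone/marker in the batch may be removed at this pass. */
  def clean(batch: BatchModel, currentTime: Long, deleteRetentionMs: Long): Boolean =
    batch.deleteHorizonMs match {
      case None =>
        batch.deleteHorizonMs = Some(currentTime + deleteRetentionMs) // first pass: stamp only
        false
      case Some(horizon) =>
        currentTime > horizon // later pass: removable once the horizon has passed
    }
}

Under this model, cleaning twice at the same largeTimestamp leaves the marker in place, while a pass at Long.MaxValue (or after advancing past the delete retention) removes it, which is what the three doClean calls above assert.
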
@ -413,11 +415,12 @@ class LogCleanerTest {
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
log.roll()
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(1, 3, 4), offsetsInLog(log))
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
// In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed.
runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4), offsetsInLog(log))
}
@ -454,14 +457,14 @@ class LogCleanerTest {
// first time through the records are removed
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
// the empty batch remains if cleaned again because it still holds the last sequence
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
@ -475,13 +478,15 @@ class LogCleanerTest {
log.roll()
// Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
// The deleteHorizon for {Producer2: Commit} has still not been set.
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
// Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
// In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed.
dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
@ -506,14 +511,16 @@ class LogCleanerTest {
// first time through the control batch is retained as an empty batch
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
// In the first pass, the deleteHorizon for the commit marker is set. In the second pass, the commit marker is removed
// but the empty batch is retained to preserve the producer epoch.
var dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
// the empty control batch does not cause an exception when cleaned
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
@ -537,7 +544,7 @@ class LogCleanerTest {
log.roll()
// Both the record and the marker should remain after cleaning
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(0, 1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@ -562,12 +569,12 @@ class LogCleanerTest {
// Both the batch and the marker should remain after cleaning. The batch is retained
// because it is the last entry for this producerId. The marker is retained because
// there are still batches remaining from this transaction.
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
// The empty batch and the marker is still retained after a second cleaning.
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@ -591,13 +598,13 @@ class LogCleanerTest {
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
log.roll()
// delete horizon set to 0 to verify marker is not removed early
val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
// Aborted records are removed, but the abort marker is still preserved.
val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5), offsetsInLog(log))
// clean again with large delete horizon and verify the marker is removed
cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
// In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed.
runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5), offsetsInLog(log))
}
@ -633,12 +640,12 @@ class LogCleanerTest {
// Both transactional batches will be cleaned. The last one will remain in the log
// as an empty batch in order to preserve the producer sequence number and epoch
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(1, 3, 4, 5), offsetsInLog(log))
assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
// On the second round of cleaning, the marker from the first transaction should be removed.
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
// In the first pass, the delete horizon for the first marker is set. In the second pass, the first marker is removed.
runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(3, 4, 5), offsetsInLog(log))
assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
}
@ -670,14 +677,14 @@ class LogCleanerTest {
assertAbortedTransactionIndexed()
// first time through the records are removed
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertAbortedTransactionIndexed()
assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
// the empty batch remains if cleaned again because it still holds the last sequence
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertAbortedTransactionIndexed()
assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
@ -687,13 +694,14 @@ class LogCleanerTest {
appendProducer(Seq(1))
log.roll()
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertAbortedTransactionIndexed()
assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet gone because we read the empty batch
assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not preserve the empty batch
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
// In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed.
dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
assertEquals(List(3), lastOffsetsPerBatchInLog(log))
@ -728,7 +736,7 @@ class LogCleanerTest {
// clean the log
val stats = new CleanerStats()
cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata)
cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata, -1)
val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@ -741,7 +749,7 @@ class LogCleanerTest {
val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@ -760,7 +768,7 @@ class LogCleanerTest {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
)
}
@ -777,7 +785,7 @@ class LogCleanerTest {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
)
}
@ -1112,7 +1120,7 @@ class LogCleanerTest {
keys.foreach(k => map.put(key(k), Long.MaxValue))
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
new CleanedTransactionMetadata, -1)
)
}
@ -1372,7 +1380,7 @@ class LogCleanerTest {
// Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, Seq(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
new CleanedTransactionMetadata, -1)
)
assertEquals(numSegmentsInitial + 1, log.logSegments.size)
assertEquals(allKeys, LogTestUtils.keysInLog(log))
@ -1381,7 +1389,7 @@ class LogCleanerTest {
// Clean each segment now that split is complete.
for (segmentToClean <- log.logSegments)
cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
new CleanedTransactionMetadata, -1)
assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
assertFalse(LogTestUtils.hasOffsetOverflow(log))
log.close()
@ -1422,7 +1430,7 @@ class LogCleanerTest {
// clean the log
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
var cleanedKeys = LogTestUtils.keysInLog(log)
@ -1438,7 +1446,7 @@ class LogCleanerTest {
// clean again
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@ -1455,7 +1463,7 @@ class LogCleanerTest {
// clean again
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@ -1477,7 +1485,7 @@ class LogCleanerTest {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@ -1495,7 +1503,7 @@ class LogCleanerTest {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@ -1513,7 +1521,7 @@ class LogCleanerTest {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@ -1654,6 +1662,7 @@ class LogCleanerTest {
key = "0".getBytes,
timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000), leaderEpoch = 0)
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset))
assertEquals(1, log.logSegments.head.log.batches.iterator.next().lastOffset,
"The tombstone should be retained.")
@ -1854,6 +1863,19 @@ class LogCleanerTest {
private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = {
LogTestUtils.recoverAndCheck(dir, config, expectedKeys, new BrokerTopicStats(), time, time.scheduler)
}
/**
 * Runs a two-pass clean to simulate a complete tombstone/marker removal:
 * 1. The first pass sets the delete horizon on batches that contain tombstones or markers of empty transactions.
 * 2. The second pass advances the current time by tombstoneRetentionMs + 1, so those tombstones and markers
 *    have passed their delete horizon and are removed from the log.
 * Returns the first dirty offset in the log as a result of the second cleaning.
 */
private def runTwoPassClean(cleaner: Cleaner, logToClean: LogToClean, currentTime: Long,
tombstoneRetentionMs: Long = 86400000) : Long = {
cleaner.doClean(logToClean, currentTime)
cleaner.doClean(logToClean, currentTime + tombstoneRetentionMs + 1)._1
}
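
Editor's note on the numbers (my reading, assuming the log's delete.retention.ms equals the tombstoneRetentionMs default used here): the second pass runs at currentTime + tombstoneRetentionMs + 1, one millisecond past the horizon stamped in the first pass, and the largeTimestamp constant defined at the top of this class is chosen so that this sum lands exactly on Long.MaxValue rather than overflowing.

// Worked check of the constants used by runTwoPassClean (sketch, not part of the test class).
object TwoPassCleanArithmetic {
  val tombstoneRetentionMs: Long = 86400000L
  val largeTimestamp: Long = Long.MaxValue - tombstoneRetentionMs - 1
  // The first pass at largeTimestamp stamps a horizon of largeTimestamp + tombstoneRetentionMs;
  // the second pass time is one millisecond later and still representable as a Long.
  assert(largeTimestamp + tombstoneRetentionMs + 1 == Long.MaxValue)
}
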
}
class FakeOffsetMap(val slots: Int) extends OffsetMap {

View File

@ -1521,7 +1521,7 @@ class LogValidatorTest {
def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = {
batch match {
case b: DefaultRecordBatch =>
assertEquals(expected, b.firstTimestamp, s"Unexpected base timestamp of batch $batch")
assertEquals(expected, b.baseTimestamp, s"Unexpected base timestamp of batch $batch")
case _ => // no-op
}
}
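
Editor's note: the test now reads b.baseTimestamp rather than b.firstTimestamp; my understanding is that the base timestamp field can now carry a delete horizon when the corresponding batch flag is set, so "first timestamp" is no longer an accurate name. A minimal sketch of that assumed relationship (illustrative types, not Kafka's):

// Illustrative only: record timestamps are stored relative to the batch's base timestamp;
// when the (assumed) delete-horizon flag is set, the base timestamp holds the horizon and
// need not equal the first record's timestamp.
final case class BatchHeaderSketch(baseTimestamp: Long, deleteHorizonSet: Boolean)

object BatchHeaderSketch {
  def recordTimestamp(h: BatchHeaderSketch, timestampDelta: Long): Long =
    h.baseTimestamp + timestampDelta

  def deleteHorizonMs(h: BatchHeaderSketch): Option[Long] =
    if (h.deleteHorizonSet) Some(h.baseTimestamp) else None
}
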

View File

@ -33,7 +33,6 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
@ -749,8 +748,9 @@ class UnifiedLogTest {
records.batches.forEach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY
records.filterTo(new TopicPartition("foo", 0), new RecordFilter(0, 0) {
override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false)
override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
}, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
filtered.flip()
@ -790,8 +790,9 @@ class UnifiedLogTest {
records.batches.forEach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.RETAIN_EMPTY
records.filterTo(new TopicPartition("foo", 0), new RecordFilter(0, 0) {
override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.RETAIN_EMPTY, true)
override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = false
}, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
filtered.flip()
@ -833,8 +834,9 @@ class UnifiedLogTest {
records.batches.forEach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY
records.filterTo(new TopicPartition("foo", 0), new RecordFilter(0, 0) {
override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false)
override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
}, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
filtered.flip()
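
Editor's note: these three call sites show the new shape of MemoryRecords.RecordFilter: it is constructed with two Long arguments (passed as 0, 0 here) and checkBatchRetention now returns a RecordFilter.BatchRetentionResult wrapping the retention decision plus a boolean. Below is a hedged sketch of a filter written against that shape, assuming kafka-clients on the classpath; reading the constructor arguments as a current time and a delete-retention interval, and the boolean as marking a batch that only holds a marker for an empty transaction, is my assumption, not something stated in this diff.

import org.apache.kafka.common.record.{Record, RecordBatch}
import org.apache.kafka.common.record.MemoryRecords.RecordFilter

object TombstoneFilterSketch {
  // Constructor arguments assumed to be (currentTimeMs, deleteRetentionMs); the boolean in
  // BatchRetentionResult is assumed to flag a marker-only (empty-transaction) batch.
  val dropTombstones: RecordFilter = new RecordFilter(System.currentTimeMillis(), 86400000L) {
    override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
      new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false)

    override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean =
      record.hasValue // keep only records with a value, i.e. drop tombstones
  }
}
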

View File

@ -256,6 +256,7 @@ public class BatchBuilder<T> {
RecordBatch.NO_SEQUENCE,
false,
isControlBatch,
false,
leaderEpoch,
numRecords()
);