KAFKA-16780: Txn consumer exerts pressure on remote storage when collecting aborted txns (#17659)

- KIP-1058 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058:+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions)
- Unit and Integration tests added.

Reviewers: Divij Vaidya <diviv@amazon.com>
Kamal Chandraprakash 2024-11-08 14:49:09 +05:30 committed by GitHub
parent 0049b967e5
commit b9976437e1
25 changed files with 806 additions and 93 deletions
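
The gist of the change: with a READ_COMMITTED consumer, the broker has to collect aborted-transaction entries from the fetched remote segment and every following segment up to the fetch upper bound; before this patch, that meant downloading the transaction index of each of those segments from remote storage even when the index was empty. The segment metadata now records whether the transaction index is empty, so such segments can be skipped without any remote read. A minimal, self-contained sketch of that idea (the SegmentMeta record and the segment list are illustrative, not Kafka types):

```java
import java.util.List;
import java.util.Optional;

public class TxnIndexSkipSketch {
    // Illustrative stand-in for RemoteLogSegmentMetadata: just the fields the idea needs.
    record SegmentMeta(long baseOffset, long endOffset, boolean txnIdxEmpty) { }

    // First segment at or after the given offset whose transaction index is non-empty.
    // Segments flagged txnIdxEmpty are skipped using metadata alone, i.e. no remote-storage read.
    static Optional<SegmentMeta> nextSegmentWithTxnIndex(List<SegmentMeta> segments, long offset) {
        return segments.stream()
                .filter(s -> s.endOffset() >= offset && !s.txnIdxEmpty())
                .findFirst();
    }

    public static void main(String[] args) {
        List<SegmentMeta> segments = List.of(
                new SegmentMeta(0, 99, true),      // no aborted txns -> skipped
                new SegmentMeta(100, 199, true),   // no aborted txns -> skipped
                new SegmentMeta(200, 299, false)); // has a transaction index -> returned
        System.out.println(nextSegmentWithTxnIndex(segments, 10));
    }
}
```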

View File

@@ -584,13 +584,32 @@ public class RemoteLogManager implements Closeable {
int epochForOffset,
long offset) throws RemoteStorageException {
Uuid topicId = topicIdByPartitionMap.get(topicPartition);
if (topicId == null) {
throw new KafkaException("No topic id registered for topic partition: " + topicPartition);
}
return remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset);
}
/**
* Returns the next segment that may contain the aborted transaction entries. The search ensures that the returned
* segment offsets are greater than or equal to the given offset and in the same epoch.
* @param topicPartition topic partition to search
* @param epochForOffset the epoch
* @param offset the offset
* @return The next segment that contains the transaction index in the same epoch.
* @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
*/
public Optional<RemoteLogSegmentMetadata> fetchNextSegmentWithTxnIndex(TopicPartition topicPartition,
int epochForOffset,
long offset) throws RemoteStorageException {
Uuid topicId = topicIdByPartitionMap.get(topicPartition);
if (topicId == null) {
throw new KafkaException("No topic id registered for topic partition: " + topicPartition);
}
TopicIdPartition tpId = new TopicIdPartition(topicId, topicPartition);
return remoteLogMetadataManager.nextSegmentWithTxnIndex(tpId, epochForOffset, offset);
}
Optional<FileRecords.TimestampAndOffset> lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset)
throws RemoteStorageException, IOException {
int startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset);
@@ -973,9 +992,10 @@ public class RemoteLogManager implements Closeable {
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>(epochEntries.size());
epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset));
boolean isTxnIdxEmpty = segment.txnIndex().isEmpty();
RemoteLogSegmentMetadata copySegmentStartedRlsm = new RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
segment.largestTimestamp(), brokerId, time.milliseconds(), segment.log().sizeInBytes(),
-segmentLeaderEpochs);
+segmentLeaderEpochs, isTxnIdxEmpty);
remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();
@@ -1036,7 +1056,8 @@ public class RemoteLogManager implements Closeable {
// Update the highest offset in remote storage for this partition's log so that the local log segments
// are not deleted before they are copied to remote storage.
log.updateHighestOffsetInRemoteStorage(endOffset);
-logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
+logger.info("Copied {} to remote storage with segment-id: {}",
+logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size();
long segmentsLag = log.onlyLocalLogSegmentsCount() - 1;
@@ -1740,7 +1761,10 @@ public class RemoteLogManager implements Closeable {
abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
long startTimeNs = time.nanoseconds();
collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);
LOGGER.debug("Time taken to collect: {} aborted transactions for {} in {} ns", abortedTransactions.size(),
segmentMetadata, time.nanoseconds() - startTimeNs);
return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
fetchInfo.records,
@@ -1748,29 +1772,51 @@ public class RemoteLogManager implements Closeable {
Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
}
/**
* Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
* Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
* segments. We are relying on the client to discard the duplicates.
* @param startOffset The start offset of the fetch request.
* @param upperBoundOffset The upper bound offset of the fetch request.
* @param segmentMetadata The current segment metadata.
* @param accumulator The accumulator to collect the aborted transactions.
* @param log The unified log instance.
* @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
*/
private void collectAbortedTransactions(long startOffset,
long upperBoundOffset,
RemoteLogSegmentMetadata segmentMetadata,
Consumer<List<AbortedTxn>> accumulator,
UnifiedLog log) throws RemoteStorageException {
-// Search in remote segments first.
-Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-while (nextSegmentMetadataOpt.isPresent()) {
-Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition();
+boolean isSearchComplete = false;
+LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache().getOrElse(null);
+Optional<RemoteLogSegmentMetadata> currentMetadataOpt = Optional.of(segmentMetadata);
+while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get();
+Optional<TransactionIndex> txnIndexOpt = getTransactionIndex(currentMetadata);
if (txnIndexOpt.isPresent()) {
-TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+TransactionIndex txnIndex = txnIndexOpt.get();
+TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
accumulator.accept(searchResult.abortedTransactions);
-if (searchResult.isComplete) {
-// Return immediately when the search result is complete, it does not need to go through local log segments.
-return;
-}
+isSearchComplete = searchResult.isComplete;
}
-nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
+if (!isSearchComplete) {
+currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, leaderEpochCache);
+}
}
// Search in local segments
-collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+if (!isSearchComplete) {
+collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+}
}
private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
return !currentMetadata.isTxnIdxEmpty() ?
// `ofNullable` is needed for backward compatibility for old events that were stored in the
// `__remote_log_metadata` topic. The old events will return the `txnIdxEmpty` as false, but the
// transaction index may not exist in the remote storage.
Optional.ofNullable(indexCache.getIndexEntry(currentMetadata).txnIndex()) : Optional.empty();
}
private void collectAbortedTransactionInLocalSegments(long startOffset,
@@ -1803,6 +1849,44 @@ public class RemoteLogManager implements Closeable {
: Optional.empty();
}
/**
* Returns the next segment metadata that contains the aborted transaction entries from the given offset.
* Note that the search starts from the given (epoch-for-offset, offset) pair; when no segment in that epoch contains
* the transaction index, the search proceeds to the next epoch (next-epoch, epoch-start-offset)
* and ends when a segment metadata is found or the leader epoch cache is exhausted.
* Note that the returned segment metadata may or may not contain the transaction index.
* Visible for testing
* @param tp The topic partition.
* @param offset The offset to start the search.
* @param leaderEpochCache The leader epoch file cache; this can be null.
* @return The next segment metadata that contains the transaction index. Whether the transaction index exists
* in that segment metadata depends on the RLMM plugin implementation, so the caller of this method should handle
* both cases.
* @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
*/
Optional<RemoteLogSegmentMetadata> findNextSegmentWithTxnIndex(TopicPartition tp,
long offset,
LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException {
if (leaderEpochCache == null) {
return Optional.empty();
}
OptionalInt initialEpochOpt = leaderEpochCache.epochForOffset(offset);
if (initialEpochOpt.isEmpty()) {
return Optional.empty();
}
int initialEpoch = initialEpochOpt.getAsInt();
for (EpochEntry epochEntry : leaderEpochCache.epochEntries()) {
if (epochEntry.epoch >= initialEpoch) {
long startOffset = Math.max(epochEntry.startOffset, offset);
Optional<RemoteLogSegmentMetadata> metadataOpt = fetchNextSegmentWithTxnIndex(tp, epochEntry.epoch, startOffset);
if (metadataOpt.isPresent()) {
return metadataOpt;
}
}
}
return Optional.empty();
}
// Visible for testing
RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
RecordBatch nextBatch;
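
To make the epoch traversal in `findNextSegmentWithTxnIndex` concrete, here is a small sketch (a plain `TreeMap` stands in for `LeaderEpochFileCache`, and a lambda stands in for the RLMM call; both are illustrative): for a fetch at offset 10 with epoch chain {0 -> 0, 1 -> 100, 2 -> 200}, the search issues the queries (0, 10), (1, 100), (2, 200) and stops at the first epoch that returns a segment, which is exactly what `testFindNextSegmentWithTxnIndexTraversesNextEpoch` verifies below.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.BiFunction;

public class EpochTraversalSketch {

    // Epoch that owns the given offset: the last epoch whose start offset is <= offset (illustrative).
    static int epochForOffset(TreeMap<Integer, Long> epochStartOffsets, long offset) {
        int epoch = -1;
        for (Map.Entry<Integer, Long> e : epochStartOffsets.entrySet()) {
            if (e.getValue() <= offset) {
                epoch = e.getKey();
            }
        }
        return epoch;
    }

    public static void main(String[] args) {
        // Illustrative leader-epoch chain: epoch -> start offset.
        TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>(Map.of(0, 0L, 1, 100L, 2, 200L));

        // Stand-in for RemoteLogMetadataManager#nextSegmentWithTxnIndex: only epoch 2 has a segment with a txn index.
        BiFunction<Integer, Long, Optional<String>> nextSegmentWithTxnIndex = (epoch, offset) ->
                epoch == 2 ? Optional.of("segment[epoch=2, startOffset=" + offset + "]") : Optional.empty();

        long requestedOffset = 10L;
        int initialEpoch = epochForOffset(epochStartOffsets, requestedOffset);

        Optional<String> result = Optional.empty();
        for (Map.Entry<Integer, Long> entry : epochStartOffsets.entrySet()) {
            if (result.isEmpty() && entry.getKey() >= initialEpoch) {
                long startOffset = Math.max(entry.getValue(), requestedOffset);
                System.out.println("query: epoch=" + entry.getKey() + ", offset=" + startOffset);
                result = nextSegmentWithTxnIndex.apply(entry.getKey(), startOffset);
            }
        }
        // Prints the queries (0, 10), (1, 100), (2, 200) and then the segment found in epoch 2.
        System.out.println(result.orElse("no segment with txn index"));
    }
}
```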

View File

@@ -1414,6 +1414,92 @@ public class RemoteLogManagerTest {
.remoteLogSegmentMetadata(eq(followerTopicIdPartition), anyInt(), anyLong());
}
@Test
public void testFetchNextSegmentWithTxnIndex() throws RemoteStorageException {
remoteLogManager.startup();
remoteLogManager.onLeadershipChange(
Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
remoteLogManager.fetchNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, 100L);
remoteLogManager.fetchNextSegmentWithTxnIndex(followerTopicIdPartition.topicPartition(), 20, 200L);
verify(remoteLogMetadataManager)
.nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), anyInt(), anyLong());
verify(remoteLogMetadataManager)
.nextSegmentWithTxnIndex(eq(followerTopicIdPartition), anyInt(), anyLong());
}
@Test
public void testFindNextSegmentWithTxnIndex() throws RemoteStorageException {
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
.thenReturn(Optional.of(0L));
when(remoteLogMetadataManager.nextSegmentWithTxnIndex(any(TopicIdPartition.class), anyInt(), anyLong()))
.thenAnswer(ans -> {
TopicIdPartition topicIdPartition = ans.getArgument(0);
int leaderEpoch = ans.getArgument(1);
long offset = ans.getArgument(2);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
Map<Integer, Long> leaderEpochs = new TreeMap<>();
leaderEpochs.put(leaderEpoch, offset);
RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId,
offset, offset + 100, time.milliseconds(), 0, time.milliseconds(), 1024, leaderEpochs, true);
return Optional.of(metadata);
});
remoteLogManager.startup();
remoteLogManager.onLeadershipChange(
Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
// For offset-10, epoch is 0.
remoteLogManager.findNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, cache);
verify(remoteLogMetadataManager)
.nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(0), eq(10L));
}
@Test
public void testFindNextSegmentWithTxnIndexTraversesNextEpoch() throws RemoteStorageException {
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
.thenReturn(Optional.of(0L));
when(remoteLogMetadataManager.nextSegmentWithTxnIndex(any(TopicIdPartition.class), anyInt(), anyLong()))
.thenAnswer(ans -> {
TopicIdPartition topicIdPartition = ans.getArgument(0);
int leaderEpoch = ans.getArgument(1);
long offset = ans.getArgument(2);
Optional<RemoteLogSegmentMetadata> metadataOpt = Optional.empty();
if (leaderEpoch == 2) {
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
Map<Integer, Long> leaderEpochs = new TreeMap<>();
leaderEpochs.put(leaderEpoch, offset);
RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId,
offset, offset + 100, time.milliseconds(), 0, time.milliseconds(), 1024, leaderEpochs, true);
metadataOpt = Optional.of(metadata);
}
return metadataOpt;
});
remoteLogManager.startup();
remoteLogManager.onLeadershipChange(
Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
// For offset-10, epoch is 0.
// 1. For epoch 0 and 1, it returns empty and
// 2. For epoch 2, it returns the segment metadata.
remoteLogManager.findNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, cache);
verify(remoteLogMetadataManager)
.nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(0), eq(10L));
verify(remoteLogMetadataManager)
.nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(1), eq(100L));
verify(remoteLogMetadataManager)
.nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(2), eq(200L));
}
@Test
void testOnLeadershipChangeWillInvokeHandleLeaderOrFollowerPartitions() {
remoteLogManager.startup();

View File

@@ -209,4 +209,25 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
* @return Total size of the log stored in remote storage in bytes.
*/
long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException;
/**
* Returns the next segment metadata that contains the aborted transaction entries for the given topic partition, epoch and offset.
* <ul>
* <li>The default implementation returns the segment metadata that matches the given epoch and offset
* irrespective of the presence of the transaction index.</li>
* <li>The custom implementation can optimize by returning the next segment metadata that contains the txn index
* in the given epoch. If there are no segments with txn index in the given epoch, then return empty.</li>
* </ul>
* @param topicIdPartition topic partition to search for.
* @param epoch leader epoch for the given offset.
* @param offset offset
* @return The next segment metadata. The transaction index may or may not exist in the returned segment metadata,
* depending on the RLMM plugin implementation, so the caller of this method should handle both cases.
* @throws RemoteStorageException if any storage-related error occurs.
*/
default Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
int epoch,
long offset) throws RemoteStorageException {
return remoteLogSegmentMetadata(topicIdPartition, epoch, offset);
}
}
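
For RLMM plugin authors, a sketch of the optimized lookup the javadoc above describes, written as a helper against the public interface rather than a complete plugin (the class and method names here are illustrative); the skip loop mirrors `RemoteLogMetadataCache#nextSegmentWithTxnIndex` further down in this commit.

```java
import java.util.Optional;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

public class NextSegmentWithTxnIndexSketch {
    // Keep asking for the segment at the next offset until one reports a non-empty
    // transaction index, or the epoch runs out of segments.
    static Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(RemoteLogMetadataManager rlmm,
                                                                      TopicIdPartition topicIdPartition,
                                                                      int epoch,
                                                                      long offset) throws RemoteStorageException {
        Optional<RemoteLogSegmentMetadata> candidate = rlmm.remoteLogSegmentMetadata(topicIdPartition, epoch, offset);
        while (candidate.isPresent() && candidate.get().isTxnIdxEmpty()) {
            candidate = rlmm.remoteLogSegmentMetadata(topicIdPartition, epoch, candidate.get().endOffset() + 1);
        }
        return candidate;
    }
}
```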

View File

@@ -78,6 +78,11 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
*/
private final RemoteLogSegmentState state;
/**
* Indicates whether the transaction index is empty for this segment.
*/
private final boolean txnIdxEmpty;
/**
* Creates an instance with the given metadata of remote log segment.
* <p>
@@ -105,6 +110,39 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
Optional<CustomMetadata> customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long> segmentLeaderEpochs) {
this(remoteLogSegmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes,
customMetadata, state, segmentLeaderEpochs, false);
}
/**
* Creates an instance with the given metadata of remote log segment.
* <p>
* {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
* then it should have an entry with epoch mapping to start-offset of this segment.
*
* @param remoteLogSegmentId Universally unique remote log segment id.
* @param startOffset Start offset of this segment (inclusive).
* @param endOffset End offset of this segment (inclusive).
* @param maxTimestampMs Maximum timestamp in milliseconds in this segment.
* @param brokerId Broker id from which this event is generated.
* @param eventTimestampMs Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
* @param segmentSizeInBytes Size of this segment in bytes.
* @param customMetadata Custom metadata.
* @param state State of the respective segment of remoteLogSegmentId.
* @param segmentLeaderEpochs leader epochs occurred within this segment.
* @param txnIdxEmpty True if the transaction index is empty, false otherwise.
*/
public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
long startOffset,
long endOffset,
long maxTimestampMs,
int brokerId,
long eventTimestampMs,
int segmentSizeInBytes,
Optional<CustomMetadata> customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long> segmentLeaderEpochs,
boolean txnIdxEmpty) {
super(brokerId, eventTimestampMs);
this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
@@ -128,6 +166,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
}
this.segmentLeaderEpochs = Collections.unmodifiableNavigableMap(new TreeMap<>(segmentLeaderEpochs));
this.txnIdxEmpty = txnIdxEmpty;
}
/**
@@ -164,6 +203,34 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
segmentLeaderEpochs);
}
/**
* Creates an instance with the given metadata of remote log segment and its state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}.
* <p>
* {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
* then it should have an entry with epoch mapping to start-offset of this segment.
*
* @param remoteLogSegmentId Universally unique remote log segment id.
* @param startOffset Start offset of this segment (inclusive).
* @param endOffset End offset of this segment (inclusive).
* @param maxTimestampMs Maximum timestamp in milliseconds in this segment.
* @param brokerId Broker id from which this event is generated.
* @param eventTimestampMs Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
* @param segmentSizeInBytes Size of this segment in bytes.
* @param segmentLeaderEpochs leader epochs occurred within this segment
* @param txnIdxEmpty True if the transaction index is empty, false otherwise.
*/
public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
long startOffset,
long endOffset,
long maxTimestampMs,
int brokerId,
long eventTimestampMs,
int segmentSizeInBytes,
Map<Integer, Long> segmentLeaderEpochs,
boolean txnIdxEmpty) {
this(remoteLogSegmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes,
Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_STARTED, segmentLeaderEpochs, txnIdxEmpty);
}
/**
* @return unique id of this segment.
@@ -227,6 +294,14 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
return state;
}
/**
* Indicates whether the transaction index is empty for this segment.
* @return true if the transaction index is empty, false otherwise.
*/
public boolean isTxnIdxEmpty() {
return txnIdxEmpty;
}
/**
* Creates a new RemoteLogSegmentMetadata applying the given {@code rlsmUpdate} on this instance. This method will
* not update this instance.
@@ -241,7 +316,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
endOffset, maxTimestampMs, rlsmUpdate.brokerId(), rlsmUpdate.eventTimestampMs(),
-segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs);
+segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs, txnIdxEmpty);
}
@Override
@@ -266,13 +341,14 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
&& Objects.equals(customMetadata, that.customMetadata)
&& state == that.state
&& eventTimestampMs() == that.eventTimestampMs()
-&& brokerId() == that.brokerId();
+&& brokerId() == that.brokerId()
+&& txnIdxEmpty == that.txnIdxEmpty;
}
@Override
public int hashCode() {
return Objects.hash(remoteLogSegmentId, startOffset, endOffset, brokerId(), maxTimestampMs,
-eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
+eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state, txnIdxEmpty);
}
@Override
@@ -288,6 +364,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
", segmentSizeInBytes=" + segmentSizeInBytes +
", customMetadata=" + customMetadata +
", state=" + state +
+", txnIdxEmpty=" + txnIdxEmpty +
'}';
}
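
How the flag is populated on the write path, following the `segment.txnIndex().isEmpty()` call added to RemoteLogManager near the top of this commit (a sketch; the topic name and the literal values are made up):

```java
import java.util.Map;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

public class TxnIdxEmptyFlagSketch {
    public static void main(String[] args) {
        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0));
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId, Uuid.randomUuid());

        // In RemoteLogManager this comes from segment.txnIndex().isEmpty(); hard-coded here for illustration.
        boolean isTxnIdxEmpty = true;

        // New overload introduced by this commit: the last argument records whether the txn index is empty.
        RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId,
                0L,                         // startOffset
                999L,                       // endOffset
                System.currentTimeMillis(), // maxTimestampMs
                0,                          // brokerId
                System.currentTimeMillis(), // eventTimestampMs
                1024,                       // segmentSizeInBytes
                Map.of(0, 0L),              // segmentLeaderEpochs
                isTxnIdxEmpty);

        System.out.println(metadata.isTxnIdxEmpty()); // true -> readers can skip this segment's txn index
    }
}
```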

View File

@@ -79,6 +79,11 @@ public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager {
return 0;
}
@Override
public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) {
return Optional.empty();
}
@Override
public void close() throws IOException {
}

View File

@@ -106,6 +106,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetada
return withClassLoader(() -> delegate.remoteLogSize(topicIdPartition, leaderEpoch));
}
@Override
public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException {
return withClassLoader(() -> delegate.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset));
}
@Override
public void configure(Map<String, ?> configs) {
withClassLoader(() -> {

View File

@@ -125,29 +125,45 @@ public class RemoteLogMetadataCache {
* @return the requested remote log segment metadata if it exists.
*/
public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch, long offset) {
-RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.get(leaderEpoch);
-if (remoteLogLeaderEpochState == null) {
-return Optional.empty();
-}
-// Look for floor entry as the given offset may exist in this entry.
-RemoteLogSegmentId remoteLogSegmentId = remoteLogLeaderEpochState.floorEntry(offset);
-if (remoteLogSegmentId == null) {
-// If the offset is lower than the minimum offset available in metadata then return empty.
-return Optional.empty();
-}
-RemoteLogSegmentMetadata metadata = idToSegmentMetadata.get(remoteLogSegmentId);
-// Check whether the given offset with leaderEpoch exists in this segment.
-// Check for epoch's offset boundaries with in this segment.
-// 1. Get the next epoch's start offset -1 if exists
-// 2. If no next epoch exists, then segment end offset can be considered as epoch's relative end offset.
-Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
-long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : metadata.endOffset();
+RemoteLogSegmentMetadata metadata = getSegmentMetadata(leaderEpoch, offset);
+long epochEndOffset = -1L;
+if (metadata != null) {
+// Check whether the given offset with leaderEpoch exists in this segment.
+// Check for epoch's offset boundaries with in this segment.
+// 1. Get the next epoch's start offset -1 if exists
+// 2. If no next epoch exists, then segment end offset can be considered as epoch's relative end offset.
+Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
+epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : metadata.endOffset();
+}
// Return empty when target offset > epoch's end offset.
-return offset > epochEndOffset ? Optional.empty() : Optional.of(metadata);
+return offset > epochEndOffset ? Optional.empty() : Optional.ofNullable(metadata);
}
public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(int leaderEpoch, long offset) {
boolean txnIdxEmpty = true;
Optional<RemoteLogSegmentMetadata> metadataOpt = remoteLogSegmentMetadata(leaderEpoch, offset);
while (metadataOpt.isPresent() && txnIdxEmpty) {
txnIdxEmpty = metadataOpt.get().isTxnIdxEmpty();
if (txnIdxEmpty) { // If txn index is empty, then look for next segment.
metadataOpt = remoteLogSegmentMetadata(leaderEpoch, metadataOpt.get().endOffset() + 1);
}
}
return txnIdxEmpty ? Optional.empty() : metadataOpt;
}
private RemoteLogSegmentMetadata getSegmentMetadata(int leaderEpoch, long offset) {
RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.get(leaderEpoch);
if (remoteLogLeaderEpochState != null) {
// Look for floor entry as the given offset may exist in this entry.
RemoteLogSegmentId remoteLogSegmentId = remoteLogLeaderEpochState.floorEntry(offset);
if (remoteLogSegmentId != null) {
return idToSegmentMetadata.get(remoteLogSegmentId);
} else {
log.warn("No remote segment found for leaderEpoch: {}, offset: {}", leaderEpoch, offset);
}
}
// If the offset is lower than the minimum offset available in metadata then return null.
return null;
}
public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate)

View File

@@ -78,6 +78,11 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
*/
private final RemoteLogSegmentState state;
/**
* Indicates whether the transaction index is empty for this segment.
*/
private final boolean txnIdxEmpty;
/**
* Creates an instance with the given metadata of remote log segment.
* <p>
@@ -105,6 +110,39 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
Optional<CustomMetadata> customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long> segmentLeaderEpochs) {
this(segmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes,
customMetadata, state, segmentLeaderEpochs, false);
}
/**
* Creates an instance with the given metadata of remote log segment.
* <p>
* {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
* then it should have an entry with epoch mapping to start-offset of this segment.
*
* @param segmentId Universally unique remote log segment id.
* @param startOffset Start offset of this segment (inclusive).
* @param endOffset End offset of this segment (inclusive).
* @param maxTimestampMs Maximum timestamp in milliseconds in this segment.
* @param brokerId Broker id from which this event is generated.
* @param eventTimestampMs Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
* @param segmentSizeInBytes Size of this segment in bytes.
* @param customMetadata Custom metadata.
* @param state State of the respective segment of remoteLogSegmentId.
* @param segmentLeaderEpochs leader epochs occurred within this segment.
* @param txnIdxEmpty true if the transaction index is empty, false otherwise.
*/
public RemoteLogSegmentMetadataSnapshot(Uuid segmentId,
long startOffset,
long endOffset,
long maxTimestampMs,
int brokerId,
long eventTimestampMs,
int segmentSizeInBytes,
Optional<CustomMetadata> customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long> segmentLeaderEpochs,
boolean txnIdxEmpty) {
super(brokerId, eventTimestampMs);
this.segmentId = Objects.requireNonNull(segmentId, "remoteLogSegmentId can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
@@ -114,6 +152,7 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
this.maxTimestampMs = maxTimestampMs;
this.segmentSizeInBytes = segmentSizeInBytes;
this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
this.txnIdxEmpty = txnIdxEmpty;
if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty");
@@ -125,7 +164,7 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
public static RemoteLogSegmentMetadataSnapshot create(RemoteLogSegmentMetadata metadata) {
return new RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(), metadata.startOffset(), metadata.endOffset(),
metadata.maxTimestampMs(), metadata.brokerId(), metadata.eventTimestampMs(),
-metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), metadata.segmentLeaderEpochs()
+metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), metadata.segmentLeaderEpochs(), metadata.isTxnIdxEmpty()
);
}
@@ -191,6 +230,10 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
return state;
}
public boolean isTxnIdxEmpty() {
return txnIdxEmpty;
}
@Override
public TopicIdPartition topicIdPartition() {
throw new UnsupportedOperationException("This metadata does not have topic partition with it.");
@@ -208,12 +251,13 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
&& Objects.equals(customMetadata, that.customMetadata)
&& Objects.equals(segmentId, that.segmentId)
&& Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs)
-&& state == that.state;
+&& state == that.state
+&& txnIdxEmpty == that.txnIdxEmpty;
}
@Override
public int hashCode() {
-return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
+return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state, txnIdxEmpty);
}
@Override
@@ -227,6 +271,7 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
", segmentSizeInBytes=" + segmentSizeInBytes +
", customMetadata=" + customMetadata +
", state=" + state +
+", txnIdxEmpty=" + txnIdxEmpty +
'}';
}
}

View File

@@ -55,7 +55,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
@Override
public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
-log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
+log.debug("Adding remote log segment: {}", remoteLogSegmentMetadata);
final RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
@@ -71,7 +71,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
@Override
public void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate rlsmUpdate) {
-log.debug("Updating remote log segment: [{}]", rlsmUpdate);
+log.debug("Updating remote log segment: {}", rlsmUpdate);
RemoteLogSegmentId remoteLogSegmentId = rlsmUpdate.remoteLogSegmentId();
TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
@@ -88,7 +88,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
@Override
public void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
-log.debug("Received partition delete state with: [{}]", remotePartitionDeleteMetadata);
+log.debug("Received partition delete state with: {}", remotePartitionDeleteMetadata);
TopicIdPartition topicIdPartition = remotePartitionDeleteMetadata.topicIdPartition();
idToPartitionDeleteMetadata.put(topicIdPartition, remotePartitionDeleteMetadata);
@@ -108,30 +108,25 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
throws RemoteStorageException {
-Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return getRemoteLogMetadataCache(topicIdPartition).listAllRemoteLogSegments();
}
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
throws RemoteStorageException {
-Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
}
private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
throws RemoteResourceNotFoundException {
-Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
if (remoteLogMetadataCache == null) {
throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
}
if (!remoteLogMetadataCache.isInitialized()) {
// Throwing a retriable ReplicaNotAvailableException here for clients retry.
throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);
}
return remoteLogMetadataCache;
}
@@ -139,15 +134,17 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
long offset,
int epochForOffset)
throws RemoteStorageException {
-Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset, offset);
}
public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
int epoch,
long offset) throws RemoteStorageException {
return getRemoteLogMetadataCache(topicIdPartition).nextSegmentWithTxnIndex(epoch, offset);
}
public Optional<Long> highestLogOffset(TopicIdPartition topicIdPartition,
int leaderEpoch) throws RemoteStorageException {
-Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch);
}

View File

@@ -356,6 +356,17 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
return remoteLogSize;
}
@Override
public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException {
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
return remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset);
} finally {
lock.readLock().unlock();
}
}
@Override
public void configure(Map<String, ?> configs) {
Objects.requireNonNull(configs, "configs can not be null.");

View File

@@ -40,7 +40,8 @@ public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetad
.setMaxTimestampMs(segmentMetadata.maxTimestampMs())
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
-.setRemoteLogSegmentState(segmentMetadata.state().id());
+.setRemoteLogSegmentState(segmentMetadata.state().id())
+.setTxnIndexEmpty(segmentMetadata.isTxnIdxEmpty());
segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
@@ -72,7 +73,8 @@ public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetad
record.segmentSizeInBytes(),
customMetadata,
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
-segmentLeaderEpochs);
+segmentLeaderEpochs,
+record.txnIndexEmpty());
}
}

View File

@@ -44,7 +44,8 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans
.setMaxTimestampMs(segmentMetadata.maxTimestampMs())
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
-.setRemoteLogSegmentState(segmentMetadata.state().id());
+.setRemoteLogSegmentState(segmentMetadata.state().id())
+.setTxnIndexEmpty(segmentMetadata.isTxnIdxEmpty());
segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
@@ -83,7 +84,7 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans
new RemoteLogSegmentMetadata(remoteLogSegmentId, record.startOffset(), record.endOffset(),
record.maxTimestampMs(), record.brokerId(),
record.eventTimestampMs(), record.segmentSizeInBytes(),
-segmentLeaderEpochs);
+segmentLeaderEpochs, record.txnIndexEmpty());
RemoteLogSegmentMetadataUpdate rlsmUpdate
= new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, record.eventTimestampMs(),
customMetadata,

View File

@@ -194,6 +194,14 @@ public class TransactionIndex implements Closeable {
}
}
/**
* Check if the index is empty.
* @return `true` if the index is empty or the underlying file does not exist, `false` otherwise.
*/
public boolean isEmpty() {
return !iterable().iterator().hasNext();
}
private FileChannel openChannel() throws IOException {
FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
StandardOpenOption.READ, StandardOpenOption.WRITE);

View File

@@ -129,6 +129,14 @@
"type": "int8",
"versions": "0+",
"about": "State identifier of the remote log segment, which is RemoteLogSegmentState.id()."
},
{
"name": "TxnIndexEmpty",
"type": "bool",
"versions": "0+",
"about": "Flag to indicate if the transaction index is empty.",
"taggedVersions": "0+",
"tag": 0
}
]
}

View File

@@ -95,6 +95,14 @@
"type": "int8",
"versions": "0+",
"about": "State of the remote log segment"
},
{
"name": "TxnIndexEmpty",
"type": "bool",
"versions": "0+",
"about": "Flag to indicate if the transaction index is empty.",
"taggedVersions": "0+",
"tag": 0
}
]
}

View File

@@ -53,7 +53,7 @@ public class RemoteLogMetadataFormatterTest {
Optional<CustomMetadata> customMetadata = Optional.of(new CustomMetadata(new byte[10]));
RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata(
remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED,
-segLeaderEpochs);
+segLeaderEpochs, true);
byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata);
ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(
@@ -65,7 +65,7 @@ public class RemoteLogMetadataFormatterTest {
"startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1, " +
"eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, " +
"customMetadata=Optional[CustomMetadata{10 bytes}], " +
-"state=COPY_SEGMENT_STARTED}\n",
+"state=COPY_SEGMENT_STARTED, txnIdxEmpty=true}\n",
TOPIC_ID, SEGMENT_ID);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos)) {

View File

@@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
class RemoteLogSegmentMetadataSnapshotTransformTest {
@ParameterizedTest
@MethodSource("parameters")
-void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+void testToAndFromMessage(Optional<CustomMetadata> customMetadata, boolean isTxnIdxEmpty) {
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
segmentLeaderEpochs.put(0, 0L);
RemoteLogSegmentMetadataSnapshot snapshot = new RemoteLogSegmentMetadataSnapshot(
@@ -43,7 +43,8 @@ class RemoteLogSegmentMetadataSnapshotTransformTest {
0L, 100L, -1L, 0, 0, 1234,
customMetadata,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
-segmentLeaderEpochs
+segmentLeaderEpochs,
+isTxnIdxEmpty
);
RemoteLogSegmentMetadataSnapshotTransform transform = new RemoteLogSegmentMetadataSnapshotTransform();
@@ -51,11 +52,11 @@ class RemoteLogSegmentMetadataSnapshotTransformTest {
assertEquals(snapshot, transform.fromApiMessageAndVersion(message));
}
-private static Stream<Object> parameters() {
+private static Stream<Object[]> parameters() {
return Stream.of(
-Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
-Optional.of(new CustomMetadata(new byte[0])),
-Optional.empty()
+new Object[]{Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), true},
+new Object[]{Optional.of(new CustomMetadata(new byte[0])), false},
+new Object[]{Optional.empty(), true}
);
}
}

View File

@@ -202,4 +202,24 @@ public class TransactionIndexTest {
index.deleteIfExists();
assertFalse(file.exists());
}
@Test
public void testIsEmptyWhenFileDoesNotExist() throws IOException {
File nonExistentFile = TestUtils.tempFile();
assertTrue(nonExistentFile.delete());
try (TransactionIndex testIndex = new TransactionIndex(0, nonExistentFile)) {
assertTrue(testIndex.isEmpty());
}
}
@Test
public void testIsEmptyWhenFileIsEmpty() {
assertTrue(index.isEmpty());
}
@Test
public void testIsEmptyWhenFileIsNotEmpty() throws IOException {
index.append(new AbortedTxn(0L, 0, 10, 2));
assertFalse(index.isEmpty());
}
}

View File

@@ -53,6 +53,7 @@ import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
import org.apache.kafka.tiered.storage.specs.ProducableSpec;
import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
import org.apache.kafka.tiered.storage.specs.TopicSpec;
@@ -228,10 +229,17 @@ public final class TieredStorageTestBuilder {
public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer fromBroker,
String topic,
Integer partition,
-Integer remoteFetchRequestCount) {
+Integer segmentFetchRequestCount) {
+return expectFetchFromTieredStorage(fromBroker, topic, partition, new RemoteFetchCount(segmentFetchRequestCount));
+}
+public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer fromBroker,
+String topic,
+Integer partition,
+RemoteFetchCount remoteFetchRequestCount) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
assertTrue(partition >= 0, "Partition must be >= 0");
-assertTrue(remoteFetchRequestCount >= 0, "Expected fetch count from tiered storage must be >= 0");
+assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().getCount() >= 0, "Expected fetch count from tiered storage must be >= 0");
assertFalse(fetchables.containsKey(topicPartition), "Consume already in progress for " + topicPartition);
fetchables.put(topicPartition, new FetchableSpec(fromBroker, remoteFetchRequestCount));
return this;
@@ -371,7 +379,7 @@ public final class TieredStorageTestBuilder {
private void createConsumeAction() {
if (!consumables.isEmpty()) {
consumables.forEach((topicPartition, consumableSpec) -> {
-FetchableSpec fetchableSpec = fetchables.computeIfAbsent(topicPartition, k -> new FetchableSpec(0, 0));
+FetchableSpec fetchableSpec = fetchables.computeIfAbsent(topicPartition, k -> new FetchableSpec(0, new RemoteFetchCount(0)));
RemoteFetchSpec remoteFetchSpec = new RemoteFetchSpec(fetchableSpec.getSourceBrokerId(), topicPartition,
fetchableSpec.getFetchCount());
ConsumeAction action = new ConsumeAction(topicPartition, consumableSpec.getFetchOffset(),
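
Usage-wise, existing tests that pass a plain segment-fetch count keep compiling via the first overload, while new tests can pass a `RemoteFetchCount` to also describe index fetches. A small hypothetical specification (topic names and counts are made up; the import paths follow the specs imports visible in this diff):

```java
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;

public class RemoteFetchCountUsageSketch {
    // Hypothetical test specification showing both overloads side by side.
    static void writeSpecs(TieredStorageTestBuilder builder) {
        builder.expectFetchFromTieredStorage(0, "topicA", 0, 1)                          // bare segment fetch count
               .expectFetchFromTieredStorage(0, "topicB", 0, new RemoteFetchCount(2));   // explicit RemoteFetchCount
    }
}
```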

View File

@@ -92,12 +92,16 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
protected abstract void writeTestSpecifications(TieredStorageTestBuilder builder);
protected void overrideConsumerConfig(Properties consumerConfig) {
}
@BeforeEach
@Override
public void setUp(TestInfo testInfo) {
testClassName = testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath();
super.setUp(testInfo);
overrideConsumerConfig(consumerConfig());
context = new TieredStorageTestContext(this); context = new TieredStorageTestContext(this);
} }
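The new overrideConsumerConfig hook is a no-op by default. A minimal sketch of how a subclass might use it, mirroring the read_committed test added later in this change and assuming the usual ConsumerConfig and IsolationLevel imports in the test class:

    @Override
    protected void overrideConsumerConfig(Properties consumerConfig) {
        // Force the consumer used by the harness to read only committed records.
        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
    }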
@ -25,20 +25,26 @@ import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
 import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
 import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;

 import java.io.PrintStream;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;

+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
 import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
 import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;

 public final class ConsumeAction implements TieredStorageTestAction {
@ -75,6 +81,9 @@ public final class ConsumeAction implements TieredStorageTestAction {
         // type has yet to happen.
         LocalTieredStorageHistory history = context.tieredStorageHistory(remoteFetchSpec.getSourceBrokerId());
         Optional<LocalTieredStorageEvent> latestEventSoFar = history.latestEvent(FETCH_SEGMENT, topicPartition);
+        Optional<LocalTieredStorageEvent> latestOffsetIdxEventSoFar = history.latestEvent(FETCH_OFFSET_INDEX, topicPartition);
+        Optional<LocalTieredStorageEvent> latestTimeIdxEventSoFar = history.latestEvent(FETCH_TIME_INDEX, topicPartition);
+        Optional<LocalTieredStorageEvent> latestTxnIdxEventSoFar = history.latestEvent(FETCH_TRANSACTION_INDEX, topicPartition);

         // Records are consumed here
         List<ConsumerRecord<String, String>> consumedRecords =
@ -119,16 +128,61 @@
         assertThat(storedRecords, correspondTo(readRecords, topicPartition, serde, serde));

         // (B) Assessment of the interactions between the source broker and the second-tier storage.
-        List<LocalTieredStorageEvent> events = history.getEvents(FETCH_SEGMENT, topicPartition);
-        List<LocalTieredStorageEvent> eventsInScope = latestEventSoFar
-                .map(latestEvent ->
-                        events.stream().filter(event -> event.isAfter(latestEvent)).collect(Collectors.toList()))
-                .orElse(events);
-
-        assertEquals(remoteFetchSpec.getCount(), eventsInScope.size(),
-                "Number of fetch requests from broker " + remoteFetchSpec.getSourceBrokerId() + " to the " +
-                        "tier storage does not match the expected value for topic-partition "
-                        + remoteFetchSpec.getTopicPartition());
+        for (LocalTieredStorageEvent.EventType eventType : Arrays.asList(FETCH_SEGMENT, FETCH_OFFSET_INDEX, FETCH_TIME_INDEX, FETCH_TRANSACTION_INDEX)) {
+            Optional<LocalTieredStorageEvent> latestEvent;
+            switch (eventType) {
+                case FETCH_SEGMENT:
+                    latestEvent = latestEventSoFar;
+                    break;
+                case FETCH_OFFSET_INDEX:
+                    latestEvent = latestOffsetIdxEventSoFar;
+                    break;
+                case FETCH_TIME_INDEX:
+                    latestEvent = latestTimeIdxEventSoFar;
+                    break;
+                case FETCH_TRANSACTION_INDEX:
+                    latestEvent = latestTxnIdxEventSoFar;
+                    break;
+                default:
+                    latestEvent = Optional.empty();
+            }
+
+            List<LocalTieredStorageEvent> events = history.getEvents(eventType, topicPartition);
+            List<LocalTieredStorageEvent> eventsInScope = latestEvent
+                    .map(e -> events.stream().filter(event -> event.isAfter(e)).collect(Collectors.toList()))
+                    .orElse(events);
+
+            RemoteFetchCount remoteFetchCount = remoteFetchSpec.getRemoteFetchCount();
+            RemoteFetchCount.FetchCountAndOp expectedCountAndOp;
+            switch (eventType) {
+                case FETCH_SEGMENT:
+                    expectedCountAndOp = remoteFetchCount.getSegmentFetchCountAndOp();
+                    break;
+                case FETCH_OFFSET_INDEX:
+                    expectedCountAndOp = remoteFetchCount.getOffsetIdxFetchCountAndOp();
+                    break;
+                case FETCH_TIME_INDEX:
+                    expectedCountAndOp = remoteFetchCount.getTimeIdxFetchCountAndOp();
+                    break;
+                case FETCH_TRANSACTION_INDEX:
+                    expectedCountAndOp = remoteFetchCount.getTxnIdxFetchCountAndOp();
+                    break;
+                default:
+                    expectedCountAndOp = new RemoteFetchCount.FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO);
+            }
+
+            String message = String.format("Number of %s requests from broker %d to the tier storage does not match the expected value for topic-partition %s",
+                    eventType, remoteFetchSpec.getSourceBrokerId(), remoteFetchSpec.getTopicPartition());
+            if (expectedCountAndOp.getCount() != -1) {
+                if (expectedCountAndOp.getOperationType() == RemoteFetchCount.OperationType.EQUALS_TO) {
+                    assertEquals(expectedCountAndOp.getCount(), eventsInScope.size(), message);
+                } else if (expectedCountAndOp.getOperationType() == RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO) {
+                    assertTrue(eventsInScope.size() <= expectedCountAndOp.getCount(), message);
+                } else {
+                    assertTrue(eventsInScope.size() >= expectedCountAndOp.getCount(), message);
+                }
+            }
+        }
     }

     @Override
@ -138,5 +192,6 @@ public final class ConsumeAction implements TieredStorageTestAction {
         output.println(" fetch-offset = " + fetchOffset);
         output.println(" expected-record-count = " + expectedTotalCount);
         output.println(" expected-record-from-tiered-storage = " + expectedFromSecondTierCount);
+        output.println(" remote-fetch-spec = " + remoteFetchSpec);
     }
 }
@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.FetchCountAndOp;
import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.EQUALS_TO;
import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO;
/**
* Test Cases:
* Elementary offloads and fetches from tiered storage using a consumer with the read_committed isolation level.
*/
public final class OffloadAndTxnConsumeFromLeaderTest extends TieredStorageTestHarness {
/**
* Cluster of one broker
* @return number of brokers in the cluster
*/
@Override
public int brokerCount() {
return 1;
}
@Override
public Properties overridingProps() {
Properties props = super.overridingProps();
// Configure the remote-log index cache size to hold one entry to simulate eviction of cached index entries.
props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "1");
return props;
}
@Override
protected void overrideConsumerConfig(Properties consumerConfig) {
consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
}
@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final Integer broker = 0;
final String topicA = "topicA";
final Integer p0 = 0;
final Integer partitionCount = 1;
final Integer replicationFactor = 1;
final Integer oneBatchPerSegment = 1;
final Map<Integer, List<Integer>> replicaAssignment = null;
final boolean enableRemoteLogStorage = true;
builder
.createTopic(topicA, partitionCount, replicationFactor, oneBatchPerSegment, replicaAssignment,
enableRemoteLogStorage)
.expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
.expectSegmentToBeOffloaded(broker, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
.expectSegmentToBeOffloaded(broker, topicA, p0, 3, new KeyValueSpec("k3", "v3"))
.expectSegmentToBeOffloaded(broker, topicA, p0, 4, new KeyValueSpec("k4", "v4"))
.expectSegmentToBeOffloaded(broker, topicA, p0, 5, new KeyValueSpec("k5", "v5"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 6L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"),
new KeyValueSpec("k5", "v5"), new KeyValueSpec("k6", "v6"))
// When reading with a transactional consumer, the transaction indexes of consecutive remote segments are
// fetched until the LSO found is higher than the fetch-offset.
// Worst case: summation(n) = (n * (n + 1)) / 2
// Total number of uploaded remote segments = 6, so the worst-case number of index fetches = (6 * (6 + 1)) / 2 = 21.
// Note that the index fetch is skipped when the txn-index is empty, so the effective index fetch count
// should be the same as the segment count (see the worked numbers after this class).
.expectFetchFromTieredStorage(broker, topicA, p0, getRemoteFetchCount())
.consume(topicA, p0, 0L, 7, 6);
}
private static RemoteFetchCount getRemoteFetchCount() {
FetchCountAndOp segmentFetchCountAndOp = new FetchCountAndOp(6, EQUALS_TO);
// RemoteIndexCache might evict the entries well before reaching its maximum size.
// To make the test deterministic, we use the LESS_THAN_OR_EQUALS_TO operation type with the count set to the
// maximum number of times the RemoteIndexCache gets accessed. The RemoteIndexCache is accessed twice for each read.
FetchCountAndOp indexFetchCountAndOp = new FetchCountAndOp(12, LESS_THAN_OR_EQUALS_TO);
return new RemoteFetchCount(segmentFetchCountAndOp, indexFetchCountAndOp,
indexFetchCountAndOp, indexFetchCountAndOp);
}
}
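To make the counts referenced in the comments above concrete, here is a small back-of-the-envelope check (illustrative only, not part of the test):

    int segments = 6;                                            // segments offloaded by this test
    int worstCaseIndexFetches = segments * (segments + 1) / 2;   // 21, if every segment's txn index had to be scanned
    int cacheAccessesPerRead = 2;                                 // RemoteIndexCache is accessed twice per read
    int indexFetchUpperBound = segments * cacheAccessesPerRead;   // 12, asserted with LESS_THAN_OR_EQUALS_TO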
@ -21,10 +21,10 @@ import java.util.Objects;
 public final class FetchableSpec {

     private final Integer sourceBrokerId;
-    private final Integer fetchCount;
+    private final RemoteFetchCount fetchCount;

     public FetchableSpec(Integer sourceBrokerId,
-                         Integer fetchCount) {
+                         RemoteFetchCount fetchCount) {
         this.sourceBrokerId = sourceBrokerId;
         this.fetchCount = fetchCount;
     }
@ -33,7 +33,7 @@ public final class FetchableSpec {
         return sourceBrokerId;
     }

-    public Integer getFetchCount() {
+    public RemoteFetchCount getFetchCount() {
         return fetchCount;
     }
@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.specs;
import java.util.Objects;
public class RemoteFetchCount {
private FetchCountAndOp segmentFetchCountAndOp;
private FetchCountAndOp offsetIdxFetchCountAndOp = new FetchCountAndOp(-1);
private FetchCountAndOp timeIdxFetchCountAndOp = new FetchCountAndOp(-1);
private FetchCountAndOp txnIdxFetchCountAndOp = new FetchCountAndOp(-1);
public RemoteFetchCount(int segmentFetchCountAndOp) {
this.segmentFetchCountAndOp = new FetchCountAndOp(segmentFetchCountAndOp);
}
public RemoteFetchCount(int segmentFetchCountAndOp,
int offsetIdxFetchCountAndOp,
int timeIdxFetchCountAndOp,
int txnIdxFetchCountAndOp) {
this.segmentFetchCountAndOp = new FetchCountAndOp(segmentFetchCountAndOp);
this.offsetIdxFetchCountAndOp = new FetchCountAndOp(offsetIdxFetchCountAndOp);
this.timeIdxFetchCountAndOp = new FetchCountAndOp(timeIdxFetchCountAndOp);
this.txnIdxFetchCountAndOp = new FetchCountAndOp(txnIdxFetchCountAndOp);
}
public RemoteFetchCount(FetchCountAndOp segmentFetchCountAndOp,
FetchCountAndOp offsetIdxFetchCountAndOp,
FetchCountAndOp timeIdxFetchCountAndOp,
FetchCountAndOp txnIdxFetchCountAndOp) {
this.segmentFetchCountAndOp = segmentFetchCountAndOp;
this.offsetIdxFetchCountAndOp = offsetIdxFetchCountAndOp;
this.timeIdxFetchCountAndOp = timeIdxFetchCountAndOp;
this.txnIdxFetchCountAndOp = txnIdxFetchCountAndOp;
}
public FetchCountAndOp getSegmentFetchCountAndOp() {
return segmentFetchCountAndOp;
}
public void setSegmentFetchCountAndOp(FetchCountAndOp segmentFetchCountAndOp) {
this.segmentFetchCountAndOp = segmentFetchCountAndOp;
}
public FetchCountAndOp getOffsetIdxFetchCountAndOp() {
return offsetIdxFetchCountAndOp;
}
public void setOffsetIdxFetchCountAndOp(FetchCountAndOp offsetIdxFetchCountAndOp) {
this.offsetIdxFetchCountAndOp = offsetIdxFetchCountAndOp;
}
public FetchCountAndOp getTimeIdxFetchCountAndOp() {
return timeIdxFetchCountAndOp;
}
public void setTimeIdxFetchCountAndOp(FetchCountAndOp timeIdxFetchCountAndOp) {
this.timeIdxFetchCountAndOp = timeIdxFetchCountAndOp;
}
public FetchCountAndOp getTxnIdxFetchCountAndOp() {
return txnIdxFetchCountAndOp;
}
public void setTxnIdxFetchCountAndOp(FetchCountAndOp txnIdxFetchCountAndOp) {
this.txnIdxFetchCountAndOp = txnIdxFetchCountAndOp;
}
@Override
public String toString() {
return "RemoteFetchCount{" +
"segmentFetchCountAndOp=" + segmentFetchCountAndOp +
", offsetIdxFetchCountAndOp=" + offsetIdxFetchCountAndOp +
", timeIdxFetchCountAndOp=" + timeIdxFetchCountAndOp +
", txnIdxFetchCountAndOp=" + txnIdxFetchCountAndOp +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RemoteFetchCount that = (RemoteFetchCount) o;
return Objects.equals(segmentFetchCountAndOp, that.segmentFetchCountAndOp) &&
Objects.equals(offsetIdxFetchCountAndOp, that.offsetIdxFetchCountAndOp) &&
Objects.equals(timeIdxFetchCountAndOp, that.timeIdxFetchCountAndOp) &&
Objects.equals(txnIdxFetchCountAndOp, that.txnIdxFetchCountAndOp);
}
@Override
public int hashCode() {
return Objects.hash(segmentFetchCountAndOp, offsetIdxFetchCountAndOp, timeIdxFetchCountAndOp, txnIdxFetchCountAndOp);
}
public enum OperationType {
EQUALS_TO,
GREATER_THAN_OR_EQUALS_TO,
LESS_THAN_OR_EQUALS_TO
}
public static class FetchCountAndOp {
private final int count;
private final OperationType operationType;
public FetchCountAndOp(int count) {
this.count = count;
this.operationType = OperationType.EQUALS_TO;
}
public FetchCountAndOp(int count, OperationType operationType) {
this.count = count;
this.operationType = operationType;
}
public int getCount() {
return count;
}
public OperationType getOperationType() {
return operationType;
}
@Override
public String toString() {
return "FetchCountAndOp{" +
"count=" + count +
", operationType=" + operationType +
'}';
}
}
}
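A minimal usage sketch for RemoteFetchCount, under the defaults shown above: fields left at -1 are simply not asserted by ConsumeAction, so a test can pin only the counts it cares about. The values here are hypothetical.

    RemoteFetchCount count = new RemoteFetchCount(6);  // exactly 6 segment fetches (EQUALS_TO by default)
    count.setTxnIdxFetchCountAndOp(
            new RemoteFetchCount.FetchCountAndOp(6, RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO));
    // The offset/time index counts stay at -1 and are skipped by the assertion loop in ConsumeAction.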
@ -24,7 +24,7 @@ public final class RemoteFetchSpec {

     private final int sourceBrokerId;
     private final TopicPartition topicPartition;
-    private final int count;
+    private final RemoteFetchCount remoteFetchCount;

     /**
      * Specifies a fetch (download) event from a second-tier storage. This is used to ensure the
@ -32,14 +32,14 @@ public final class RemoteFetchSpec {
      *
      * @param sourceBrokerId   The broker which fetched (a) remote log segment(s) from the second-tier storage.
      * @param topicPartition   The topic-partition which segment(s) were fetched.
-     * @param count            The number of remote log segment(s) fetched.
+     * @param remoteFetchCount The number of remote log segment(s) and indexes fetched.
      */
     public RemoteFetchSpec(int sourceBrokerId,
                            TopicPartition topicPartition,
-                           int count) {
+                           RemoteFetchCount remoteFetchCount) {
         this.sourceBrokerId = sourceBrokerId;
         this.topicPartition = topicPartition;
-        this.count = count;
+        this.remoteFetchCount = remoteFetchCount;
     }

     public int getSourceBrokerId() {
@ -50,14 +50,14 @@ public final class RemoteFetchSpec {
         return topicPartition;
     }

-    public int getCount() {
-        return count;
+    public RemoteFetchCount getRemoteFetchCount() {
+        return remoteFetchCount;
     }

     @Override
     public String toString() {
-        return String.format("RemoteFetch[source-broker-id=%d topic-partition=%s count=%d]",
-                sourceBrokerId, topicPartition, count);
+        return String.format("RemoteFetch[source-broker-id=%d topic-partition=%s remote-fetch-count=%s]",
+                sourceBrokerId, topicPartition, remoteFetchCount);
     }

     @Override
@ -66,12 +66,12 @@ public final class RemoteFetchSpec {
         if (o == null || getClass() != o.getClass()) return false;
         RemoteFetchSpec that = (RemoteFetchSpec) o;
         return sourceBrokerId == that.sourceBrokerId
-                && count == that.count
+                && Objects.equals(remoteFetchCount, that.remoteFetchCount)
                 && Objects.equals(topicPartition, that.topicPartition);
     }

     @Override
     public int hashCode() {
-        return Objects.hash(sourceBrokerId, topicPartition, count);
+        return Objects.hash(sourceBrokerId, topicPartition, remoteFetchCount);
     }
 }
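For completeness, a hedged sketch of how a RemoteFetchSpec is now constructed; this mirrors what createConsumeAction builds from a FetchableSpec, and the broker id, topic, and count are illustrative only.

    RemoteFetchSpec spec = new RemoteFetchSpec(0, new TopicPartition("topicA", 0), new RemoteFetchCount(6));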