diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 5d2ae4cf2e0..4317f0c5bc3 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -584,13 +584,32 @@ public class RemoteLogManager implements Closeable { int epochForOffset, long offset) throws RemoteStorageException { Uuid topicId = topicIdByPartitionMap.get(topicPartition); - if (topicId == null) { throw new KafkaException("No topic id registered for topic partition: " + topicPartition); } return remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset); } + /** + * Returns the next segment that may contain the aborted transaction entries. The search ensures that the returned + * segment offsets are greater than or equal to the given offset and in the same epoch. + * @param topicPartition topic partition to search + * @param epochForOffset the epoch + * @param offset the offset + * @return The next segment that contains the transaction index in the same epoch. + * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata. + */ + public Optional fetchNextSegmentWithTxnIndex(TopicPartition topicPartition, + int epochForOffset, + long offset) throws RemoteStorageException { + Uuid topicId = topicIdByPartitionMap.get(topicPartition); + if (topicId == null) { + throw new KafkaException("No topic id registered for topic partition: " + topicPartition); + } + TopicIdPartition tpId = new TopicIdPartition(topicId, topicPartition); + return remoteLogMetadataManager.nextSegmentWithTxnIndex(tpId, epochForOffset, offset); + } + Optional lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset) throws RemoteStorageException, IOException { int startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset); @@ -973,9 +992,10 @@ public class RemoteLogManager implements Closeable { Map segmentLeaderEpochs = new HashMap<>(epochEntries.size()); epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset)); + boolean isTxnIdxEmpty = segment.txnIndex().isEmpty(); RemoteLogSegmentMetadata copySegmentStartedRlsm = new RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset, segment.largestTimestamp(), brokerId, time.milliseconds(), segment.log().sizeInBytes(), - segmentLeaderEpochs); + segmentLeaderEpochs, isTxnIdxEmpty); remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get(); @@ -1036,7 +1056,8 @@ public class RemoteLogManager implements Closeable { // Update the highest offset in remote storage for this partition's log so that the local log segments // are not deleted before they are copied to remote storage. 
log.updateHighestOffsetInRemoteStorage(endOffset); - logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId()); + logger.info("Copied {} to remote storage with segment-id: {}", + logFileName, copySegmentFinishedRlsm.remoteLogSegmentId()); long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size(); long segmentsLag = log.onlyLocalLogSegmentsCount() - 1; @@ -1740,7 +1761,10 @@ public class RemoteLogManager implements Closeable { abortedTxns -> abortedTransactions.addAll(abortedTxns.stream() .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList())); + long startTimeNs = time.nanoseconds(); collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log); + LOGGER.debug("Time taken to collect: {} aborted transactions for {} in {} ns", abortedTransactions.size(), + segmentMetadata, time.nanoseconds() - startTimeNs); return new FetchDataInfo(fetchInfo.fetchOffsetMetadata, fetchInfo.records, @@ -1748,29 +1772,51 @@ public class RemoteLogManager implements Closeable { Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions))); } + /** + * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset. + * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across + * segments. We are relying on the client to discard the duplicates. + * @param startOffset The start offset of the fetch request. + * @param upperBoundOffset The upper bound offset of the fetch request. + * @param segmentMetadata The current segment metadata. + * @param accumulator The accumulator to collect the aborted transactions. + * @param log The unified log instance. + * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata. + */ private void collectAbortedTransactions(long startOffset, long upperBoundOffset, RemoteLogSegmentMetadata segmentMetadata, Consumer> accumulator, UnifiedLog log) throws RemoteStorageException { - // Search in remote segments first. - Optional nextSegmentMetadataOpt = Optional.of(segmentMetadata); - while (nextSegmentMetadataOpt.isPresent()) { - Optional txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex()); + TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition(); + boolean isSearchComplete = false; + LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache().getOrElse(null); + Optional currentMetadataOpt = Optional.of(segmentMetadata); + while (!isSearchComplete && currentMetadataOpt.isPresent()) { + RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get(); + Optional txnIndexOpt = getTransactionIndex(currentMetadata); if (txnIndexOpt.isPresent()) { - TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset); + TransactionIndex txnIndex = txnIndexOpt.get(); + TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset); accumulator.accept(searchResult.abortedTransactions); - if (searchResult.isComplete) { - // Return immediately when the search result is complete, it does not need to go through local log segments. 
- return; - } + isSearchComplete = searchResult.isComplete; + } + if (!isSearchComplete) { + currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, leaderEpochCache); + } - - nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache()); } - // Search in local segments - collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator()); + if (!isSearchComplete) { + collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator()); + } + } + + private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) { + return !currentMetadata.isTxnIdxEmpty() ? + // `ofNullable` is needed for backward compatibility with old events that were stored in the + // `__remote_log_metadata` topic. The old events return `txnIdxEmpty` as false, but the + // transaction index may not exist in the remote storage. + Optional.ofNullable(indexCache.getIndexEntry(currentMetadata).txnIndex()) : Optional.empty(); } private void collectAbortedTransactionInLocalSegments(long startOffset, @@ -1803,6 +1849,44 @@ public class RemoteLogManager implements Closeable { : Optional.empty(); } + /** + * Returns the next segment metadata that contains the aborted transaction entries from the given offset. + * Note that the search starts from the given (epoch-for-offset, offset) pair. When no segment in that epoch contains + * the transaction index, the search proceeds to the next epoch (next-epoch, epoch-start-offset) + * and ends when the segment metadata is found or the leader epoch cache is exhausted. + * Note that the returned segment metadata may or may not contain the transaction index. + * Visible for testing + * @param tp The topic partition. + * @param offset The offset to start the search. + * @param leaderEpochCache The leader epoch file cache; this can be null. + * @return The next segment metadata. The transaction index may or may not exist + * in that segment metadata, depending on the RLMM plugin implementation. The caller of this method should handle + * both cases. + * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+ */ + Optional findNextSegmentWithTxnIndex(TopicPartition tp, + long offset, + LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException { + if (leaderEpochCache == null) { + return Optional.empty(); + } + OptionalInt initialEpochOpt = leaderEpochCache.epochForOffset(offset); + if (initialEpochOpt.isEmpty()) { + return Optional.empty(); + } + int initialEpoch = initialEpochOpt.getAsInt(); + for (EpochEntry epochEntry : leaderEpochCache.epochEntries()) { + if (epochEntry.epoch >= initialEpoch) { + long startOffset = Math.max(epochEntry.startOffset, offset); + Optional metadataOpt = fetchNextSegmentWithTxnIndex(tp, epochEntry.epoch, startOffset); + if (metadataOpt.isPresent()) { + return metadataOpt; + } + } + } + return Optional.empty(); + } + // Visible for testing RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { RecordBatch nextBatch; diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 4ea373327d9..e75a6ca85d4 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -1414,6 +1414,92 @@ public class RemoteLogManagerTest { .remoteLogSegmentMetadata(eq(followerTopicIdPartition), anyInt(), anyLong()); } + @Test + public void testFetchNextSegmentWithTxnIndex() throws RemoteStorageException { + remoteLogManager.startup(); + remoteLogManager.onLeadershipChange( + Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds); + remoteLogManager.fetchNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, 100L); + remoteLogManager.fetchNextSegmentWithTxnIndex(followerTopicIdPartition.topicPartition(), 20, 200L); + + verify(remoteLogMetadataManager) + .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), anyInt(), anyLong()); + verify(remoteLogMetadataManager) + .nextSegmentWithTxnIndex(eq(followerTopicIdPartition), anyInt(), anyLong()); + } + + @Test + public void testFindNextSegmentWithTxnIndex() throws RemoteStorageException { + checkpoint.write(totalEpochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())) + .thenReturn(Optional.of(0L)); + when(remoteLogMetadataManager.nextSegmentWithTxnIndex(any(TopicIdPartition.class), anyInt(), anyLong())) + .thenAnswer(ans -> { + TopicIdPartition topicIdPartition = ans.getArgument(0); + int leaderEpoch = ans.getArgument(1); + long offset = ans.getArgument(2); + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()); + Map leaderEpochs = new TreeMap<>(); + leaderEpochs.put(leaderEpoch, offset); + RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId, + offset, offset + 100, time.milliseconds(), 0, time.milliseconds(), 1024, leaderEpochs, true); + return Optional.of(metadata); + }); + + remoteLogManager.startup(); + remoteLogManager.onLeadershipChange( + Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds); + + // For offset-10, epoch is 0. 
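+ // The mocked RLMM returns a non-empty segment for every (epoch, offset) pair, so the search is expected to stop after a single lookup at epoch 0, offset 10.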
+ remoteLogManager.findNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, cache); + verify(remoteLogMetadataManager) + .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(0), eq(10L)); + } + + @Test + public void testFindNextSegmentWithTxnIndexTraversesNextEpoch() throws RemoteStorageException { + checkpoint.write(totalEpochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())) + .thenReturn(Optional.of(0L)); + when(remoteLogMetadataManager.nextSegmentWithTxnIndex(any(TopicIdPartition.class), anyInt(), anyLong())) + .thenAnswer(ans -> { + TopicIdPartition topicIdPartition = ans.getArgument(0); + int leaderEpoch = ans.getArgument(1); + long offset = ans.getArgument(2); + Optional metadataOpt = Optional.empty(); + if (leaderEpoch == 2) { + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()); + Map leaderEpochs = new TreeMap<>(); + leaderEpochs.put(leaderEpoch, offset); + RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId, + offset, offset + 100, time.milliseconds(), 0, time.milliseconds(), 1024, leaderEpochs, true); + metadataOpt = Optional.of(metadata); + } + return metadataOpt; + }); + + remoteLogManager.startup(); + remoteLogManager.onLeadershipChange( + Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds); + + // For offset-10, epoch is 0. + // 1. For epoch 0 and 1, it returns empty and + // 2. For epoch 2, it returns the segment metadata. + remoteLogManager.findNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, cache); + verify(remoteLogMetadataManager) + .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(0), eq(10L)); + verify(remoteLogMetadataManager) + .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(1), eq(100L)); + verify(remoteLogMetadataManager) + .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(2), eq(200L)); + } + @Test void testOnLeadershipChangeWillInvokeHandleLeaderOrFollowerPartitions() { remoteLogManager.startup(); diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java index 78d48fbd949..2280aa51132 100644 --- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java @@ -209,4 +209,25 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable { * @return Total size of the log stored in remote storage in bytes. */ long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException; + + /** + * Returns the next segment metadata that contains the aborted transaction entries for the given topic partition, epoch and offset. + *
    + *
<ul>
+ * <li>The default implementation returns the segment metadata that matches the given epoch and offset
+ * irrespective of the presence of the transaction index.</li>
+ * <li>The custom implementation can optimize by returning the next segment metadata that contains the txn index
+ * in the given epoch. If there are no segments with txn index in the given epoch, then return empty.</li>
+ * </ul>
+ * @param topicIdPartition topic partition to search for. + * @param epoch leader epoch for the given offset. + * @param offset offset to start the search from. + * @return The next segment metadata. The transaction index may or may not exist in the returned segment metadata + * which depends on the RLMM plugin implementation. The caller of this method should handle both cases. + * @throws RemoteStorageException if any storage-related error occurs. + */ + default Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, + int epoch, + long offset) throws RemoteStorageException { + return remoteLogSegmentMetadata(topicIdPartition, epoch, offset); + } } \ No newline at end of file diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java index 9b589322bbf..02918d90625 100644 --- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java @@ -78,6 +78,11 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata { */ private final RemoteLogSegmentState state; + /** + * Indicates whether the transaction index is empty for this segment. + */ + private final boolean txnIdxEmpty; + /** * Creates an instance with the given metadata of remote log segment. *

@@ -105,6 +110,39 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata { Optional<CustomMetadata> customMetadata, RemoteLogSegmentState state, Map<Integer, Long> segmentLeaderEpochs) { + this(remoteLogSegmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes, + customMetadata, state, segmentLeaderEpochs, false); + } + + /** + * Creates an instance with the given metadata of remote log segment. + *

+ * {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch + * then it should have an entry with epoch mapping to start-offset of this segment. + * + * @param remoteLogSegmentId Universally unique remote log segment id. + * @param startOffset Start offset of this segment (inclusive). + * @param endOffset End offset of this segment (inclusive). + * @param maxTimestampMs Maximum timestamp in milli seconds in this segment. + * @param brokerId Broker id from which this event is generated. + * @param eventTimestampMs Epoch time in milli seconds at which the remote log segment is copied to the remote tier storage. + * @param segmentSizeInBytes Size of this segment in bytes. + * @param customMetadata Custom metadata. + * @param state State of the respective segment of remoteLogSegmentId. + * @param segmentLeaderEpochs leader epochs occurred within this segment. + * @param txnIdxEmpty True if the transaction index is empty, false otherwise. + */ + public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, + long startOffset, + long endOffset, + long maxTimestampMs, + int brokerId, + long eventTimestampMs, + int segmentSizeInBytes, + Optional customMetadata, + RemoteLogSegmentState state, + Map segmentLeaderEpochs, + boolean txnIdxEmpty) { super(brokerId, eventTimestampMs); this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null"); this.state = Objects.requireNonNull(state, "state can not be null"); @@ -128,6 +166,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata { } this.segmentLeaderEpochs = Collections.unmodifiableNavigableMap(new TreeMap<>(segmentLeaderEpochs)); + this.txnIdxEmpty = txnIdxEmpty; } /** @@ -164,6 +203,34 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata { segmentLeaderEpochs); } + /** + * Creates an instance with the given metadata of remote log segment and its state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}. + *

+ * {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch + * then it should have an entry with epoch mapping to start-offset of this segment. + * + * @param remoteLogSegmentId Universally unique remote log segment id. + * @param startOffset Start offset of this segment (inclusive). + * @param endOffset End offset of this segment (inclusive). + * @param maxTimestampMs Maximum timestamp in this segment + * @param brokerId Broker id from which this event is generated. + * @param eventTimestampMs Epoch time in milli seconds at which the remote log segment is copied to the remote tier storage. + * @param segmentSizeInBytes Size of this segment in bytes. + * @param segmentLeaderEpochs leader epochs occurred within this segment + * @param txnIdxEmpty True if the transaction index is empty, false otherwise. + */ + public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, + long startOffset, + long endOffset, + long maxTimestampMs, + int brokerId, + long eventTimestampMs, + int segmentSizeInBytes, + Map segmentLeaderEpochs, + boolean txnIdxEmpty) { + this(remoteLogSegmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes, + Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_STARTED, segmentLeaderEpochs, txnIdxEmpty); + } /** * @return unique id of this segment. @@ -227,6 +294,14 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata { return state; } + /** + * If true indicates that the transaction index is empty. + * @return True if the Transaction index is empty, false otherwise. + */ + public boolean isTxnIdxEmpty() { + return txnIdxEmpty; + } + /** * Creates a new RemoteLogSegmentMetadata applying the given {@code rlsmUpdate} on this instance. This method will * not update this instance. 
@@ -241,7 +316,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata { return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset, endOffset, maxTimestampMs, rlsmUpdate.brokerId(), rlsmUpdate.eventTimestampMs(), - segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs); + segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs, txnIdxEmpty); } @Override @@ -266,13 +341,14 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata { && Objects.equals(customMetadata, that.customMetadata) && state == that.state && eventTimestampMs() == that.eventTimestampMs() - && brokerId() == that.brokerId(); + && brokerId() == that.brokerId() + && txnIdxEmpty == that.txnIdxEmpty; } @Override public int hashCode() { return Objects.hash(remoteLogSegmentId, startOffset, endOffset, brokerId(), maxTimestampMs, - eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state); + eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state, txnIdxEmpty); } @Override @@ -288,6 +364,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata { ", segmentSizeInBytes=" + segmentSizeInBytes + ", customMetadata=" + customMetadata + ", state=" + state + + ", txnIdxEmpty=" + txnIdxEmpty + '}'; } diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java index 6ffcf85b9b0..cb9491360f8 100644 --- a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java +++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java @@ -79,6 +79,11 @@ public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager { return 0; } + @Override + public Optional nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) { + return Optional.empty(); + } + @Override public void close() throws IOException { } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java index dc2678242f4..1abcbbc20ce 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java @@ -106,6 +106,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetada return withClassLoader(() -> delegate.remoteLogSize(topicIdPartition, leaderEpoch)); } + @Override + public Optional nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException { + return withClassLoader(() -> delegate.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset)); + } + @Override public void configure(Map configs) { withClassLoader(() -> { diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java index 2ee1c29f6e5..b08458b84b8 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java +++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java @@ -125,29 +125,45 @@ public class RemoteLogMetadataCache { * @return the requested remote log segment metadata if it exists. */ public Optional remoteLogSegmentMetadata(int leaderEpoch, long offset) { - RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.get(leaderEpoch); - - if (remoteLogLeaderEpochState == null) { - return Optional.empty(); + RemoteLogSegmentMetadata metadata = getSegmentMetadata(leaderEpoch, offset); + long epochEndOffset = -1L; + if (metadata != null) { + // Check whether the given offset with leaderEpoch exists in this segment. + // Check for epoch's offset boundaries with in this segment. + // 1. Get the next epoch's start offset -1 if exists + // 2. If no next epoch exists, then segment end offset can be considered as epoch's relative end offset. + Map.Entry nextEntry = metadata.segmentLeaderEpochs().higherEntry(leaderEpoch); + epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : metadata.endOffset(); } - - // Look for floor entry as the given offset may exist in this entry. - RemoteLogSegmentId remoteLogSegmentId = remoteLogLeaderEpochState.floorEntry(offset); - if (remoteLogSegmentId == null) { - // If the offset is lower than the minimum offset available in metadata then return empty. - return Optional.empty(); - } - - RemoteLogSegmentMetadata metadata = idToSegmentMetadata.get(remoteLogSegmentId); - // Check whether the given offset with leaderEpoch exists in this segment. - // Check for epoch's offset boundaries with in this segment. - // 1. Get the next epoch's start offset -1 if exists - // 2. If no next epoch exists, then segment end offset can be considered as epoch's relative end offset. - Map.Entry nextEntry = metadata.segmentLeaderEpochs().higherEntry(leaderEpoch); - long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : metadata.endOffset(); - // Return empty when target offset > epoch's end offset. - return offset > epochEndOffset ? Optional.empty() : Optional.of(metadata); + return offset > epochEndOffset ? Optional.empty() : Optional.ofNullable(metadata); + } + + public Optional nextSegmentWithTxnIndex(int leaderEpoch, long offset) { + boolean txnIdxEmpty = true; + Optional metadataOpt = remoteLogSegmentMetadata(leaderEpoch, offset); + while (metadataOpt.isPresent() && txnIdxEmpty) { + txnIdxEmpty = metadataOpt.get().isTxnIdxEmpty(); + if (txnIdxEmpty) { // If txn index is empty, then look for next segment. + metadataOpt = remoteLogSegmentMetadata(leaderEpoch, metadataOpt.get().endOffset() + 1); + } + } + return txnIdxEmpty ? Optional.empty() : metadataOpt; + } + + private RemoteLogSegmentMetadata getSegmentMetadata(int leaderEpoch, long offset) { + RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.get(leaderEpoch); + if (remoteLogLeaderEpochState != null) { + // Look for floor entry as the given offset may exist in this entry. + RemoteLogSegmentId remoteLogSegmentId = remoteLogLeaderEpochState.floorEntry(offset); + if (remoteLogSegmentId != null) { + return idToSegmentMetadata.get(remoteLogSegmentId); + } else { + log.warn("No remote segment found for leaderEpoch: {}, offset: {}", leaderEpoch, offset); + } + } + // If the offset is lower than the minimum offset available in metadata then return null. 
+ return null; } public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java index a1ba23dd8f8..64540a7fabd 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java @@ -78,6 +78,11 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata { */ private final RemoteLogSegmentState state; + /** + * Indicates whether the transaction index is empty for this segment. + */ + private final boolean txnIdxEmpty; + /** * Creates an instance with the given metadata of remote log segment. *

@@ -105,6 +110,39 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata { Optional<CustomMetadata> customMetadata, RemoteLogSegmentState state, Map<Integer, Long> segmentLeaderEpochs) { + this(segmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes, + customMetadata, state, segmentLeaderEpochs, false); + } + + /** + * Creates an instance with the given metadata of remote log segment. + *

+ * {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch + * then it should have an entry with epoch mapping to start-offset of this segment. + * + * @param segmentId Universally unique remote log segment id. + * @param startOffset Start offset of this segment (inclusive). + * @param endOffset End offset of this segment (inclusive). + * @param maxTimestampMs Maximum timestamp in milliseconds in this segment. + * @param brokerId Broker id from which this event is generated. + * @param eventTimestampMs Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage. + * @param segmentSizeInBytes Size of this segment in bytes. + * @param customMetadata Custom metadata. + * @param state State of the respective segment of remoteLogSegmentId. + * @param segmentLeaderEpochs leader epochs occurred within this segment. + * @param txnIdxEmpty true if the transaction index is empty, false otherwise. + */ + public RemoteLogSegmentMetadataSnapshot(Uuid segmentId, + long startOffset, + long endOffset, + long maxTimestampMs, + int brokerId, + long eventTimestampMs, + int segmentSizeInBytes, + Optional customMetadata, + RemoteLogSegmentState state, + Map segmentLeaderEpochs, + boolean txnIdxEmpty) { super(brokerId, eventTimestampMs); this.segmentId = Objects.requireNonNull(segmentId, "remoteLogSegmentId can not be null"); this.state = Objects.requireNonNull(state, "state can not be null"); @@ -114,6 +152,7 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata { this.maxTimestampMs = maxTimestampMs; this.segmentSizeInBytes = segmentSizeInBytes; this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null"); + this.txnIdxEmpty = txnIdxEmpty; if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) { throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty"); @@ -125,7 +164,7 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata { public static RemoteLogSegmentMetadataSnapshot create(RemoteLogSegmentMetadata metadata) { return new RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(), metadata.startOffset(), metadata.endOffset(), metadata.maxTimestampMs(), metadata.brokerId(), metadata.eventTimestampMs(), - metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), metadata.segmentLeaderEpochs() + metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), metadata.segmentLeaderEpochs(), metadata.isTxnIdxEmpty() ); } @@ -191,6 +230,10 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata { return state; } + public boolean isTxnIdxEmpty() { + return txnIdxEmpty; + } + @Override public TopicIdPartition topicIdPartition() { throw new UnsupportedOperationException("This metadata does not have topic partition with it."); @@ -208,12 +251,13 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata { && Objects.equals(customMetadata, that.customMetadata) && Objects.equals(segmentId, that.segmentId) && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) - && state == that.state; + && state == that.state + && txnIdxEmpty == that.txnIdxEmpty; } @Override public int hashCode() { - return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state); + return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, 
customMetadata, state, txnIdxEmpty); } @Override @@ -227,6 +271,7 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata { ", segmentSizeInBytes=" + segmentSizeInBytes + ", customMetadata=" + customMetadata + ", state=" + state + + ", txnIdxEmpty=" + txnIdxEmpty + '}'; } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java index 86c29567d36..c74e97fbe68 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java @@ -55,7 +55,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa @Override public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { - log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata); + log.debug("Adding remote log segment: {}", remoteLogSegmentMetadata); final RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId(); TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition(); @@ -71,7 +71,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa @Override public void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate rlsmUpdate) { - log.debug("Updating remote log segment: [{}]", rlsmUpdate); + log.debug("Updating remote log segment: {}", rlsmUpdate); RemoteLogSegmentId remoteLogSegmentId = rlsmUpdate.remoteLogSegmentId(); TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition(); RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition); @@ -88,7 +88,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa @Override public void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) { - log.debug("Received partition delete state with: [{}]", remotePartitionDeleteMetadata); + log.debug("Received partition delete state with: {}", remotePartitionDeleteMetadata); TopicIdPartition topicIdPartition = remotePartitionDeleteMetadata.topicIdPartition(); idToPartitionDeleteMetadata.put(topicIdPartition, remotePartitionDeleteMetadata); @@ -108,30 +108,25 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa public Iterator listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException { - Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); - return getRemoteLogMetadataCache(topicIdPartition).listAllRemoteLogSegments(); } public Iterator listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException { - Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); - return getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch); } private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition) throws RemoteResourceNotFoundException { + Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition); if (remoteLogMetadataCache == null) { throw new RemoteResourceNotFoundException("No resource found for partition: " + 
topicIdPartition); } - if (!remoteLogMetadataCache.isInitialized()) { // Throwing a retriable ReplicaNotAvailableException here for clients retry. throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition); } - return remoteLogMetadataCache; } @@ -139,15 +134,17 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa long offset, int epochForOffset) throws RemoteStorageException { - Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); - return getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset, offset); } + public Optional nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, + int epoch, + long offset) throws RemoteStorageException { + return getRemoteLogMetadataCache(topicIdPartition).nextSegmentWithTxnIndex(epoch, offset); + } + public Optional highestLogOffset(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException { - Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); - return getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch); } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java index eff498b5437..a5db1ea38ef 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java @@ -356,6 +356,17 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana return remoteLogSize; } + @Override + public Optional nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException { + lock.readLock().lock(); + try { + ensureInitializedAndNotClosed(); + return remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset); + } finally { + lock.readLock().unlock(); + } + } + @Override public void configure(Map configs) { Objects.requireNonNull(configs, "configs can not be null."); diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java index ad47ee05c84..4e839d08b3a 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java @@ -40,7 +40,8 @@ public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetad .setMaxTimestampMs(segmentMetadata.maxTimestampMs()) .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes()) .setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs())) - .setRemoteLogSegmentState(segmentMetadata.state().id()); + .setRemoteLogSegmentState(segmentMetadata.state().id()) + .setTxnIndexEmpty(segmentMetadata.isTxnIdxEmpty()); segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value())); return new ApiMessageAndVersion(record, record.highestSupportedVersion()); @@ 
-72,7 +73,8 @@ public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetad record.segmentSizeInBytes(), customMetadata, RemoteLogSegmentState.forId(record.remoteLogSegmentState()), - segmentLeaderEpochs); + segmentLeaderEpochs, + record.txnIndexEmpty()); } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java index 9e893d2cbc3..99f3fc0d90c 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java @@ -44,7 +44,8 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans .setMaxTimestampMs(segmentMetadata.maxTimestampMs()) .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes()) .setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata)) - .setRemoteLogSegmentState(segmentMetadata.state().id()); + .setRemoteLogSegmentState(segmentMetadata.state().id()) + .setTxnIndexEmpty(segmentMetadata.isTxnIdxEmpty()); segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value())); return new ApiMessageAndVersion(record, record.highestSupportedVersion()); @@ -83,7 +84,7 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans new RemoteLogSegmentMetadata(remoteLogSegmentId, record.startOffset(), record.endOffset(), record.maxTimestampMs(), record.brokerId(), record.eventTimestampMs(), record.segmentSizeInBytes(), - segmentLeaderEpochs); + segmentLeaderEpochs, record.txnIndexEmpty()); RemoteLogSegmentMetadataUpdate rlsmUpdate = new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, record.eventTimestampMs(), customMetadata, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index 10d1449cdb7..8e089dc3cfc 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -194,6 +194,14 @@ public class TransactionIndex implements Closeable { } } + /** + * Check if the index is empty. + * @return `true` if the index is empty (or) when underlying file doesn't exists, `false` otherwise. + */ + public boolean isEmpty() { + return !iterable().iterator().hasNext(); + } + private FileChannel openChannel() throws IOException { FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json index c737135a6a2..9c035f52630 100644 --- a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json +++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json @@ -129,6 +129,14 @@ "type": "int8", "versions": "0+", "about": "State identifier of the remote log segment, which is RemoteLogSegmentState.id()." 
+ }, + { + "name": "TxnIndexEmpty", + "type": "bool", + "versions": "0+", + "about": "Flag to indicate if the transaction index is empty.", + "taggedVersions": "0+", + "tag": 0 } ] } \ No newline at end of file diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json index 20fb1732572..f4a1f19dca4 100644 --- a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json +++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json @@ -95,6 +95,14 @@ "type": "int8", "versions": "0+", "about": "State of the remote log segment" + }, + { + "name": "TxnIndexEmpty", + "type": "bool", + "versions": "0+", + "about": "Flag to indicate if the transaction index is empty.", + "taggedVersions": "0+", + "tag": 0 } ] } \ No newline at end of file diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java index d6a03441e8b..47925d01a7d 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java @@ -53,7 +53,7 @@ public class RemoteLogMetadataFormatterTest { Optional customMetadata = Optional.of(new CustomMetadata(new byte[10])); RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata( remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED, - segLeaderEpochs); + segLeaderEpochs, true); byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata); ConsumerRecord metadataRecord = new ConsumerRecord<>( @@ -65,7 +65,7 @@ public class RemoteLogMetadataFormatterTest { "startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1, " + "eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, " + "customMetadata=Optional[CustomMetadata{10 bytes}], " + - "state=COPY_SEGMENT_STARTED}\n", + "state=COPY_SEGMENT_STARTED, txnIdxEmpty=true}\n", TOPIC_ID, SEGMENT_ID); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos)) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java index b4c4110ecea..399529129ff 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java @@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; class RemoteLogSegmentMetadataSnapshotTransformTest { @ParameterizedTest @MethodSource("parameters") - void testToAndFromMessage(Optional customMetadata) { + void testToAndFromMessage(Optional customMetadata, boolean isTxnIdxEmpty) { Map segmentLeaderEpochs = new HashMap<>(); segmentLeaderEpochs.put(0, 0L); RemoteLogSegmentMetadataSnapshot snapshot = new RemoteLogSegmentMetadataSnapshot( @@ -43,7 +43,8 @@ class RemoteLogSegmentMetadataSnapshotTransformTest { 0L, 100L, 
-1L, 0, 0, 1234, customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, - segmentLeaderEpochs + segmentLeaderEpochs, + isTxnIdxEmpty ); RemoteLogSegmentMetadataSnapshotTransform transform = new RemoteLogSegmentMetadataSnapshotTransform(); @@ -51,11 +52,11 @@ class RemoteLogSegmentMetadataSnapshotTransformTest { assertEquals(snapshot, transform.fromApiMessageAndVersion(message)); } - private static Stream parameters() { + private static Stream parameters() { return Stream.of( - Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), - Optional.of(new CustomMetadata(new byte[0])), - Optional.empty() + new Object[]{Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), true}, + new Object[]{Optional.of(new CustomMetadata(new byte[0])), false}, + new Object[]{Optional.empty(), true} ); } } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java index 7a75ab0cced..1d19d433e05 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java @@ -202,4 +202,24 @@ public class TransactionIndexTest { index.deleteIfExists(); assertFalse(file.exists()); } + + @Test + public void testIsEmptyWhenFileDoesNotExist() throws IOException { + File nonExistentFile = TestUtils.tempFile(); + assertTrue(nonExistentFile.delete()); + try (TransactionIndex testIndex = new TransactionIndex(0, nonExistentFile)) { + assertTrue(testIndex.isEmpty()); + } + } + + @Test + public void testIsEmptyWhenFileIsEmpty() { + assertTrue(index.isEmpty()); + } + + @Test + public void testIsEmptyWhenFileIsNotEmpty() throws IOException { + index.append(new AbortedTxn(0L, 0, 10, 2)); + assertFalse(index.isEmpty()); + } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java index a93dea813d1..c51405c8636 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java @@ -53,6 +53,7 @@ import org.apache.kafka.tiered.storage.specs.OffloadableSpec; import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec; import org.apache.kafka.tiered.storage.specs.ProducableSpec; import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec; +import org.apache.kafka.tiered.storage.specs.RemoteFetchCount; import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec; import org.apache.kafka.tiered.storage.specs.TopicSpec; @@ -228,10 +229,17 @@ public final class TieredStorageTestBuilder { public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer fromBroker, String topic, Integer partition, - Integer remoteFetchRequestCount) { + Integer segmentFetchRequestCount) { + return expectFetchFromTieredStorage(fromBroker, topic, partition, new RemoteFetchCount(segmentFetchRequestCount)); + } + + public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer fromBroker, + String topic, + Integer partition, + RemoteFetchCount remoteFetchRequestCount) { TopicPartition topicPartition = new TopicPartition(topic, partition); assertTrue(partition >= 0, "Partition must be >= 0"); - assertTrue(remoteFetchRequestCount >= 0, "Expected fetch count from tiered storage must be >= 0"); + 
assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().getCount() >= 0, "Expected fetch count from tiered storage must be >= 0"); assertFalse(fetchables.containsKey(topicPartition), "Consume already in progress for " + topicPartition); fetchables.put(topicPartition, new FetchableSpec(fromBroker, remoteFetchRequestCount)); return this; @@ -371,7 +379,7 @@ public final class TieredStorageTestBuilder { private void createConsumeAction() { if (!consumables.isEmpty()) { consumables.forEach((topicPartition, consumableSpec) -> { - FetchableSpec fetchableSpec = fetchables.computeIfAbsent(topicPartition, k -> new FetchableSpec(0, 0)); + FetchableSpec fetchableSpec = fetchables.computeIfAbsent(topicPartition, k -> new FetchableSpec(0, new RemoteFetchCount(0))); RemoteFetchSpec remoteFetchSpec = new RemoteFetchSpec(fetchableSpec.getSourceBrokerId(), topicPartition, fetchableSpec.getFetchCount()); ConsumeAction action = new ConsumeAction(topicPartition, consumableSpec.getFetchOffset(), diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java index 97feae05654..4b4c401b1d2 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java @@ -92,12 +92,16 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness { protected abstract void writeTestSpecifications(TieredStorageTestBuilder builder); + protected void overrideConsumerConfig(Properties consumerConfig) { + } + @BeforeEach @Override public void setUp(TestInfo testInfo) { testClassName = testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault()); storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath(); super.setUp(testInfo); + overrideConsumerConfig(consumerConfig()); context = new TieredStorageTestContext(this); } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java index 250db6a46e8..9549a6b6916 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java @@ -25,20 +25,26 @@ import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent; import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; import org.apache.kafka.tiered.storage.TieredStorageTestAction; import org.apache.kafka.tiered.storage.TieredStorageTestContext; +import org.apache.kafka.tiered.storage.specs.RemoteFetchCount; import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec; import java.io.PrintStream; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX; import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX; import static 
org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; public final class ConsumeAction implements TieredStorageTestAction { @@ -75,6 +81,9 @@ public final class ConsumeAction implements TieredStorageTestAction { // type has yet to happen. LocalTieredStorageHistory history = context.tieredStorageHistory(remoteFetchSpec.getSourceBrokerId()); Optional latestEventSoFar = history.latestEvent(FETCH_SEGMENT, topicPartition); + Optional latestOffsetIdxEventSoFar = history.latestEvent(FETCH_OFFSET_INDEX, topicPartition); + Optional latestTimeIdxEventSoFar = history.latestEvent(FETCH_TIME_INDEX, topicPartition); + Optional latestTxnIdxEventSoFar = history.latestEvent(FETCH_TRANSACTION_INDEX, topicPartition); // Records are consumed here List> consumedRecords = @@ -119,16 +128,61 @@ public final class ConsumeAction implements TieredStorageTestAction { assertThat(storedRecords, correspondTo(readRecords, topicPartition, serde, serde)); // (B) Assessment of the interactions between the source broker and the second-tier storage. - List events = history.getEvents(FETCH_SEGMENT, topicPartition); - List eventsInScope = latestEventSoFar - .map(latestEvent -> - events.stream().filter(event -> event.isAfter(latestEvent)).collect(Collectors.toList())) - .orElse(events); + for (LocalTieredStorageEvent.EventType eventType : Arrays.asList(FETCH_SEGMENT, FETCH_OFFSET_INDEX, FETCH_TIME_INDEX, FETCH_TRANSACTION_INDEX)) { + Optional latestEvent; + switch (eventType) { + case FETCH_SEGMENT: + latestEvent = latestEventSoFar; + break; + case FETCH_OFFSET_INDEX: + latestEvent = latestOffsetIdxEventSoFar; + break; + case FETCH_TIME_INDEX: + latestEvent = latestTimeIdxEventSoFar; + break; + case FETCH_TRANSACTION_INDEX: + latestEvent = latestTxnIdxEventSoFar; + break; + default: + latestEvent = Optional.empty(); + } - assertEquals(remoteFetchSpec.getCount(), eventsInScope.size(), - "Number of fetch requests from broker " + remoteFetchSpec.getSourceBrokerId() + " to the " + - "tier storage does not match the expected value for topic-partition " - + remoteFetchSpec.getTopicPartition()); + List events = history.getEvents(eventType, topicPartition); + List eventsInScope = latestEvent + .map(e -> events.stream().filter(event -> event.isAfter(e)).collect(Collectors.toList())) + .orElse(events); + + RemoteFetchCount remoteFetchCount = remoteFetchSpec.getRemoteFetchCount(); + RemoteFetchCount.FetchCountAndOp expectedCountAndOp; + switch (eventType) { + case FETCH_SEGMENT: + expectedCountAndOp = remoteFetchCount.getSegmentFetchCountAndOp(); + break; + case FETCH_OFFSET_INDEX: + expectedCountAndOp = remoteFetchCount.getOffsetIdxFetchCountAndOp(); + break; + case FETCH_TIME_INDEX: + expectedCountAndOp = remoteFetchCount.getTimeIdxFetchCountAndOp(); + break; + case FETCH_TRANSACTION_INDEX: + expectedCountAndOp = remoteFetchCount.getTxnIdxFetchCountAndOp(); + break; + default: + expectedCountAndOp = new RemoteFetchCount.FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO); + } + + String message = String.format("Number of %s requests from broker %d to the tier storage does not match the expected value for topic-partition %s", + 
eventType, remoteFetchSpec.getSourceBrokerId(), remoteFetchSpec.getTopicPartition()); + if (expectedCountAndOp.getCount() != -1) { + if (expectedCountAndOp.getOperationType() == RemoteFetchCount.OperationType.EQUALS_TO) { + assertEquals(expectedCountAndOp.getCount(), eventsInScope.size(), message); + } else if (expectedCountAndOp.getOperationType() == RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO) { + assertTrue(eventsInScope.size() <= expectedCountAndOp.getCount(), message); + } else { + assertTrue(eventsInScope.size() >= expectedCountAndOp.getCount(), message); + } + } + } } @Override @@ -138,5 +192,6 @@ public final class ConsumeAction implements TieredStorageTestAction { output.println(" fetch-offset = " + fetchOffset); output.println(" expected-record-count = " + expectedTotalCount); output.println(" expected-record-from-tiered-storage = " + expectedFromSecondTierCount); + output.println(" remote-fetch-spec = " + remoteFetchSpec); } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java new file mode 100644 index 00000000000..38b7ae3df2d --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; +import org.apache.kafka.tiered.storage.specs.RemoteFetchCount; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.FetchCountAndOp; +import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.EQUALS_TO; +import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO; + +/** + * Test Cases: + * Elementary offloads and fetches from tiered storage using a consumer with the read_committed isolation level.
+ */ +public final class OffloadAndTxnConsumeFromLeaderTest extends TieredStorageTestHarness { + + /** + * Cluster of one broker + * @return number of brokers in the cluster + */ + @Override + public int brokerCount() { + return 1; + } + + @Override + public Properties overridingProps() { + Properties props = super.overridingProps(); + // Configure the remote-log index cache size to 1 byte to force eviction of cached index entries. + props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "1"); + return props; + } + + @Override + protected void overrideConsumerConfig(Properties consumerConfig) { + consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString()); + } + + @Override + protected void writeTestSpecifications(TieredStorageTestBuilder builder) { + final Integer broker = 0; + final String topicA = "topicA"; + final Integer p0 = 0; + final Integer partitionCount = 1; + final Integer replicationFactor = 1; + final Integer oneBatchPerSegment = 1; + final Map<Integer, List<Integer>> replicaAssignment = null; + final boolean enableRemoteLogStorage = true; + + builder + .createTopic(topicA, partitionCount, replicationFactor, oneBatchPerSegment, replicaAssignment, + enableRemoteLogStorage) + .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k0", "v0")) + .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k1", "v1")) + .expectSegmentToBeOffloaded(broker, topicA, p0, 2, new KeyValueSpec("k2", "v2")) + .expectSegmentToBeOffloaded(broker, topicA, p0, 3, new KeyValueSpec("k3", "v3")) + .expectSegmentToBeOffloaded(broker, topicA, p0, 4, new KeyValueSpec("k4", "v4")) + .expectSegmentToBeOffloaded(broker, topicA, p0, 5, new KeyValueSpec("k5", "v5")) + .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 6L) + .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), + new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"), + new KeyValueSpec("k5", "v5"), new KeyValueSpec("k6", "v6")) + // When reading with a transactional consumer, the indexes of consecutive remote segments are fetched until the + // LSO found is higher than the fetch-offset. + // Without the empty-txn-index optimization, the total number of index fetches would be summation(n) = (n * (n + 1)) / 2: + // with 6 uploaded remote segments, that is (6 * (6 + 1)) / 2 = 21. + // Since the index fetch is skipped when the txn-index is empty, the effective index fetch count + // should be the same as the segment count. + .expectFetchFromTieredStorage(broker, topicA, p0, getRemoteFetchCount()) + .consume(topicA, p0, 0L, 7, 6); + } + + private static RemoteFetchCount getRemoteFetchCount() { + FetchCountAndOp segmentFetchCountAndOp = new FetchCountAndOp(6, EQUALS_TO); + // The RemoteIndexCache might evict entries well before reaching its maximum size. + // To keep the test deterministic, the LESS_THAN_OR_EQUALS_TO operation type is used with an upper bound equal to the + // number of times the RemoteIndexCache gets accessed. The RemoteIndexCache is accessed twice for each read.
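+ // With 6 remote segment reads and two cache accesses per read, the upper bound below is presumably 2 * 6 = 12 per index type.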
+ FetchCountAndOp indexFetchCountAndOp = new FetchCountAndOp(12, LESS_THAN_OR_EQUALS_TO); + return new RemoteFetchCount(segmentFetchCountAndOp, indexFetchCountAndOp, + indexFetchCountAndOp, indexFetchCountAndOp); + } +} diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java index 48dd34052b7..9e2f23af7a7 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java @@ -21,10 +21,10 @@ import java.util.Objects; public final class FetchableSpec { private final Integer sourceBrokerId; - private final Integer fetchCount; + private final RemoteFetchCount fetchCount; public FetchableSpec(Integer sourceBrokerId, - Integer fetchCount) { + RemoteFetchCount fetchCount) { this.sourceBrokerId = sourceBrokerId; this.fetchCount = fetchCount; } @@ -33,7 +33,7 @@ public final class FetchableSpec { return sourceBrokerId; } - public Integer getFetchCount() { + public RemoteFetchCount getFetchCount() { return fetchCount; } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java new file mode 100644 index 00000000000..8dfc9a762b8 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.specs; + +import java.util.Objects; + +public class RemoteFetchCount { + private FetchCountAndOp segmentFetchCountAndOp; + private FetchCountAndOp offsetIdxFetchCountAndOp = new FetchCountAndOp(-1); + private FetchCountAndOp timeIdxFetchCountAndOp = new FetchCountAndOp(-1); + private FetchCountAndOp txnIdxFetchCountAndOp = new FetchCountAndOp(-1); + + public RemoteFetchCount(int segmentFetchCountAndOp) { + this.segmentFetchCountAndOp = new FetchCountAndOp(segmentFetchCountAndOp); + } + + public RemoteFetchCount(int segmentFetchCountAndOp, + int offsetIdxFetchCountAndOp, + int timeIdxFetchCountAndOp, + int txnIdxFetchCountAndOp) { + this.segmentFetchCountAndOp = new FetchCountAndOp(segmentFetchCountAndOp); + this.offsetIdxFetchCountAndOp = new FetchCountAndOp(offsetIdxFetchCountAndOp); + this.timeIdxFetchCountAndOp = new FetchCountAndOp(timeIdxFetchCountAndOp); + this.txnIdxFetchCountAndOp = new FetchCountAndOp(txnIdxFetchCountAndOp); + } + + public RemoteFetchCount(FetchCountAndOp segmentFetchCountAndOp, + FetchCountAndOp offsetIdxFetchCountAndOp, + FetchCountAndOp timeIdxFetchCountAndOp, + FetchCountAndOp txnIdxFetchCountAndOp) { + this.segmentFetchCountAndOp = segmentFetchCountAndOp; + this.offsetIdxFetchCountAndOp = offsetIdxFetchCountAndOp; + this.timeIdxFetchCountAndOp = timeIdxFetchCountAndOp; + this.txnIdxFetchCountAndOp = txnIdxFetchCountAndOp; + } + + public FetchCountAndOp getSegmentFetchCountAndOp() { + return segmentFetchCountAndOp; + } + + public void setSegmentFetchCountAndOp(FetchCountAndOp segmentFetchCountAndOp) { + this.segmentFetchCountAndOp = segmentFetchCountAndOp; + } + + public FetchCountAndOp getOffsetIdxFetchCountAndOp() { + return offsetIdxFetchCountAndOp; + } + + public void setOffsetIdxFetchCountAndOp(FetchCountAndOp offsetIdxFetchCountAndOp) { + this.offsetIdxFetchCountAndOp = offsetIdxFetchCountAndOp; + } + + public FetchCountAndOp getTimeIdxFetchCountAndOp() { + return timeIdxFetchCountAndOp; + } + + public void setTimeIdxFetchCountAndOp(FetchCountAndOp timeIdxFetchCountAndOp) { + this.timeIdxFetchCountAndOp = timeIdxFetchCountAndOp; + } + + public FetchCountAndOp getTxnIdxFetchCountAndOp() { + return txnIdxFetchCountAndOp; + } + + public void setTxnIdxFetchCountAndOp(FetchCountAndOp txnIdxFetchCountAndOp) { + this.txnIdxFetchCountAndOp = txnIdxFetchCountAndOp; + } + + @Override + public String toString() { + return "RemoteFetchCount{" + + "segmentFetchCountAndOp=" + segmentFetchCountAndOp + + ", offsetIdxFetchCountAndOp=" + offsetIdxFetchCountAndOp + + ", timeIdxFetchCountAndOp=" + timeIdxFetchCountAndOp + + ", txnIdxFetchCountAndOp=" + txnIdxFetchCountAndOp + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RemoteFetchCount that = (RemoteFetchCount) o; + return Objects.equals(segmentFetchCountAndOp, that.segmentFetchCountAndOp) && + Objects.equals(offsetIdxFetchCountAndOp, that.offsetIdxFetchCountAndOp) && + Objects.equals(timeIdxFetchCountAndOp, that.timeIdxFetchCountAndOp) && + Objects.equals(txnIdxFetchCountAndOp, that.txnIdxFetchCountAndOp); + } + + @Override + public int hashCode() { + return Objects.hash(segmentFetchCountAndOp, offsetIdxFetchCountAndOp, timeIdxFetchCountAndOp, txnIdxFetchCountAndOp); + } + + public enum OperationType { + EQUALS_TO, + GREATER_THAN_OR_EQUALS_TO, + LESS_THAN_OR_EQUALS_TO + } + + public static class FetchCountAndOp { + private final int count; + private final OperationType 
operationType; + + public FetchCountAndOp(int count) { + this.count = count; + this.operationType = OperationType.EQUALS_TO; + } + + public FetchCountAndOp(int count, OperationType operationType) { + this.count = count; + this.operationType = operationType; + } + + public int getCount() { + return count; + } + + public OperationType getOperationType() { + return operationType; + } + + @Override + public String toString() { + return "FetchCountAndOp{" + + "count=" + count + + ", operationType=" + operationType + + '}'; + } + } +} diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java index 5a17ebee0cd..823d4321bc7 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java @@ -24,7 +24,7 @@ public final class RemoteFetchSpec { private final int sourceBrokerId; private final TopicPartition topicPartition; - private final int count; + private final RemoteFetchCount remoteFetchCount; /** * Specifies a fetch (download) event from a second-tier storage. This is used to ensure the @@ -32,14 +32,14 @@ public final class RemoteFetchSpec { * * @param sourceBrokerId The broker which fetched (a) remote log segment(s) from the second-tier storage. * @param topicPartition The topic-partition which segment(s) were fetched. - * @param count The number of remote log segment(s) fetched. + * @param remoteFetchCount The number of remote log segment(s) and indexes fetched. */ public RemoteFetchSpec(int sourceBrokerId, TopicPartition topicPartition, - int count) { + RemoteFetchCount remoteFetchCount) { this.sourceBrokerId = sourceBrokerId; this.topicPartition = topicPartition; - this.count = count; + this.remoteFetchCount = remoteFetchCount; } public int getSourceBrokerId() { @@ -50,14 +50,14 @@ public final class RemoteFetchSpec { return topicPartition; } - public int getCount() { - return count; + public RemoteFetchCount getRemoteFetchCount() { + return remoteFetchCount; } @Override public String toString() { - return String.format("RemoteFetch[source-broker-id=%d topic-partition=%s count=%d]", - sourceBrokerId, topicPartition, count); + return String.format("RemoteFetch[source-broker-id=%d topic-partition=%s remote-fetch-count=%s]", + sourceBrokerId, topicPartition, remoteFetchCount); } @Override @@ -66,12 +66,12 @@ public final class RemoteFetchSpec { if (o == null || getClass() != o.getClass()) return false; RemoteFetchSpec that = (RemoteFetchSpec) o; return sourceBrokerId == that.sourceBrokerId - && count == that.count + && Objects.equals(remoteFetchCount, that.remoteFetchCount) && Objects.equals(topicPartition, that.topicPartition); } @Override public int hashCode() { - return Objects.hash(sourceBrokerId, topicPartition, count); + return Objects.hash(sourceBrokerId, topicPartition, remoteFetchCount); } }
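A minimal, hypothetical usage sketch of the new RemoteFetchCount and FetchCountAndOp specs follows. The broker id, topic name, and counts are illustrative only; just the constructors, operation types, and builder calls introduced in this patch are assumed.

    // Exact match on remote segment fetches; upper bounds on offset/time index fetches; no txn-index fetches expected.
    RemoteFetchCount expected = new RemoteFetchCount(
            new RemoteFetchCount.FetchCountAndOp(3, RemoteFetchCount.OperationType.EQUALS_TO),
            new RemoteFetchCount.FetchCountAndOp(6, RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO),
            new RemoteFetchCount.FetchCountAndOp(6, RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO),
            new RemoteFetchCount.FetchCountAndOp(0, RemoteFetchCount.OperationType.EQUALS_TO));
    // "topicB" and the record counts are placeholders; the call shape mirrors the new test above.
    builder.expectFetchFromTieredStorage(0, "topicB", 0, expected)
           .consume("topicB", 0, 0L, 4, 3);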