From 8be2a8ed4e0ecea9a213c5a5ee72ddb2029774ac Mon Sep 17 00:00:00 2001 From: Joao Pedro Fonseca Dantas <67479090+fonsdant@users.noreply.github.com> Date: Wed, 5 Feb 2025 22:20:53 -0300 Subject: [PATCH] MINOR: Add javadocs to AbstractMergedSortedCacheStoreIterator (#18772) While reviewing PR #18287, I wrote some javadocs to help me understand the AbstractMergedSortedCacheStoreIterator. Maybe we could add them to help the next developers getting into it. Reviewers: Anna Sophie Blee-Goldman --- ...bstractMergedSortedCacheStoreIterator.java | 136 +++++++++++++++++- 1 file changed, 133 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java index 834b18f6491..e39257adba9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java @@ -23,16 +23,31 @@ import org.apache.kafka.streams.state.KeyValueIterator; import java.util.NoSuchElementException; /** - * Merges two iterators. Assumes each of them is sorted by key + * AbstractMergedSortedCacheStoreIterator is an abstract class for merging two sorted iterators, one from a cache and + * the other from a store. It ensures the merged results maintain sorted order while resolving conflicts between cache + * and store entries. * - * @param - * @param + *

<p>This iterator is used for state stores in Kafka Streams, which have an (optional) caching layer that needs to be + * "merged" with the underlying state. It handles common scenarios like skipping records with cached tombstones (deleted + * entries) and preferring cache entries over store entries when conflicts arise.</p>

+ * + * @param The type of the resulting merged key. + * @param The type of the store key. + * @param The type of the resulting merged value. + * @param The type of the store value. */ abstract class AbstractMergedSortedCacheStoreIterator implements KeyValueIterator { private final PeekingKeyValueIterator cacheIterator; private final KeyValueIterator storeIterator; private final boolean forward; + /** + * Constructs an AbstractMergedSortedCacheStoreIterator. + * + * @param cacheIterator The iterator for the cache, assumed to be sorted by key. + * @param storeIterator The iterator for the store, assumed to be sorted by key. + * @param forward The direction of iteration. True for forward, false for reverse. + */ AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator cacheIterator, final KeyValueIterator storeIterator, final boolean forward) { @@ -41,20 +56,79 @@ abstract class AbstractMergedSortedCacheStoreIterator implements K this.forward = forward; } + /** + * Compares the keys from the cache and store to determine their ordering. + * + * @param cacheKey The key from the cache. + * @param storeKey The key from the store. + * + * @return A negative integer, zero, or a positive integer as the cache key is less than, + * equal to, or greater than the store key. + */ abstract int compare(final Bytes cacheKey, final KS storeKey); + /** + * Deserializes a store key into a generic merged key type. + * + * @param key The store key to deserialize. + * + * @return The deserialized key. + */ abstract K deserializeStoreKey(final KS key); + /** + * Deserializes a key-value pair from the store into a generic merged key-value pair. + * + * @param pair The key-value pair from the store. + * + * @return The deserialized key-value pair. + */ abstract KeyValue deserializeStorePair(final KeyValue pair); + /** + * Deserializes a cache key into a generic merged key type. + * + * @param cacheKey The cache key to deserialize. + * + * @return The deserialized key. 
+ */ abstract K deserializeCacheKey(final Bytes cacheKey); + /** + * Deserializes a cache entry into a generic value type. + * + * @param cacheEntry The cache entry to deserialize. + * + * @return The deserialized value. + */ abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry); + /** + * Checks if a cache entry is a tombstone (representing a deleted value). + * + * @param nextFromCache The cache entry to check. + * + * @return True if the cache entry is a tombstone, false otherwise. + */ private boolean isDeletedCacheEntry(final KeyValue nextFromCache) { return nextFromCache.value.value() == null; } + /** + * Determines if there are more entries to iterate over, resolving conflicts between cache and store entries (e.g., + * skipping tombstones). + * + *

<p>Conflict resolution scenarios:</p>

+ * + *
<ul>
+ *   <li>Cache contains a tombstone for a key: Skip both the cache tombstone and the corresponding store entry (if it exists).</li>
+ *   <li>Cache contains a value for a key present in the store: Prefer the cache value and skip the store entry.</li>
+ *   <li>Cache key is unique: Return the cache value.</li>
+ *   <li>Store key is unique: Return the store value.</li>
+ * </ul>
+ * + * @return True if there are more entries, false otherwise. + */ @Override public boolean hasNext() { // skip over items deleted from cache, and corresponding store items if they have the same key @@ -86,6 +160,13 @@ abstract class AbstractMergedSortedCacheStoreIterator implements K return cacheIterator.hasNext() || storeIterator.hasNext(); } + /** + * Retrieves the next key-value pair in the merged iteration. + * + * @return The next key-value pair. + * + * @throws NoSuchElementException If there are no more elements to iterate. + */ @Override public KeyValue next() { if (!hasNext()) { @@ -107,6 +188,15 @@ abstract class AbstractMergedSortedCacheStoreIterator implements K return chooseNextValue(nextCacheKey, nextStoreKey, comparison); } + /** + * Resolves which source (cache or store) to fetch the next key-value pair when a comparison is performed. + * + * @param nextCacheKey The next key from the cache. + * @param nextStoreKey The next key from the store. + * @param comparison The comparison result between the cache and store keys. + * + * @return The next key-value pair. + */ private KeyValue chooseNextValue(final Bytes nextCacheKey, final KS nextStoreKey, final int comparison) { @@ -133,6 +223,15 @@ abstract class AbstractMergedSortedCacheStoreIterator implements K } } + /** + * Fetches the next value from the store, ensuring it matches the expected key. + * + * @param nextStoreKey The expected next key from the store. + * + * @return The next key-value pair from the store. + * + * @throws IllegalStateException If the key does not match the expected key. + */ private KeyValue nextStoreValue(final KS nextStoreKey) { final KeyValue next = storeIterator.next(); @@ -143,6 +242,15 @@ abstract class AbstractMergedSortedCacheStoreIterator implements K return deserializeStorePair(next); } + /** + * Fetches the next value from the cache, ensuring it matches the expected key. + * + * @param nextCacheKey The expected next key from the cache. 
+ * + * @return The next key-value pair from the cache. + * + * @throws IllegalStateException If the key does not match the expected key. + */ private KeyValue nextCacheValue(final Bytes nextCacheKey) { final KeyValue next = cacheIterator.next(); @@ -153,6 +261,13 @@ abstract class AbstractMergedSortedCacheStoreIterator implements K return KeyValue.pair(deserializeCacheKey(next.key), deserializeCacheValue(next.value)); } + /** + * Peeks at the next key in the merged iteration without advancing the iterator. + * + * @return The next key in the iteration. + * + * @throws NoSuchElementException If there are no more elements to peek. + */ @Override public K peekNextKey() { if (!hasNext()) { @@ -174,6 +289,18 @@ abstract class AbstractMergedSortedCacheStoreIterator implements K return chooseNextKey(nextCacheKey, nextStoreKey, comparison); } + /** + * Determines the next key to return from the merged iteration based on the comparison of the cache and store keys. + * Resolves conflicts by considering the iteration direction and ensuring the merged order is maintained. + * + * @param nextCacheKey The next key from the cache. + * @param nextStoreKey The next key from the store. + * @param comparison The comparison result between the cache and store keys. A negative value indicates the cache + * key is smaller, zero indicates equality, and a positive value indicates the store key is + * smaller. + * + * @return The next key to return from the merged iteration. + */ private K chooseNextKey(final Bytes nextCacheKey, final KS nextStoreKey, final int comparison) { @@ -200,6 +327,9 @@ abstract class AbstractMergedSortedCacheStoreIterator implements K } } + /** + * Closes the iterators and releases any associated resources. + */ @Override public void close() { cacheIterator.close();