diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 5ca596ec7b7..58bc519d3d1 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -1617,51 +1617,24 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { /** * Returns a map containing the epoch vs start-offset for the given leader epoch map by filtering the epochs that * does not contain any messages/records associated with them. - * For ex: - *
-     * {@code
-     *  
-     *  0 - 0
-     *  1 - 10
-     *  2 - 20
-     *  3 - 30
-     *  4 - 40
-     *  5 - 60  // epoch 5 does not have records or messages associated with it
-     *  6 - 60
-     *  7 - 70
-     * }
-     * 
- * When the above leaderEpochMap is passed to this method, it returns the following map: - *
-     * {@code
-     *  
-     *  0 - 0
-     *  1 - 10
-     *  2 - 20
-     *  3 - 30
-     *  4 - 40
-     *  6 - 60
-     *  7 - 70
-     * }
-     * 
+ * * @param leaderEpochs The leader epoch map to be refined. + * @return A map containing only the epochs and their start offsets that have associated messages/records. */ // Visible for testing static NavigableMap buildFilteredLeaderEpochMap(NavigableMap leaderEpochs) { - List epochsWithNoMessages = new ArrayList<>(); + TreeMap filteredLeaderEpochs = new TreeMap<>(); Map.Entry previousEpochAndOffset = null; + for (Map.Entry currentEpochAndOffset : leaderEpochs.entrySet()) { - if (previousEpochAndOffset != null && previousEpochAndOffset.getValue().equals(currentEpochAndOffset.getValue())) { - epochsWithNoMessages.add(previousEpochAndOffset.getKey()); + if (previousEpochAndOffset != null && !previousEpochAndOffset.getValue().equals(currentEpochAndOffset.getValue())) { + filteredLeaderEpochs.put(previousEpochAndOffset.getKey(), previousEpochAndOffset.getValue()); } previousEpochAndOffset = currentEpochAndOffset; } - if (epochsWithNoMessages.isEmpty()) { - return leaderEpochs; - } - TreeMap filteredLeaderEpochs = new TreeMap<>(leaderEpochs); - for (Integer epochWithNoMessage : epochsWithNoMessages) { - filteredLeaderEpochs.remove(epochWithNoMessage); + + if (previousEpochAndOffset != null) { + filteredLeaderEpochs.put(previousEpochAndOffset.getKey(), previousEpochAndOffset.getValue()); } return filteredLeaderEpochs; } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index 99d9d43c9a1..c0b89bbf044 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -1847,6 +1847,18 @@ public class RemoteLogManagerTest { NavigableMap refinedLeaderEpochMap = RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset); assertEquals(expectedLeaderEpochs, refinedLeaderEpochMap); + + + TreeMap leaderEpochToStartOffset2 = new TreeMap<>(); + leaderEpochToStartOffset2.put(0, 0L); + leaderEpochToStartOffset2.put(1, 0L); + leaderEpochToStartOffset2.put(2, 0L); + + TreeMap expectedLeaderEpochs2 = new TreeMap<>(); + expectedLeaderEpochs2.put(2, 0L); + + NavigableMap refinedLeaderEpochMap2 = RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset2); + assertEquals(expectedLeaderEpochs2, refinedLeaderEpochMap2); } @Test