mirror of https://github.com/apache/kafka.git
MINOR: Optimize RemoteLogManager#buildFilteredLeaderEpochMap (#20205)
Optimize `RemoteLogManager#buildFilteredLeaderEpochMap` . Add a temporary unit test `testBuildFilteredLeaderEpochMapModify` in `RemoteLogManagerTest` to verify the output consistency of the method before and after optimization. Randomly generate leaderEpochs and iterate 100000 times for verification. ``` @Test public void testBuildFilteredLeaderEpochMapModify() { int testIterations = 100000; for (int i = 0; i < testIterations; i++) { TreeMap<Integer, Long> leaderEpochToStartOffset = generateRandomLeaderEpochAndStartOffset(); // before optimize NavigableMap<Integer, Long> optimizeBefore = RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset); // after optimize NavigableMap<Integer, Long> optimizeAfter = RemoteLogManager.buildFilteredLeaderEpochMap2(leaderEpochToStartOffset); assertEquals(optimizeBefore, optimizeAfter); } } private static TreeMap<Integer, Long> generateRandomLeaderEpochAndStartOffset() { TreeMap<Integer, Long> map = new TreeMap<>(); Random random = new Random(); int numEntries = random.nextInt(100000); long lastStartOffset = 0; for (int i = 0; i < numEntries; i++) { // generate a leader epoch int leaderEpoch = random.nextInt(100000); long startOffset; // generate a random start offset , or use the last start offset if (i > 0 && random.nextDouble() < 0.2) { startOffset = lastStartOffset; } else { startOffset = Math.abs(random.nextLong()) % 100000; } lastStartOffset = startOffset; map.put(leaderEpoch, startOffset); } return map; } ``` Command: ``` ./gradlew storage:test --tests RemoteLogManagerTest``` Result: All unit tests passed. <img width="1258" height="424" alt="image" src="https://github.com/user-attachments/assets/7d9fc3b5-3bbc-440f-b1cf-3a2a5f97557a" /> <img width="411" height="66" alt="image" src="https://github.com/user-attachments/assets/22a0b443-88e8-43d2-a3f2-51266935ed34" /> Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
93447d5b88
commit
a27d6e32b0
|
@ -1617,51 +1617,24 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
|
||||||
/**
|
/**
|
||||||
* Returns a map containing the epoch vs start-offset for the given leader epoch map by filtering the epochs that
|
* Returns a map containing the epoch vs start-offset for the given leader epoch map by filtering the epochs that
|
||||||
* does not contain any messages/records associated with them.
|
* does not contain any messages/records associated with them.
|
||||||
* For ex:
|
*
|
||||||
* <pre>
|
|
||||||
* {@code
|
|
||||||
* <epoch - start offset>
|
|
||||||
* 0 - 0
|
|
||||||
* 1 - 10
|
|
||||||
* 2 - 20
|
|
||||||
* 3 - 30
|
|
||||||
* 4 - 40
|
|
||||||
* 5 - 60 // epoch 5 does not have records or messages associated with it
|
|
||||||
* 6 - 60
|
|
||||||
* 7 - 70
|
|
||||||
* }
|
|
||||||
* </pre>
|
|
||||||
* When the above leaderEpochMap is passed to this method, it returns the following map:
|
|
||||||
* <pre>
|
|
||||||
* {@code
|
|
||||||
* <epoch - start offset>
|
|
||||||
* 0 - 0
|
|
||||||
* 1 - 10
|
|
||||||
* 2 - 20
|
|
||||||
* 3 - 30
|
|
||||||
* 4 - 40
|
|
||||||
* 6 - 60
|
|
||||||
* 7 - 70
|
|
||||||
* }
|
|
||||||
* </pre>
|
|
||||||
* @param leaderEpochs The leader epoch map to be refined.
|
* @param leaderEpochs The leader epoch map to be refined.
|
||||||
|
* @return A map containing only the epochs and their start offsets that have associated messages/records.
|
||||||
*/
|
*/
|
||||||
// Visible for testing
|
// Visible for testing
|
||||||
static NavigableMap<Integer, Long> buildFilteredLeaderEpochMap(NavigableMap<Integer, Long> leaderEpochs) {
|
static NavigableMap<Integer, Long> buildFilteredLeaderEpochMap(NavigableMap<Integer, Long> leaderEpochs) {
|
||||||
List<Integer> epochsWithNoMessages = new ArrayList<>();
|
TreeMap<Integer, Long> filteredLeaderEpochs = new TreeMap<>();
|
||||||
Map.Entry<Integer, Long> previousEpochAndOffset = null;
|
Map.Entry<Integer, Long> previousEpochAndOffset = null;
|
||||||
|
|
||||||
for (Map.Entry<Integer, Long> currentEpochAndOffset : leaderEpochs.entrySet()) {
|
for (Map.Entry<Integer, Long> currentEpochAndOffset : leaderEpochs.entrySet()) {
|
||||||
if (previousEpochAndOffset != null && previousEpochAndOffset.getValue().equals(currentEpochAndOffset.getValue())) {
|
if (previousEpochAndOffset != null && !previousEpochAndOffset.getValue().equals(currentEpochAndOffset.getValue())) {
|
||||||
epochsWithNoMessages.add(previousEpochAndOffset.getKey());
|
filteredLeaderEpochs.put(previousEpochAndOffset.getKey(), previousEpochAndOffset.getValue());
|
||||||
}
|
}
|
||||||
previousEpochAndOffset = currentEpochAndOffset;
|
previousEpochAndOffset = currentEpochAndOffset;
|
||||||
}
|
}
|
||||||
if (epochsWithNoMessages.isEmpty()) {
|
|
||||||
return leaderEpochs;
|
if (previousEpochAndOffset != null) {
|
||||||
}
|
filteredLeaderEpochs.put(previousEpochAndOffset.getKey(), previousEpochAndOffset.getValue());
|
||||||
TreeMap<Integer, Long> filteredLeaderEpochs = new TreeMap<>(leaderEpochs);
|
|
||||||
for (Integer epochWithNoMessage : epochsWithNoMessages) {
|
|
||||||
filteredLeaderEpochs.remove(epochWithNoMessage);
|
|
||||||
}
|
}
|
||||||
return filteredLeaderEpochs;
|
return filteredLeaderEpochs;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1847,6 +1847,18 @@ public class RemoteLogManagerTest {
|
||||||
|
|
||||||
NavigableMap<Integer, Long> refinedLeaderEpochMap = RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset);
|
NavigableMap<Integer, Long> refinedLeaderEpochMap = RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset);
|
||||||
assertEquals(expectedLeaderEpochs, refinedLeaderEpochMap);
|
assertEquals(expectedLeaderEpochs, refinedLeaderEpochMap);
|
||||||
|
|
||||||
|
|
||||||
|
TreeMap<Integer, Long> leaderEpochToStartOffset2 = new TreeMap<>();
|
||||||
|
leaderEpochToStartOffset2.put(0, 0L);
|
||||||
|
leaderEpochToStartOffset2.put(1, 0L);
|
||||||
|
leaderEpochToStartOffset2.put(2, 0L);
|
||||||
|
|
||||||
|
TreeMap<Integer, Long> expectedLeaderEpochs2 = new TreeMap<>();
|
||||||
|
expectedLeaderEpochs2.put(2, 0L);
|
||||||
|
|
||||||
|
NavigableMap<Integer, Long> refinedLeaderEpochMap2 = RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset2);
|
||||||
|
assertEquals(expectedLeaderEpochs2, refinedLeaderEpochMap2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue