From 7d88bde9a9a0000e69fe69baadd0859d594dc7a0 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Thu, 1 Aug 2024 22:07:50 +0530 Subject: [PATCH] MINOR: Cleanup storage module (#16207) Reviewers: Chia-Ping Tsai --- .../storage/internals/epoch/LeaderEpochFileCache.java | 11 +++++------ .../apache/kafka/storage/internals/log/LogConfig.java | 2 +- .../kafka/storage/internals/log/LogSegment.java | 6 ++---- .../kafka/storage/internals/log/SkimpyOffsetMap.java | 3 +-- .../log/remote/storage/LocalTieredStorageEvent.java | 5 +---- .../log/remote/storage/LocalTieredStorageTest.java | 4 ++-- .../remote/storage/RemoteTopicPartitionDirectory.java | 2 +- .../integration/OffloadAndConsumeFromLeaderTest.java | 6 +++--- .../tiered/storage/utils/RecordsKeyValueMatcher.java | 11 ++++------- 9 files changed, 20 insertions(+), 30 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index ece9f4b6468..8961957306d 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -202,15 +202,14 @@ public class LeaderEpochFileCache { * which has messages assigned to it. */ public OptionalInt latestEpoch() { - Optional entry = latestEntry(); - return entry.isPresent() ? OptionalInt.of(entry.get().epoch) : OptionalInt.empty(); + return latestEntry().map(epochEntry -> OptionalInt.of(epochEntry.epoch)).orElseGet(OptionalInt::empty); } public OptionalInt previousEpoch() { lock.readLock().lock(); try { - Optional> lowerEntry = latestEntry().flatMap(entry -> Optional.ofNullable(epochs.lowerEntry(entry.epoch))); - return lowerEntry.isPresent() ? 
OptionalInt.of(lowerEntry.get().getKey()) : OptionalInt.empty(); + return latestEntry().flatMap(entry -> Optional.ofNullable(epochs.lowerEntry(entry.epoch))) + .map(integerEpochEntryEntry -> OptionalInt.of(integerEpochEntryEntry.getKey())).orElseGet(OptionalInt::empty); } finally { lock.readLock().unlock(); } @@ -222,7 +221,7 @@ public class LeaderEpochFileCache { public Optional earliestEntry() { lock.readLock().lock(); try { - return Optional.ofNullable(epochs.firstEntry()).map(x -> x.getValue()); + return Optional.ofNullable(epochs.firstEntry()).map(Map.Entry::getValue); } finally { lock.readLock().unlock(); } @@ -287,7 +286,7 @@ public class LeaderEpochFileCache { public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffset) { lock.readLock().lock(); try { - Map.Entry epochAndOffset = null; + Map.Entry epochAndOffset; if (requestedEpoch == UNDEFINED_EPOCH) { // This may happen if a bootstrapping follower sends a request with undefined epoch or // a follower is on the older message format where leader epochs are not recorded diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 9cf4a71e38e..d026ea3b4b1 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -534,7 +534,7 @@ public class LogConfig extends AbstractConfig { */ public static LogConfig fromProps(Map defaults, Properties overrides) { Properties props = new Properties(); - defaults.forEach((k, v) -> props.put(k, v)); + props.putAll(defaults); props.putAll(overrides); Set overriddenKeys = overrides.keySet().stream().map(k -> (String) k).collect(Collectors.toSet()); return new LogConfig(props, overriddenKeys); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index bf702949ffc..2113bf1de29 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -460,10 +460,8 @@ public class LogSegment implements Closeable { } public OptionalLong fetchUpperBoundOffset(OffsetPosition startOffsetPosition, int fetchSize) throws IOException { - Optional position = offsetIndex().fetchUpperBoundOffset(startOffsetPosition, fetchSize); - if (position.isPresent()) - return OptionalLong.of(position.get().offset); - return OptionalLong.empty(); + return offsetIndex().fetchUpperBoundOffset(startOffsetPosition, fetchSize) + .map(offsetPosition -> OptionalLong.of(offsetPosition.offset)).orElseGet(OptionalLong::empty); } /** diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java index 8f15a4f7f9d..959358c2cee 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java @@ -109,14 +109,13 @@ public class SkimpyOffsetMap implements OffsetMap { hashInto(key, hash1); // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot int attempt = 0; - int pos = 0; //we need to guard against attempt integer overflow if the map is full //limit attempt to number of slots once positionOf(..) 
enters linear search mode int maxAttempts = slots + hashSize - 4; do { if (attempt >= maxAttempts) return -1L; - pos = positionOf(hash1, attempt); + int pos = positionOf(hash1, attempt); bytes.position(pos); if (isEmpty(pos)) return -1L; diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java index 9617da17c65..83884e6ce3d 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java @@ -76,10 +76,7 @@ public final class LocalTieredStorageEvent implements Comparable traverser.visitSegment(fileset)); + listFilesets().forEach(traverser::visitSegment); } private List listFilesets() { diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java index ffb8e666d18..af21f0742e7 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java @@ -115,9 +115,9 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4")) /* - * (3) Stops and restarts the broker. The purpose of this test is to a) exercise consumption - * from a given offset and b) verify that upon broker start, existing remote log segments - * metadata are loaded by Kafka and these log segments available. + * (3) Stops and restarts the broker. 
The purpose of this test is to + * a) exercise consumption from a given offset and + * b) verify that upon broker start, existing remote log segments metadata are loaded by Kafka and these log segments are available. * * Acceptance: * ----------- diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java index d9843f7f857..4034cd63a2a 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java @@ -112,13 +112,10 @@ public final class RecordsKeyValueMatcher extends TypeSafeDiagnosi .appendValue(actual.getClass().getSimpleName()); return false; } - if (!compare(expectedRecord.key(), actualRecord.key(), keySerde.deserializer(), "Record key", - mismatchDescription) || - !compare(expectedRecord.value(), actualRecord.value(), valueSerde.deserializer(), "Record value", - mismatchDescription)) { - return false; - } - return true; + return compare(expectedRecord.key(), actualRecord.key(), keySerde.deserializer(), "Record key", + mismatchDescription) && + compare(expectedRecord.value(), actualRecord.value(), valueSerde.deserializer(), "Record value", + mismatchDescription); } private boolean compare(ByteBuffer lhs,