MINOR: Cleanup storage module (#16207)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Sanskar Jhajharia 2024-08-01 22:07:50 +05:30 committed by GitHub
parent ad605b7bae
commit 7d88bde9a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 20 additions and 30 deletions

View File

@ -202,15 +202,14 @@ public class LeaderEpochFileCache {
* which has messages assigned to it. * which has messages assigned to it.
*/ */
public OptionalInt latestEpoch() { public OptionalInt latestEpoch() {
Optional<EpochEntry> entry = latestEntry(); return latestEntry().map(epochEntry -> OptionalInt.of(epochEntry.epoch)).orElseGet(OptionalInt::empty);
return entry.isPresent() ? OptionalInt.of(entry.get().epoch) : OptionalInt.empty();
} }
public OptionalInt previousEpoch() { public OptionalInt previousEpoch() {
lock.readLock().lock(); lock.readLock().lock();
try { try {
Optional<Map.Entry<Integer, EpochEntry>> lowerEntry = latestEntry().flatMap(entry -> Optional.ofNullable(epochs.lowerEntry(entry.epoch))); return latestEntry().flatMap(entry -> Optional.ofNullable(epochs.lowerEntry(entry.epoch)))
return lowerEntry.isPresent() ? OptionalInt.of(lowerEntry.get().getKey()) : OptionalInt.empty(); .map(integerEpochEntryEntry -> OptionalInt.of(integerEpochEntryEntry.getKey())).orElseGet(OptionalInt::empty);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }
@ -222,7 +221,7 @@ public class LeaderEpochFileCache {
public Optional<EpochEntry> earliestEntry() { public Optional<EpochEntry> earliestEntry() {
lock.readLock().lock(); lock.readLock().lock();
try { try {
return Optional.ofNullable(epochs.firstEntry()).map(x -> x.getValue()); return Optional.ofNullable(epochs.firstEntry()).map(Map.Entry::getValue);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }
@ -287,7 +286,7 @@ public class LeaderEpochFileCache {
public Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch, long logEndOffset) { public Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch, long logEndOffset) {
lock.readLock().lock(); lock.readLock().lock();
try { try {
Map.Entry<Integer, Long> epochAndOffset = null; Map.Entry<Integer, Long> epochAndOffset;
if (requestedEpoch == UNDEFINED_EPOCH) { if (requestedEpoch == UNDEFINED_EPOCH) {
// This may happen if a bootstrapping follower sends a request with undefined epoch or // This may happen if a bootstrapping follower sends a request with undefined epoch or
// a follower is on the older message format where leader epochs are not recorded // a follower is on the older message format where leader epochs are not recorded

View File

@ -534,7 +534,7 @@ public class LogConfig extends AbstractConfig {
*/ */
public static LogConfig fromProps(Map<?, ?> defaults, Properties overrides) { public static LogConfig fromProps(Map<?, ?> defaults, Properties overrides) {
Properties props = new Properties(); Properties props = new Properties();
defaults.forEach((k, v) -> props.put(k, v)); props.putAll(defaults);
props.putAll(overrides); props.putAll(overrides);
Set<String> overriddenKeys = overrides.keySet().stream().map(k -> (String) k).collect(Collectors.toSet()); Set<String> overriddenKeys = overrides.keySet().stream().map(k -> (String) k).collect(Collectors.toSet());
return new LogConfig(props, overriddenKeys); return new LogConfig(props, overriddenKeys);

View File

@ -460,10 +460,8 @@ public class LogSegment implements Closeable {
} }
public OptionalLong fetchUpperBoundOffset(OffsetPosition startOffsetPosition, int fetchSize) throws IOException { public OptionalLong fetchUpperBoundOffset(OffsetPosition startOffsetPosition, int fetchSize) throws IOException {
Optional<OffsetPosition> position = offsetIndex().fetchUpperBoundOffset(startOffsetPosition, fetchSize); return offsetIndex().fetchUpperBoundOffset(startOffsetPosition, fetchSize)
if (position.isPresent()) .map(offsetPosition -> OptionalLong.of(offsetPosition.offset)).orElseGet(OptionalLong::empty);
return OptionalLong.of(position.get().offset);
return OptionalLong.empty();
} }
/** /**

View File

@ -109,14 +109,13 @@ public class SkimpyOffsetMap implements OffsetMap {
hashInto(key, hash1); hashInto(key, hash1);
// search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot
int attempt = 0; int attempt = 0;
int pos = 0;
//we need to guard against attempt integer overflow if the map is full //we need to guard against attempt integer overflow if the map is full
//limit attempt to number of slots once positionOf(..) enters linear search mode //limit attempt to number of slots once positionOf(..) enters linear search mode
int maxAttempts = slots + hashSize - 4; int maxAttempts = slots + hashSize - 4;
do { do {
if (attempt >= maxAttempts) if (attempt >= maxAttempts)
return -1L; return -1L;
pos = positionOf(hash1, attempt); int pos = positionOf(hash1, attempt);
bytes.position(pos); bytes.position(pos);
if (isEmpty(pos)) if (isEmpty(pos))
return -1L; return -1L;

View File

@ -76,10 +76,7 @@ public final class LocalTieredStorageEvent implements Comparable<LocalTieredStor
if (condition.baseOffset != null && !metadata.isPresent()) { if (condition.baseOffset != null && !metadata.isPresent()) {
return false; return false;
} }
if (condition.baseOffset != null && metadata.get().startOffset() != condition.baseOffset) { return condition.baseOffset == null || metadata.get().startOffset() == condition.baseOffset;
return false;
}
return true;
} }
/** /**

View File

@ -303,7 +303,7 @@ public final class LocalTieredStorageTest {
final LocalTieredStorageSnapshot snapshot = takeSnapshot(tieredStorage); final LocalTieredStorageSnapshot snapshot = takeSnapshot(tieredStorage);
assertEquals(asList(topicPartition), snapshot.getTopicPartitions()); assertEquals(Collections.singletonList(topicPartition), snapshot.getTopicPartitions());
assertEquals(asList(wrap(record1), wrap(record2)), extractRecordsValue(snapshot, id)); assertEquals(asList(wrap(record1), wrap(record2)), extractRecordsValue(snapshot, id));
} }
@ -330,7 +330,7 @@ public final class LocalTieredStorageTest {
actual.put(idA, extractRecordsValue(snapshot, idA)); actual.put(idA, extractRecordsValue(snapshot, idA));
actual.put(idB, extractRecordsValue(snapshot, idB)); actual.put(idB, extractRecordsValue(snapshot, idB));
assertEquals(asList(topicPartition), snapshot.getTopicPartitions()); assertEquals(Collections.singletonList(topicPartition), snapshot.getTopicPartitions());
assertEquals(expected, actual); assertEquals(expected, actual);
} }

View File

@ -96,7 +96,7 @@ public final class RemoteTopicPartitionDirectory {
void traverse(final LocalTieredStorageTraverser traverser) { void traverse(final LocalTieredStorageTraverser traverser) {
traverser.visitTopicIdPartition(topicIdPartition); traverser.visitTopicIdPartition(topicIdPartition);
listFilesets().stream().forEach(fileset -> traverser.visitSegment(fileset)); listFilesets().forEach(traverser::visitSegment);
} }
private List<RemoteLogSegmentFileset> listFilesets() { private List<RemoteLogSegmentFileset> listFilesets() {

View File

@ -115,9 +115,9 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4")) new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"))
/* /*
* (3) Stops and restarts the broker. The purpose of this test is to a) exercise consumption * (3) Stops and restarts the broker. The purpose of this test is to
* from a given offset and b) verify that upon broker start, existing remote log segments * a) exercise consumption from a given offset and
* metadata are loaded by Kafka and these log segments are available. * b) verify that upon broker start, existing remote log segments metadata are loaded by Kafka and these log segments are available.
* *
* Acceptance: * Acceptance:
* ----------- * -----------

View File

@ -112,13 +112,10 @@ public final class RecordsKeyValueMatcher<R1, R2, K, V> extends TypeSafeDiagnosi
.appendValue(actual.getClass().getSimpleName()); .appendValue(actual.getClass().getSimpleName());
return false; return false;
} }
if (!compare(expectedRecord.key(), actualRecord.key(), keySerde.deserializer(), "Record key", return compare(expectedRecord.key(), actualRecord.key(), keySerde.deserializer(), "Record key",
mismatchDescription) || mismatchDescription) &&
!compare(expectedRecord.value(), actualRecord.value(), valueSerde.deserializer(), "Record value", compare(expectedRecord.value(), actualRecord.value(), valueSerde.deserializer(), "Record value",
mismatchDescription)) { mismatchDescription);
return false;
}
return true;
} }
private boolean compare(ByteBuffer lhs, private boolean compare(ByteBuffer lhs,