From f69379cf6b37c413a777adeed8edaaa7ee3a6c69 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Tue, 10 Jun 2025 21:22:50 +0500 Subject: [PATCH] MINOR: Remove unused code from storage classes (#19853) Remove unused code from storage classes. Reviewers: Kamal Chandraprakash , TengYao Chi , Kuan-Po Tseng , Chia-Ping Tsai --- .../RemoteLogSegmentMetadataSnapshot.java | 31 ------------------- .../RemotePartitionMetadataEventHandler.java | 6 ---- .../storage/RemoteLogManagerConfig.java | 20 ++++++++++++ .../internals/log/SkimpyOffsetMap.java | 21 +------------ .../remote/storage/LocalTieredStorage.java | 2 -- 5 files changed, 21 insertions(+), 59 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java index 7c67adc5ba3..191999de4c3 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java @@ -83,37 +83,6 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata { */ private final boolean txnIdxEmpty; - /** - * Creates an instance with the given metadata of remote log segment. - *

- * {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch - * then it should have an entry with epoch mapping to start-offset of this segment. - * - * @param segmentId Universally unique remote log segment id. - * @param startOffset Start offset of this segment (inclusive). - * @param endOffset End offset of this segment (inclusive). - * @param maxTimestampMs Maximum timestamp in milliseconds in this segment. - * @param brokerId Broker id from which this event is generated. - * @param eventTimestampMs Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage. - * @param segmentSizeInBytes Size of this segment in bytes. - * @param customMetadata Custom metadata. - * @param state State of the respective segment of remoteLogSegmentId. - * @param segmentLeaderEpochs leader epochs occurred within this segment. - */ - public RemoteLogSegmentMetadataSnapshot(Uuid segmentId, - long startOffset, - long endOffset, - long maxTimestampMs, - int brokerId, - long eventTimestampMs, - int segmentSizeInBytes, - Optional customMetadata, - RemoteLogSegmentState state, - Map segmentLeaderEpochs) { - this(segmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes, - customMetadata, state, segmentLeaderEpochs, false); - } - /** * Creates an instance with the given metadata of remote log segment. *

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java index 075cab58817..8cd5801b64d 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java @@ -42,12 +42,6 @@ public abstract class RemotePartitionMetadataEventHandler { protected abstract void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata); - public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition, - int metadataPartition, - Long metadataPartitionOffset) { - // no-op by default - } - public abstract void clearTopicPartition(TopicIdPartition topicIdPartition); public abstract void markInitialized(TopicIdPartition partition); diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 0f58ef4e26b..15002d12205 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -407,14 +407,26 @@ public final class RemoteLogManagerConfig { return config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP); } + /** + * Used by the RemoteStorageManager and RemoteLogMetadataManager plugins. + */ + @SuppressWarnings("unused") public long remoteLogManagerTaskRetryBackoffMs() { return config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP); } + /** + * Used by the RemoteStorageManager and RemoteLogMetadataManager plugins. + */ + @SuppressWarnings("unused") public long remoteLogManagerTaskRetryBackoffMaxMs() { return config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP); } + /** + * Used by the RemoteStorageManager and RemoteLogMetadataManager plugins. + */ + @SuppressWarnings("unused") public double remoteLogManagerTaskRetryJitter() { return config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP); } @@ -435,10 +447,18 @@ public final class RemoteLogManagerConfig { return config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP); } + /** + * Used by the RemoteStorageManager plugin. + */ + @SuppressWarnings("unused") public String remoteStorageManagerPrefix() { return config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); } + /** + * Used by the RemoteLogMetadataManager plugin. + */ + @SuppressWarnings("unused") public String remoteLogMetadataManagerPrefix() { return config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java index 959358c2cee..17800cc27ab 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java @@ -56,17 +56,11 @@ public class SkimpyOffsetMap implements OffsetMap { /* number of entries put into the map */ private int entries = 0; - /* number of lookups on the map */ - private long lookups = 0L; - - /* the number of probes for all lookups */ - private long probes = 0L; - /* the latest offset written into the map */ private long lastOffset = -1L; /** - * Create an instance of SkimplyOffsetMap with the default hash algorithm (MD5). + * Create an instance of SkimpyOffsetMap with the default hash algorithm (MD5). * * @param memory The amount of memory this map can use */ @@ -105,7 +99,6 @@ public class SkimpyOffsetMap implements OffsetMap { */ @Override public long get(ByteBuffer key) throws DigestException { - ++lookups; hashInto(key, hash1); // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot int attempt = 0; @@ -136,7 +129,6 @@ public class SkimpyOffsetMap implements OffsetMap { throw new IllegalArgumentException("Attempted to add a new entry to a full offset map, " + "entries: " + entries + ", slots: " + slots); - ++lookups; hashInto(key, hash1); // probe until we find the first empty slot @@ -174,8 +166,6 @@ public class SkimpyOffsetMap implements OffsetMap { @Override public void clear() { this.entries = 0; - this.lookups = 0L; - this.probes = 0L; this.lastOffset = -1L; Arrays.fill(bytes.array(), bytes.arrayOffset(), bytes.arrayOffset() + bytes.limit(), (byte) 0); } @@ -196,14 +186,6 @@ public class SkimpyOffsetMap implements OffsetMap { return lastOffset; } - /** - * The rate of collisions in the lookups - */ - // Visible for testing - public double collisionRate() { - return (this.probes - this.lookups) / (double) this.lookups; - } - /** * Check that there is no entry at the given position */ @@ -223,7 +205,6 @@ public class SkimpyOffsetMap implements OffsetMap { private int positionOf(byte[] hash, int attempt) { int probe = ByteUtils.readIntBE(hash, Math.min(attempt, hashSize - 4)) + Math.max(0, attempt - hashSize + 4); int slot = Utils.abs(probe) % slots; - ++this.probes; return slot * bytesPerEntry; } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java index 2e0adda5858..1f082a92179 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java @@ -106,8 +106,6 @@ import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDir */ public final class LocalTieredStorage implements RemoteStorageManager { - public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local."; - /** * The root directory of this storage. */