From aeca384641bb9593d0862481f9f61200ed841511 Mon Sep 17 00:00:00 2001 From: Linu Shibu <72352875+linu-shibu@users.noreply.github.com> Date: Tue, 7 May 2024 05:19:35 +0530 Subject: [PATCH] KAFKA-16356: Remove class-name dispatch in RemoteLogMetadataSerde (#15620) Reviewers: Greg Harris , Luke Chen , Igor Soarez , The-Gamer-01 <19974361760@163.com> --- build.gradle | 2 +- .../serialization/RemoteLogMetadataSerde.java | 70 +++++++++---------- 2 files changed, 33 insertions(+), 39 deletions(-) diff --git a/build.gradle b/build.gradle index 7cef6402c6a..f9e905753ac 100644 --- a/build.gradle +++ b/build.gradle @@ -272,7 +272,7 @@ subprojects { options.encoding = 'UTF-8' options.compilerArgs << "-Xlint:all" // temporary exclusions until all the warnings are fixed - if (!project.path.startsWith(":connect")) + if (!project.path.startsWith(":connect") && !project.path.startsWith(":storage")) options.compilerArgs << "-Xlint:-rawtypes" options.compilerArgs << "-Xlint:-serial" options.compilerArgs << "-Xlint:-try" diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java index c024640c73e..3b8efbabace 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java @@ -33,8 +33,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; import java.io.PrintStream; -import java.util.HashMap; -import java.util.Map; /** * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde @@ -46,52 +44,45 @@ public class RemoteLogMetadataSerde { private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey(); - private final Map remoteLogStorageClassToApiKey; - private final Map> keyToTransform; private final BytesApiMessageSerde bytesApiMessageSerde; + private final RemoteLogSegmentMetadataTransform segmentTransform; + private final RemoteLogSegmentMetadataUpdateTransform segmentUpdateTransform; + private final RemotePartitionDeleteMetadataTransform partitionDeleteTransform; + private final RemoteLogSegmentMetadataSnapshotTransform segmentSnapshotTransform; + public RemoteLogMetadataSerde() { - remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap(); - keyToTransform = createRemoteLogMetadataTransforms(); bytesApiMessageSerde = new BytesApiMessageSerde() { @Override public ApiMessage apiMessageFor(short apiKey) { return newApiMessage(apiKey); } }; + segmentTransform = new RemoteLogSegmentMetadataTransform(); + segmentUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform(); + partitionDeleteTransform = new RemotePartitionDeleteMetadataTransform(); + segmentSnapshotTransform = new RemoteLogSegmentMetadataSnapshotTransform(); } protected ApiMessage newApiMessage(short apiKey) { return MetadataRecordType.fromId(apiKey).newMetadataRecord(); } - protected final Map> createRemoteLogMetadataTransforms() { - Map> map = new HashMap<>(); - map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform()); - map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform()); - map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform()); - map.put(REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY, new RemoteLogSegmentMetadataSnapshotTransform()); - return map; - } - - protected final Map createRemoteLogStorageClassToApiKeyMap() { - Map map = new HashMap<>(); - map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); - map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); - map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); - map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY); - return map; - } - public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { - Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); - if (apiKey == null) { - throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass() - + " does not exist."); - } - @SuppressWarnings("unchecked") - ApiMessageAndVersion apiMessageAndVersion = ((RemoteLogMetadataTransform) remoteLogMetadataTransform(apiKey)).toApiMessageAndVersion(remoteLogMetadata); + ApiMessageAndVersion apiMessageAndVersion; + if (remoteLogMetadata instanceof RemoteLogSegmentMetadata) { + apiMessageAndVersion = segmentTransform.toApiMessageAndVersion((RemoteLogSegmentMetadata) remoteLogMetadata); + } else if (remoteLogMetadata instanceof RemoteLogSegmentMetadataUpdate) { + apiMessageAndVersion = segmentUpdateTransform.toApiMessageAndVersion((RemoteLogSegmentMetadataUpdate) remoteLogMetadata); + } else if (remoteLogMetadata instanceof RemotePartitionDeleteMetadata) { + apiMessageAndVersion = partitionDeleteTransform.toApiMessageAndVersion((RemotePartitionDeleteMetadata) remoteLogMetadata); + } else if (remoteLogMetadata instanceof RemoteLogSegmentMetadataSnapshot) { + apiMessageAndVersion = segmentSnapshotTransform.toApiMessageAndVersion((RemoteLogSegmentMetadataSnapshot) remoteLogMetadata); + } else { + throw new IllegalArgumentException("RemoteLogMetadataTransform for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass() + + " does not exist."); + } return bytesApiMessageSerde.serialize(apiMessageAndVersion); } @@ -99,16 +90,19 @@ public class RemoteLogMetadataSerde { public RemoteLogMetadata deserialize(byte[] data) { ApiMessageAndVersion apiMessageAndVersion = bytesApiMessageSerde.deserialize(data); - return remoteLogMetadataTransform(apiMessageAndVersion.message().apiKey()).fromApiMessageAndVersion(apiMessageAndVersion); - } - - private RemoteLogMetadataTransform remoteLogMetadataTransform(short apiKey) { - RemoteLogMetadataTransform metadataTransform = keyToTransform.get(apiKey); - if (metadataTransform == null) { + short apiKey = apiMessageAndVersion.message().apiKey(); + if (apiKey == REMOTE_LOG_SEGMENT_METADATA_API_KEY) { + return segmentTransform.fromApiMessageAndVersion(apiMessageAndVersion); + } else if (apiKey == REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY) { + return segmentUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion); + } else if (apiKey == REMOTE_PARTITION_DELETE_API_KEY) { + return partitionDeleteTransform.fromApiMessageAndVersion(apiMessageAndVersion); + } else if (apiKey == REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY) { + return segmentSnapshotTransform.fromApiMessageAndVersion(apiMessageAndVersion); + } else { throw new IllegalArgumentException("RemoteLogMetadataTransform for apikey: " + apiKey + " does not exist."); } - return metadataTransform; } public static class RemoteLogMetadataFormatter implements MessageFormatter {