mirror of https://github.com/apache/kafka.git
KAFKA-16356: Remove class-name dispatch in RemoteLogMetadataSerde (#15620)
Reviewers: Greg Harris <greg.harris@aiven.io>, Luke Chen <showuon@gmail.com>, Igor Soarez <soarez@apple.com>, The-Gamer-01 <19974361760@163.com>
This commit is contained in:
parent
0b4eaefd86
commit
aeca384641
|
@ -272,7 +272,7 @@ subprojects {
|
|||
options.encoding = 'UTF-8'
|
||||
options.compilerArgs << "-Xlint:all"
|
||||
// temporary exclusions until all the warnings are fixed
|
||||
if (!project.path.startsWith(":connect"))
|
||||
if (!project.path.startsWith(":connect") && !project.path.startsWith(":storage"))
|
||||
options.compilerArgs << "-Xlint:-rawtypes"
|
||||
options.compilerArgs << "-Xlint:-serial"
|
||||
options.compilerArgs << "-Xlint:-try"
|
||||
|
|
|
@ -33,8 +33,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate
|
|||
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde
|
||||
|
@ -46,52 +44,45 @@ public class RemoteLogMetadataSerde {
|
|||
private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
|
||||
private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey();
|
||||
|
||||
private final Map<String, Short> remoteLogStorageClassToApiKey;
|
||||
private final Map<Short, RemoteLogMetadataTransform<? extends RemoteLogMetadata>> keyToTransform;
|
||||
private final BytesApiMessageSerde bytesApiMessageSerde;
|
||||
|
||||
private final RemoteLogSegmentMetadataTransform segmentTransform;
|
||||
private final RemoteLogSegmentMetadataUpdateTransform segmentUpdateTransform;
|
||||
private final RemotePartitionDeleteMetadataTransform partitionDeleteTransform;
|
||||
private final RemoteLogSegmentMetadataSnapshotTransform segmentSnapshotTransform;
|
||||
|
||||
public RemoteLogMetadataSerde() {
|
||||
remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap();
|
||||
keyToTransform = createRemoteLogMetadataTransforms();
|
||||
bytesApiMessageSerde = new BytesApiMessageSerde() {
|
||||
@Override
|
||||
public ApiMessage apiMessageFor(short apiKey) {
|
||||
return newApiMessage(apiKey);
|
||||
}
|
||||
};
|
||||
segmentTransform = new RemoteLogSegmentMetadataTransform();
|
||||
segmentUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform();
|
||||
partitionDeleteTransform = new RemotePartitionDeleteMetadataTransform();
|
||||
segmentSnapshotTransform = new RemoteLogSegmentMetadataSnapshotTransform();
|
||||
}
|
||||
|
||||
protected ApiMessage newApiMessage(short apiKey) {
|
||||
return MetadataRecordType.fromId(apiKey).newMetadataRecord();
|
||||
}
|
||||
|
||||
protected final Map<Short, RemoteLogMetadataTransform<? extends RemoteLogMetadata>> createRemoteLogMetadataTransforms() {
|
||||
Map<Short, RemoteLogMetadataTransform<? extends RemoteLogMetadata>> map = new HashMap<>();
|
||||
map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform());
|
||||
map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform());
|
||||
map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform());
|
||||
map.put(REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY, new RemoteLogSegmentMetadataSnapshotTransform());
|
||||
return map;
|
||||
}
|
||||
|
||||
protected final Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
|
||||
Map<String, Short> map = new HashMap<>();
|
||||
map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
|
||||
map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
|
||||
map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
|
||||
map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
|
||||
return map;
|
||||
}
|
||||
|
||||
public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
|
||||
Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
|
||||
if (apiKey == null) {
|
||||
throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
|
||||
+ " does not exist.");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
ApiMessageAndVersion apiMessageAndVersion = ((RemoteLogMetadataTransform<RemoteLogMetadata>) remoteLogMetadataTransform(apiKey)).toApiMessageAndVersion(remoteLogMetadata);
|
||||
ApiMessageAndVersion apiMessageAndVersion;
|
||||
if (remoteLogMetadata instanceof RemoteLogSegmentMetadata) {
|
||||
apiMessageAndVersion = segmentTransform.toApiMessageAndVersion((RemoteLogSegmentMetadata) remoteLogMetadata);
|
||||
} else if (remoteLogMetadata instanceof RemoteLogSegmentMetadataUpdate) {
|
||||
apiMessageAndVersion = segmentUpdateTransform.toApiMessageAndVersion((RemoteLogSegmentMetadataUpdate) remoteLogMetadata);
|
||||
} else if (remoteLogMetadata instanceof RemotePartitionDeleteMetadata) {
|
||||
apiMessageAndVersion = partitionDeleteTransform.toApiMessageAndVersion((RemotePartitionDeleteMetadata) remoteLogMetadata);
|
||||
} else if (remoteLogMetadata instanceof RemoteLogSegmentMetadataSnapshot) {
|
||||
apiMessageAndVersion = segmentSnapshotTransform.toApiMessageAndVersion((RemoteLogSegmentMetadataSnapshot) remoteLogMetadata);
|
||||
} else {
|
||||
throw new IllegalArgumentException("RemoteLogMetadataTransform for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
|
||||
+ " does not exist.");
|
||||
}
|
||||
|
||||
return bytesApiMessageSerde.serialize(apiMessageAndVersion);
|
||||
}
|
||||
|
@ -99,16 +90,19 @@ public class RemoteLogMetadataSerde {
|
|||
public RemoteLogMetadata deserialize(byte[] data) {
|
||||
ApiMessageAndVersion apiMessageAndVersion = bytesApiMessageSerde.deserialize(data);
|
||||
|
||||
return remoteLogMetadataTransform(apiMessageAndVersion.message().apiKey()).fromApiMessageAndVersion(apiMessageAndVersion);
|
||||
}
|
||||
|
||||
private RemoteLogMetadataTransform<? extends RemoteLogMetadata> remoteLogMetadataTransform(short apiKey) {
|
||||
RemoteLogMetadataTransform<? extends RemoteLogMetadata> metadataTransform = keyToTransform.get(apiKey);
|
||||
if (metadataTransform == null) {
|
||||
short apiKey = apiMessageAndVersion.message().apiKey();
|
||||
if (apiKey == REMOTE_LOG_SEGMENT_METADATA_API_KEY) {
|
||||
return segmentTransform.fromApiMessageAndVersion(apiMessageAndVersion);
|
||||
} else if (apiKey == REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY) {
|
||||
return segmentUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
|
||||
} else if (apiKey == REMOTE_PARTITION_DELETE_API_KEY) {
|
||||
return partitionDeleteTransform.fromApiMessageAndVersion(apiMessageAndVersion);
|
||||
} else if (apiKey == REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY) {
|
||||
return segmentSnapshotTransform.fromApiMessageAndVersion(apiMessageAndVersion);
|
||||
} else {
|
||||
throw new IllegalArgumentException("RemoteLogMetadataTransform for apikey: " + apiKey + " does not exist.");
|
||||
}
|
||||
|
||||
return metadataTransform;
|
||||
}
|
||||
|
||||
public static class RemoteLogMetadataFormatter implements MessageFormatter {
|
||||
|
|
Loading…
Reference in New Issue