KAFKA-18698: Migrate suitable classes to records in server and server-common modules (#18783)

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
TengYao Chi 2025-02-05 18:00:11 +08:00 committed by GitHub
parent d830179375
commit aac62a32d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 54 additions and 198 deletions

View File

@ -21,11 +21,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public final class FinalizedFeatures {
private final MetadataVersion metadataVersion;
private final Map<String, Short> finalizedFeatures;
private final long finalizedFeaturesEpoch;
public record FinalizedFeatures(
MetadataVersion metadataVersion,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
) {
public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
return new FinalizedFeatures(version, Collections.emptyMap(), -1);
}
@ -40,39 +40,4 @@ public final class FinalizedFeatures {
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
}
public MetadataVersion metadataVersion() {
return metadataVersion;
}
public Map<String, Short> finalizedFeatures() {
return finalizedFeatures;
}
public long finalizedFeaturesEpoch() {
return finalizedFeaturesEpoch;
}
@Override
public boolean equals(Object o) {
if (o == null || !(o.getClass().equals(FinalizedFeatures.class))) return false;
FinalizedFeatures other = (FinalizedFeatures) o;
return metadataVersion == other.metadataVersion &&
finalizedFeatures.equals(other.finalizedFeatures) &&
finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
}
@Override
public int hashCode() {
return Objects.hash(metadataVersion, finalizedFeatures, finalizedFeaturesEpoch);
}
@Override
public String toString() {
return "Features" +
"(metadataVersion=" + metadataVersion +
", finalizedFeatures=" + finalizedFeatures +
", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
")";
}
}

View File

@ -18,45 +18,15 @@ package org.apache.kafka.server.common;
import org.apache.kafka.common.Uuid;
import java.util.Objects;
/**
* Represents a partition using its unique topic Id and partition number.
* @param topicId Universally unique Id representing this topic partition.
* @param partitionId The partition Id.
*/
public final class TopicIdPartition {
private final Uuid topicId;
private final int partitionId;
public TopicIdPartition(Uuid topicId, int partitionId) {
this.topicId = topicId;
this.partitionId = partitionId;
}
/**
* @return Universally unique Id representing this topic partition.
*/
public Uuid topicId() {
return topicId;
}
/**
* @return The partition Id.
*/
public int partitionId() {
return partitionId;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof TopicIdPartition other)) return false;
return other.topicId.equals(topicId) && other.partitionId == partitionId;
}
@Override
public int hashCode() {
return Objects.hash(topicId, partitionId);
}
public record TopicIdPartition(
Uuid topicId,
int partitionId
) {
@Override
public String toString() {
return topicId + ":" + partitionId;

View File

@ -89,14 +89,14 @@ public abstract class InterBrokerSendThread extends ShutdownableThread {
private void drainGeneratedRequests() {
generateRequests().forEach(request ->
unsentRequests.put(
request.destination,
request.destination(),
networkClient.newClientRequest(
request.destination.idString(),
request.request,
request.creationTimeMs,
request.destination().idString(),
request.request(),
request.creationTimeMs(),
true,
requestTimeoutMs,
request.handler
request.handler()
)
)
);

View File

@ -20,32 +20,9 @@ import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.AbstractRequest;
public final class RequestAndCompletionHandler {
public final long creationTimeMs;
public final Node destination;
public final AbstractRequest.Builder<? extends AbstractRequest> request;
public final RequestCompletionHandler handler;
public RequestAndCompletionHandler(
long creationTimeMs,
Node destination,
AbstractRequest.Builder<? extends AbstractRequest> request,
RequestCompletionHandler handler
) {
this.creationTimeMs = creationTimeMs;
this.destination = destination;
this.request = request;
this.handler = handler;
}
@Override
public String toString() {
return "RequestAndCompletionHandler(" +
"creationTimeMs=" + creationTimeMs +
", destination=" + destination +
", request=" + request +
", handler=" + handler +
')';
}
}
public record RequestAndCompletionHandler(
long creationTimeMs,
Node destination,
AbstractRequest.Builder<? extends AbstractRequest> request,
RequestCompletionHandler handler
) { }

View File

@ -166,15 +166,15 @@ public class InterBrokerSendThreadTest {
final TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();
final ClientRequest clientRequest =
new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler);
new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler());
when(networkClient.newClientRequest(
ArgumentMatchers.eq("1"),
same(handler.request),
same(handler.request()),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
same(handler.handler)
same(handler.handler())
)).thenReturn(clientRequest);
when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
@ -187,11 +187,11 @@ public class InterBrokerSendThreadTest {
verify(networkClient)
.newClientRequest(
ArgumentMatchers.eq("1"),
same(handler.request),
same(handler.request()),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
same(handler.handler));
same(handler.handler()));
verify(networkClient).ready(any(), anyLong());
verify(networkClient).send(same(clientRequest), anyLong());
verify(networkClient).poll(anyLong(), anyLong());
@ -209,15 +209,15 @@ public class InterBrokerSendThreadTest {
final TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();
final ClientRequest clientRequest =
new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler);
new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler());
when(networkClient.newClientRequest(
ArgumentMatchers.eq("1"),
same(handler.request),
same(handler.request()),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
same(handler.handler)
same(handler.handler())
)).thenReturn(clientRequest);
when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
@ -236,11 +236,11 @@ public class InterBrokerSendThreadTest {
verify(networkClient)
.newClientRequest(
ArgumentMatchers.eq("1"),
same(handler.request),
same(handler.request()),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
same(handler.handler));
same(handler.handler()));
verify(networkClient).ready(any(), anyLong());
verify(networkClient).connectionDelay(any(), anyLong());
verify(networkClient).poll(anyLong(), anyLong());
@ -261,16 +261,16 @@ public class InterBrokerSendThreadTest {
final ClientRequest clientRequest =
new ClientRequest(
"dest", request, 0, "1", time.milliseconds(), true, requestTimeoutMs, handler.handler);
"dest", request, 0, "1", time.milliseconds(), true, requestTimeoutMs, handler.handler());
time.sleep(1500L);
when(networkClient.newClientRequest(
ArgumentMatchers.eq("1"),
same(handler.request),
ArgumentMatchers.eq(handler.creationTimeMs),
same(handler.request()),
ArgumentMatchers.eq(handler.creationTimeMs()),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
same(handler.handler)
same(handler.handler())
)).thenReturn(clientRequest);
// make the node unready so the request is not cleared
@ -289,11 +289,11 @@ public class InterBrokerSendThreadTest {
verify(networkClient)
.newClientRequest(
ArgumentMatchers.eq("1"),
same(handler.request),
ArgumentMatchers.eq(handler.creationTimeMs),
same(handler.request()),
ArgumentMatchers.eq(handler.creationTimeMs()),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
same(handler.handler));
same(handler.handler()));
verify(networkClient).ready(any(), anyLong());
verify(networkClient).connectionDelay(any(), anyLong());
verify(networkClient).poll(anyLong(), anyLong());

View File

@ -24,64 +24,25 @@ import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.TopicIdPartition;
import java.util.Objects;
final class Assignment {
/**
* The topic ID and partition index of the replica.
*/
private final TopicIdPartition topicIdPartition;
/**
* The ID of the directory we are placing the replica into.
*/
private final Uuid directoryId;
/**
* The time in monotonic nanosecond when this assignment was created.
*/
private final long submissionTimeNs;
/**
* The callback to invoke on success.
*/
private final Runnable successCallback;
Assignment(
TopicIdPartition topicIdPartition,
Uuid directoryId,
long submissionTimeNs,
Runnable successCallback
) {
this.topicIdPartition = topicIdPartition;
this.directoryId = directoryId;
this.submissionTimeNs = submissionTimeNs;
this.successCallback = successCallback;
}
TopicIdPartition topicIdPartition() {
return topicIdPartition;
}
Uuid directoryId() {
return directoryId;
}
long submissionTimeNs() {
return submissionTimeNs;
}
Runnable successCallback() {
return successCallback;
}
/**
* @param topicIdPartition The topic ID and partition index of the replica.
* @param directoryId The ID of the directory we are placing the replica into.
* @param submissionTimeNs The time in monotonic nanosecond when this assignment was created.
* @param successCallback The callback to invoke on success.
*/
record Assignment(
TopicIdPartition topicIdPartition,
Uuid directoryId,
long submissionTimeNs,
Runnable successCallback
) {
/**
* Check if this Assignment is still valid to be sent.
*
* @param nodeId The broker ID.
* @param image The metadata image.
*
* @return True only if the Assignment is still valid.
* @param nodeId The broker ID.
* @param image The metadata image.
* @return True only if the Assignment is still valid.
*/
boolean valid(int nodeId, MetadataImage image) {
TopicImage topicImage = image.topics().getTopic(topicIdPartition.topicId());
@ -96,23 +57,6 @@ final class Assignment {
return Replicas.contains(partition.replicas, nodeId);
}
@Override
public boolean equals(Object o) {
if (o == null || (!(o instanceof Assignment other))) return false;
return topicIdPartition.equals(other.topicIdPartition) &&
directoryId.equals(other.directoryId) &&
submissionTimeNs == other.submissionTimeNs &&
successCallback.equals(other.successCallback);
}
@Override
public int hashCode() {
return Objects.hash(topicIdPartition,
directoryId,
submissionTimeNs,
successCallback);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();