MINOR: Refine `PartitionAssignor` server-side interface (#13524)

This patch updates the `PartitionAssignor` server-side interface used in the new group coordinator for the new consumer group protocol as follow:
- It switches subscription from topic names to topic ids in order to be closer to the server side implementation.
- It switches assignment from Set to Map<Integer, Set> to be closer to the server side implementation.
- It adds getters for all attributes.
- It makes all attributes final private.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, David Jacot <djacot@confluent.io>
This commit is contained in:
Ritika Reddy 2023-04-14 05:22:51 -07:00 committed by GitHub
parent dc1ede8d89
commit f1e7a64bf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 98 additions and 58 deletions

View File

@ -16,11 +16,13 @@
*/ */
package org.apache.kafka.coordinator.group.assignor; package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid;
import java.util.Collection; import java.util.Collection;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
/** /**
* The assignment specification for a consumer group member. * The assignment specification for a consumer group member.
@ -29,58 +31,84 @@ public class AssignmentMemberSpec {
/** /**
* The instance ID if provided. * The instance ID if provided.
*/ */
final Optional<String> instanceId; private final Optional<String> instanceId;
/** /**
* The rack ID if provided. * The rack ID if provided.
*/ */
final Optional<String> rackId; private final Optional<String> rackId;
/** /**
* The topics that the member is subscribed to. * Topics Ids that the member is subscribed to.
*/ */
final Collection<String> subscribedTopics; private final Collection<Uuid> subscribedTopicIds;
/** /**
* The current target partitions of the member. * Partitions assigned keyed by topicId.
*/ */
final Collection<TopicPartition> targetPartitions; private final Map<Uuid, Set<Integer>> assignedPartitions;
/**
* @return The instance ID as an Optional.
*/
public Optional<String> instanceId() {
return instanceId;
}
/**
* @return The rack ID as an Optional.
*/
public Optional<String> rackId() {
return rackId;
}
/**
* @return Collection of subscribed topic Ids.
*/
public Collection<Uuid> subscribedTopicIds() {
return subscribedTopicIds;
}
/**
* @return Assigned partitions keyed by topic Ids.
*/
public Map<Uuid, Set<Integer>> assignedPartitions() {
return assignedPartitions;
}
public AssignmentMemberSpec( public AssignmentMemberSpec(
Optional<String> instanceId, Optional<String> instanceId,
Optional<String> rackId, Optional<String> rackId,
Collection<String> subscribedTopics, Collection<Uuid> subscribedTopicIds,
Collection<TopicPartition> targetPartitions Map<Uuid, Set<Integer>> assignedPartitions
) { ) {
Objects.requireNonNull(instanceId); Objects.requireNonNull(instanceId);
Objects.requireNonNull(rackId); Objects.requireNonNull(rackId);
Objects.requireNonNull(subscribedTopics); Objects.requireNonNull(subscribedTopicIds);
Objects.requireNonNull(targetPartitions); Objects.requireNonNull(assignedPartitions);
this.instanceId = instanceId; this.instanceId = instanceId;
this.rackId = rackId; this.rackId = rackId;
this.subscribedTopics = subscribedTopics; this.subscribedTopicIds = subscribedTopicIds;
this.targetPartitions = targetPartitions; this.assignedPartitions = assignedPartitions;
} }
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
AssignmentMemberSpec that = (AssignmentMemberSpec) o; AssignmentMemberSpec that = (AssignmentMemberSpec) o;
if (!instanceId.equals(that.instanceId)) return false; if (!instanceId.equals(that.instanceId)) return false;
if (!rackId.equals(that.rackId)) return false; if (!rackId.equals(that.rackId)) return false;
if (!subscribedTopics.equals(that.subscribedTopics)) return false; if (!subscribedTopicIds.equals(that.subscribedTopicIds)) return false;
return targetPartitions.equals(that.targetPartitions); return assignedPartitions.equals(that.assignedPartitions);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = instanceId.hashCode(); int result = instanceId.hashCode();
result = 31 * result + rackId.hashCode(); result = 31 * result + rackId.hashCode();
result = 31 * result + subscribedTopics.hashCode(); result = 31 * result + subscribedTopicIds.hashCode();
result = 31 * result + targetPartitions.hashCode(); result = 31 * result + assignedPartitions.hashCode();
return result; return result;
} }
@ -88,8 +116,8 @@ public class AssignmentMemberSpec {
public String toString() { public String toString() {
return "AssignmentMemberSpec(instanceId=" + instanceId + return "AssignmentMemberSpec(instanceId=" + instanceId +
", rackId=" + rackId + ", rackId=" + rackId +
", subscribedTopics=" + subscribedTopics + ", subscribedTopicIds=" + subscribedTopicIds +
", targetPartitions=" + targetPartitions + ", assignedPartitions=" + assignedPartitions +
')'; ')';
} }
} }

View File

@ -28,12 +28,12 @@ public class AssignmentSpec {
/** /**
* The members keyed by member id. * The members keyed by member id.
*/ */
final Map<String, AssignmentMemberSpec> members; private final Map<String, AssignmentMemberSpec> members;
/** /**
* The topics' metadata keyed by topic id * The topics' metadata keyed by topic id.
*/ */
final Map<Uuid, AssignmentTopicMetadata> topics; private final Map<Uuid, AssignmentTopicMetadata> topics;
public AssignmentSpec( public AssignmentSpec(
Map<String, AssignmentMemberSpec> members, Map<String, AssignmentMemberSpec> members,
@ -45,13 +45,25 @@ public class AssignmentSpec {
this.topics = topics; this.topics = topics;
} }
/**
* @return Member metadata keyed by member Ids.
*/
public Map<String, AssignmentMemberSpec> members() {
return members;
}
/**
* @return Topic metadata keyed by topic Ids.
*/
public Map<Uuid, AssignmentTopicMetadata> topics() {
return topics;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
AssignmentSpec that = (AssignmentSpec) o; AssignmentSpec that = (AssignmentSpec) o;
if (!members.equals(that.members)) return false; if (!members.equals(that.members)) return false;
return topics.equals(that.topics); return topics.equals(that.topics);
} }

View File

@ -16,53 +16,44 @@
*/ */
package org.apache.kafka.coordinator.group.assignor; package org.apache.kafka.coordinator.group.assignor;
import java.util.Objects;
/** /**
* Metadata of a topic. * Metadata of a topic.
*/ */
public class AssignmentTopicMetadata { public class AssignmentTopicMetadata {
/**
* The topic name.
*/
final String topicName;
/** /**
* The number of partitions. * The number of partitions.
*/ */
final int numPartitions; private final int numPartitions;
public AssignmentTopicMetadata( public AssignmentTopicMetadata(
String topicName,
int numPartitions int numPartitions
) { ) {
Objects.requireNonNull(topicName);
this.topicName = topicName;
this.numPartitions = numPartitions; this.numPartitions = numPartitions;
} }
/**
* @return The number of partitions present for the topic.
*/
public int numPartitions() {
return numPartitions;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
AssignmentTopicMetadata that = (AssignmentTopicMetadata) o; AssignmentTopicMetadata that = (AssignmentTopicMetadata) o;
return numPartitions == that.numPartitions;
if (numPartitions != that.numPartitions) return false;
return topicName.equals(that.topicName);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = topicName.hashCode(); return numPartitions;
result = 31 * result + numPartitions;
return result;
} }
@Override @Override
public String toString() { public String toString() {
return "AssignmentTopicMetadata(topicName=" + topicName + return "AssignmentTopicMetadata(numPartitions=" + numPartitions + ')';
", numPartitions=" + numPartitions +
')';
} }
} }

View File

@ -26,7 +26,7 @@ public class GroupAssignment {
/** /**
* The member assignments keyed by member id. * The member assignments keyed by member id.
*/ */
final Map<String, MemberAssignment> members; private final Map<String, MemberAssignment> members;
public GroupAssignment( public GroupAssignment(
Map<String, MemberAssignment> members Map<String, MemberAssignment> members
@ -35,13 +35,18 @@ public class GroupAssignment {
this.members = members; this.members = members;
} }
/**
* @return Member assignments keyed by member Ids.
*/
public Map<String, MemberAssignment> members() {
return members;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
GroupAssignment that = (GroupAssignment) o; GroupAssignment that = (GroupAssignment) o;
return members.equals(that.members); return members.equals(that.members);
} }

View File

@ -16,34 +16,38 @@
*/ */
package org.apache.kafka.coordinator.group.assignor; package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid;
import java.util.Collection; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
/** /**
* The partition assignment for a consumer group member. * The partition assignment for a consumer group member.
*/ */
public class MemberAssignment { public class MemberAssignment {
/** /**
* The target partitions assigned to this member. * The target partitions assigned to this member keyed by topicId.
*/ */
final Collection<TopicPartition> targetPartitions; private final Map<Uuid, Set<Integer>> targetPartitions;
public MemberAssignment( public MemberAssignment(Map<Uuid, Set<Integer>> targetPartitions) {
Collection<TopicPartition> targetPartitions
) {
Objects.requireNonNull(targetPartitions); Objects.requireNonNull(targetPartitions);
this.targetPartitions = targetPartitions; this.targetPartitions = targetPartitions;
} }
/**
* @return Target partitions keyed by topic Ids.
*/
public Map<Uuid, Set<Integer>> targetPartitions() {
return this.targetPartitions;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
MemberAssignment that = (MemberAssignment) o; MemberAssignment that = (MemberAssignment) o;
return targetPartitions.equals(that.targetPartitions); return targetPartitions.equals(that.targetPartitions);
} }