MINOR: Small refactor in TargetAssignmentBuilder (#16174)

This patch is a small refactoring which mainly aims at avoid to construct a copy of the new target assignment in the TargetAssignmentBuilder because the copy is not used by the caller. The change relies on the exiting tests and it does not really have an impact on performance (e.g. validated with TargetAssignmentBuilderBenchmark).

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2024-06-03 20:32:39 +02:00 committed by GitHub
parent 5cbc1d616a
commit 979f8d9aa3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 50 additions and 57 deletions

View File

@ -59,6 +59,7 @@ import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment; import org.apache.kafka.coordinator.group.consumer.Assignment;
@ -1904,24 +1905,28 @@ public class GroupMetadataManager {
.withInvertedTargetAssignment(group.invertedTargetAssignment()) .withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics()) .withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember); .addOrUpdateMember(updatedMember.memberId(), updatedMember);
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
// A new static member is replacing an older one with the same subscriptions.
// We just need to remove the older member and add the newer one. The new member should
// reuse the target assignment of the older member.
if (staticMemberReplaced) { if (staticMemberReplaced) {
assignmentResult = assignmentResultBuilder // A new static member is replacing an older one with the same subscriptions.
.removeMember(member.memberId()) // We just need to remove the older member and add the newer one. The new member should
.build(); // reuse the target assignment of the older member.
} else { assignmentResultBuilder.removeMember(member.memberId());
assignmentResult = assignmentResultBuilder
.build();
} }
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
assignmentResultBuilder.build();
log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.",
group.groupId(), groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); group.groupId(), groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment());
records.addAll(assignmentResult.records()); records.addAll(assignmentResult.records());
return assignmentResult.targetAssignment().get(updatedMember.memberId());
MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId());
if (newMemberAssignment != null) {
return new Assignment(newMemberAssignment.targetPartitions());
} else {
return Assignment.EMPTY;
}
} catch (PartitionAssignorException ex) { } catch (PartitionAssignorException ex) {
String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", String msg = String.format("Failed to compute a new target assignment for epoch %d: %s",
groupEpoch, ex.getMessage()); groupEpoch, ex.getMessage());

View File

@ -64,11 +64,11 @@ public class TargetAssignmentBuilder {
/** /**
* The new target assignment for the group. * The new target assignment for the group.
*/ */
private final Map<String, Assignment> targetAssignment; private final Map<String, MemberAssignment> targetAssignment;
TargetAssignmentResult( TargetAssignmentResult(
List<CoordinatorRecord> records, List<CoordinatorRecord> records,
Map<String, Assignment> targetAssignment Map<String, MemberAssignment> targetAssignment
) { ) {
Objects.requireNonNull(records); Objects.requireNonNull(records);
Objects.requireNonNull(targetAssignment); Objects.requireNonNull(targetAssignment);
@ -86,7 +86,7 @@ public class TargetAssignmentBuilder {
/** /**
* @return The target assignment. * @return The target assignment.
*/ */
public Map<String, Assignment> targetAssignment() { public Map<String, MemberAssignment> targetAssignment() {
return targetAssignment; return targetAssignment;
} }
} }
@ -347,38 +347,26 @@ public class TargetAssignmentBuilder {
// Compute delta from previous to new target assignment and create the // Compute delta from previous to new target assignment and create the
// relevant records. // relevant records.
List<CoordinatorRecord> records = new ArrayList<>(); List<CoordinatorRecord> records = new ArrayList<>();
Map<String, Assignment> newTargetAssignment = new HashMap<>();
memberSpecs.keySet().forEach(memberId -> { for (String memberId : memberSpecs.keySet()) {
Assignment oldMemberAssignment = targetAssignment.get(memberId); Assignment oldMemberAssignment = targetAssignment.get(memberId);
Assignment newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId); Assignment newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId);
newTargetAssignment.put(memberId, newMemberAssignment); if (!newMemberAssignment.equals(oldMemberAssignment)) {
// If the member had no assignment or had a different assignment, we
if (oldMemberAssignment == null) { // create a record for the new assignment.
// If the member had no assignment, we always create a record for it.
records.add(newTargetAssignmentRecord( records.add(newTargetAssignmentRecord(
groupId, groupId,
memberId, memberId,
newMemberAssignment.partitions() newMemberAssignment.partitions()
)); ));
} else {
// If the member had an assignment, we only create a record if the
// new assignment is different.
if (!newMemberAssignment.equals(oldMemberAssignment)) {
records.add(newTargetAssignmentRecord(
groupId,
memberId,
newMemberAssignment.partitions()
));
}
} }
}); }
// Bump the target assignment epoch. // Bump the target assignment epoch.
records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch)); records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
return new TargetAssignmentResult(records, newTargetAssignment); return new TargetAssignmentResult(records, newGroupAssignment.members());
} }
private Assignment newMemberAssignment( private Assignment newMemberAssignment(

View File

@ -337,12 +337,12 @@ public class TargetAssignmentBuilderTest {
20 20
)), result.records()); )), result.records());
Map<String, Assignment> expectedAssignment = new HashMap<>(); Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new Assignment(mkAssignment( expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3), mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3) mkTopicAssignment(barTopicId, 1, 2, 3)
))); )));
expectedAssignment.put("member-2", new Assignment(mkAssignment( expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5, 6), mkTopicAssignment(fooTopicId, 4, 5, 6),
mkTopicAssignment(barTopicId, 4, 5, 6) mkTopicAssignment(barTopicId, 4, 5, 6)
))); )));
@ -400,12 +400,12 @@ public class TargetAssignmentBuilderTest {
20 20
), result.records().get(2)); ), result.records().get(2));
Map<String, Assignment> expectedAssignment = new HashMap<>(); Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-2", new Assignment(mkAssignment( expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3), mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3) mkTopicAssignment(barTopicId, 1, 2, 3)
))); )));
expectedAssignment.put("member-1", new Assignment(mkAssignment( expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5, 6), mkTopicAssignment(fooTopicId, 4, 5, 6),
mkTopicAssignment(barTopicId, 4, 5, 6) mkTopicAssignment(barTopicId, 4, 5, 6)
))); )));
@ -474,16 +474,16 @@ public class TargetAssignmentBuilderTest {
20 20
), result.records().get(3)); ), result.records().get(3));
Map<String, Assignment> expectedAssignment = new HashMap<>(); Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new Assignment(mkAssignment( expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2), mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2) mkTopicAssignment(barTopicId, 1, 2)
))); )));
expectedAssignment.put("member-2", new Assignment(mkAssignment( expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4), mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4) mkTopicAssignment(barTopicId, 3, 4)
))); )));
expectedAssignment.put("member-3", new Assignment(mkAssignment( expectedAssignment.put("member-3", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6), mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6) mkTopicAssignment(barTopicId, 5, 6)
))); )));
@ -561,16 +561,16 @@ public class TargetAssignmentBuilderTest {
20 20
), result.records().get(3)); ), result.records().get(3));
Map<String, Assignment> expectedAssignment = new HashMap<>(); Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new Assignment(mkAssignment( expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2), mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2) mkTopicAssignment(barTopicId, 1, 2)
))); )));
expectedAssignment.put("member-2", new Assignment(mkAssignment( expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4), mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4) mkTopicAssignment(barTopicId, 3, 4)
))); )));
expectedAssignment.put("member-3", new Assignment(mkAssignment( expectedAssignment.put("member-3", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6), mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6) mkTopicAssignment(barTopicId, 5, 6)
))); )));
@ -639,16 +639,16 @@ public class TargetAssignmentBuilderTest {
20 20
), result.records().get(2)); ), result.records().get(2));
Map<String, Assignment> expectedAssignment = new HashMap<>(); Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new Assignment(mkAssignment( expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2), mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2) mkTopicAssignment(barTopicId, 1, 2)
))); )));
expectedAssignment.put("member-2", new Assignment(mkAssignment( expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 3, 4, 5) mkTopicAssignment(barTopicId, 3, 4, 5)
))); )));
expectedAssignment.put("member-3", new Assignment(mkAssignment( expectedAssignment.put("member-3", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 6), mkTopicAssignment(fooTopicId, 6),
mkTopicAssignment(barTopicId, 6) mkTopicAssignment(barTopicId, 6)
))); )));
@ -713,12 +713,12 @@ public class TargetAssignmentBuilderTest {
20 20
), result.records().get(2)); ), result.records().get(2));
Map<String, Assignment> expectedAssignment = new HashMap<>(); Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new Assignment(mkAssignment( expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3), mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3) mkTopicAssignment(barTopicId, 1, 2, 3)
))); )));
expectedAssignment.put("member-2", new Assignment(mkAssignment( expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5, 6), mkTopicAssignment(fooTopicId, 4, 5, 6),
mkTopicAssignment(barTopicId, 4, 5, 6) mkTopicAssignment(barTopicId, 4, 5, 6)
))); )));
@ -788,17 +788,17 @@ public class TargetAssignmentBuilderTest {
20 20
), result.records().get(1)); ), result.records().get(1));
Map<String, Assignment> expectedAssignment = new HashMap<>(); Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new Assignment(mkAssignment( expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2), mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2) mkTopicAssignment(barTopicId, 1, 2)
))); )));
expectedAssignment.put("member-2", new Assignment(mkAssignment( expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4), mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4) mkTopicAssignment(barTopicId, 3, 4)
))); )));
expectedAssignment.put("member-3-a", new Assignment(mkAssignment( expectedAssignment.put("member-3-a", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6), mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6) mkTopicAssignment(barTopicId, 5, 6)
))); )));