KAFKA-19478 [1/N]: Precompute values in ProcessState (#20120)

This is a very mechanical and obvious change that makes most
accessors in ProcessState constant time, O(1), instead of linear time,
O(n), by computing the collections and aggregations at insertion time
rather than every time the value is accessed.

Since the accessors are used in deeply nested loops, this reduces the
runtime of our worst-case benchmarks by ~14x.

Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Lucas Brutschy 2025-07-08 13:32:47 +02:00 committed by GitHub
parent dde0b8cd92
commit a88fd01e74
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 22 deletions

View File

@ -22,9 +22,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableSet;
import static org.apache.kafka.common.utils.Utils.union;
/** /**
* Represents the state of a process in the group coordinator. * Represents the state of a process in the group coordinator.
* This includes the capacity of the process, the load on the process, and the tasks assigned to the process. * This includes the capacity of the process, the load on the process, and the tasks assigned to the process.
@ -34,15 +31,18 @@ public class ProcessState {
// number of members // number of members
private int capacity; private int capacity;
private double load; private double load;
private int taskCount;
private int activeTaskCount;
private final Map<String, Integer> memberToTaskCounts; private final Map<String, Integer> memberToTaskCounts;
private final Map<String, Set<TaskId>> assignedActiveTasks; private final Map<String, Set<TaskId>> assignedActiveTasks;
private final Map<String, Set<TaskId>> assignedStandbyTasks; private final Map<String, Set<TaskId>> assignedStandbyTasks;
private final Set<TaskId> assignedTasks;
ProcessState(final String processId) { ProcessState(final String processId) {
this.processId = processId; this.processId = processId;
this.capacity = 0; this.capacity = 0;
this.load = Double.MAX_VALUE; this.load = Double.MAX_VALUE;
this.assignedTasks = new HashSet<>();
this.assignedActiveTasks = new HashMap<>(); this.assignedActiveTasks = new HashMap<>();
this.assignedStandbyTasks = new HashMap<>(); this.assignedStandbyTasks = new HashMap<>();
this.memberToTaskCounts = new HashMap<>(); this.memberToTaskCounts = new HashMap<>();
@ -57,10 +57,6 @@ public class ProcessState {
return capacity; return capacity;
} }
public int totalTaskCount() {
return assignedStandbyTasks().size() + assignedActiveTasks().size();
}
public double load() { public double load() {
return load; return load;
} }
@ -69,6 +65,10 @@ public class ProcessState {
return memberToTaskCounts; return memberToTaskCounts;
} }
public int activeTaskCount() {
return activeTaskCount;
}
public Set<TaskId> assignedActiveTasks() { public Set<TaskId> assignedActiveTasks() {
return assignedActiveTasks.values().stream() return assignedActiveTasks.values().stream()
.flatMap(Set::stream) .flatMap(Set::stream)
@ -90,7 +90,10 @@ public class ProcessState {
} }
public void addTask(final String memberId, final TaskId taskId, final boolean isActive) { public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
taskCount += 1;
assignedTasks.add(taskId);
if (isActive) { if (isActive) {
activeTaskCount += 1;
assignedActiveTasks.putIfAbsent(memberId, new HashSet<>()); assignedActiveTasks.putIfAbsent(memberId, new HashSet<>());
assignedActiveTasks.get(memberId).add(taskId); assignedActiveTasks.get(memberId).add(taskId);
} else { } else {
@ -110,7 +113,7 @@ public class ProcessState {
if (capacity <= 0) { if (capacity <= 0) {
this.load = -1; this.load = -1;
} else { } else {
this.load = (double) totalTaskCount() / capacity; this.load = (double) taskCount / capacity;
} }
} }
@ -120,7 +123,7 @@ public class ProcessState {
} }
public boolean hasCapacity() { public boolean hasCapacity() {
return totalTaskCount() < capacity; return this.load < 1.0;
} }
public int compareTo(final ProcessState other) { public int compareTo(final ProcessState other) {
@ -132,18 +135,10 @@ public class ProcessState {
} }
public boolean hasTask(final TaskId taskId) { public boolean hasTask(final TaskId taskId) {
return assignedActiveTasks().contains(taskId) || assignedStandbyTasks().contains(taskId); } return assignedTasks.contains(taskId);
}
Set<TaskId> assignedTasks() { Set<TaskId> assignedTasks() {
final Set<TaskId> assignedActiveTaskIds = assignedActiveTasks(); return assignedTasks;
final Set<TaskId> assignedStandbyTaskIds = assignedStandbyTasks();
return unmodifiableSet(
union(
() -> new HashSet<>(assignedActiveTaskIds.size() + assignedStandbyTaskIds.size()),
assignedActiveTaskIds,
assignedStandbyTaskIds
)
);
} }
} }

View File

@ -329,7 +329,7 @@ public class StickyTaskAssignor implements TaskAssignor {
if (isActive) { if (isActive) {
// update task per process // update task per process
maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).assignedActiveTasks().size()); maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).activeTaskCount());
} }
} }