KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure (#16002)

This PR creates the required methods to post-process the result of TaskAssignor.assign into the required ClientMetadata map. This allows most of the internal logic to remain intact after the user's assignment code runs.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Antoine Pourchet 2024-05-21 14:14:39 -06:00 committed by GitHub
parent 4cc99cbf3f
commit 6339e3a6bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 0 deletions

View File

@ -38,6 +38,8 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState; import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment;
import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl; import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
@ -493,6 +495,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
); );
} }
private static void processStreamsPartitionAssignment(final Map<UUID, ClientMetadata> clientMetadataMap,
final TaskAssignment taskAssignment) {
taskAssignment.assignment().forEach(kafkaStreamsAssignment -> {
final ProcessId processId = kafkaStreamsAssignment.processId();
final ClientMetadata clientMetadata = clientMetadataMap.get(processId.id());
clientMetadata.state.setAssignedTasks(kafkaStreamsAssignment);
});
}
/** /**
* Verify the subscription versions are within the expected bounds and check for version probing. * Verify the subscription versions are within the expected bounds and check for version probing.
* *

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
import java.util.SortedMap; import java.util.SortedMap;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -41,6 +42,8 @@ import static java.util.Collections.unmodifiableSet;
import static java.util.Comparator.comparing; import static java.util.Comparator.comparing;
import static java.util.Comparator.comparingLong; import static java.util.Comparator.comparingLong;
import static org.apache.kafka.common.utils.Utils.union; import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask.Type.ACTIVE;
import static org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask.Type.STANDBY;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
public class ClientState { public class ClientState {
@ -473,6 +476,17 @@ public class ClientState {
return new TreeMap<>(consumerToPreviousStatefulTaskIds); return new TreeMap<>(consumerToPreviousStatefulTaskIds);
} }
public void setAssignedTasks(final KafkaStreamsAssignment assignment) {
final Set<TaskId> activeTasks = assignment.assignment().stream()
.filter(task -> task.type() == ACTIVE).map(KafkaStreamsAssignment.AssignedTask::id)
.collect(Collectors.toSet());
final Set<TaskId> standbyTasks = assignment.assignment().stream()
.filter(task -> task.type() == STANDBY).map(KafkaStreamsAssignment.AssignedTask::id)
.collect(Collectors.toSet());
assignedActiveTasks.taskIds(activeTasks);
assignedStandbyTasks.taskIds(standbyTasks);
}
public String currentAssignment() { public String currentAssignment() {
return "[activeTasks: (" + assignedActiveTasks.taskIds() + return "[activeTasks: (" + assignedActiveTasks.taskIds() +
") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]"; ") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";