diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index c33f0b281f4..4312445bae5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -38,6 +38,8 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment; import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; @@ -493,6 +495,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf ); } + private static void processStreamsPartitionAssignment(final Map clientMetadataMap, + final TaskAssignment taskAssignment) { + taskAssignment.assignment().forEach(kafkaStreamsAssignment -> { + final ProcessId processId = kafkaStreamsAssignment.processId(); + final ClientMetadata clientMetadata = clientMetadataMap.get(processId.id()); + clientMetadata.state.setAssignedTasks(kafkaStreamsAssignment); + }); + } + /** * Verify the subscription versions are within the expected bounds and check for version probing. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 8978628cc1f..1d9d8c47a4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment; import java.util.SortedMap; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; import org.apache.kafka.streams.processor.internals.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +42,8 @@ import static java.util.Collections.unmodifiableSet; import static java.util.Comparator.comparing; import static java.util.Comparator.comparingLong; import static org.apache.kafka.common.utils.Utils.union; +import static org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask.Type.ACTIVE; +import static org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask.Type.STANDBY; import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; public class ClientState { @@ -473,6 +476,17 @@ public class ClientState { return new TreeMap<>(consumerToPreviousStatefulTaskIds); } + public void setAssignedTasks(final KafkaStreamsAssignment assignment) { + final Set activeTasks = assignment.assignment().stream() + .filter(task -> task.type() == ACTIVE).map(KafkaStreamsAssignment.AssignedTask::id) + .collect(Collectors.toSet()); + final Set standbyTasks = assignment.assignment().stream() + .filter(task -> task.type() == STANDBY).map(KafkaStreamsAssignment.AssignedTask::id) + .collect(Collectors.toSet()); + assignedActiveTasks.taskIds(activeTasks); + assignedStandbyTasks.taskIds(standbyTasks); + } + public String currentAssignment() { return "[activeTasks: (" + assignedActiveTasks.taskIds() + ") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";