From 624dd458099fa93b3fa1e1715b58bbc6d8689857 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 8 Jan 2025 17:26:41 +0100 Subject: [PATCH] KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes (#18276) * KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes This commit adds the classes to represent a Streams group member in the consumer coordinator. Reviewers: Bill Bejeck , Lucas Brutschy --- .../coordinator/group/streams/Assignment.java | 88 ++++ .../group/streams/MemberState.java | 74 +++ .../group/streams/StreamsGroupMember.java | 463 ++++++++++++++++++ .../group/streams/AssignmentTest.java | 122 +++++ .../group/streams/StreamsGroupMemberTest.java | 429 ++++++++++++++++ .../group/streams/TaskAssignmentTestUtil.java | 57 +++ 6 files changed, 1233 insertions(+) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java new file mode 100644 index 00000000000..da377d19ccd --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * An immutable assignment for a member. + * + * @param activeTasks Active tasks assigned to the member. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param standbyTasks Standby tasks assigned to the member. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param warmupTasks Warm-up tasks assigned to the member. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + */ +public record Assignment(Map> activeTasks, + Map> standbyTasks, + Map> warmupTasks) { + + public Assignment { + activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)); + standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)); + warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)); + } + + /** + * An empty assignment. + */ + public static final Assignment EMPTY = new Assignment( + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + /** + * Creates a {{@link org.apache.kafka.coordinator.group.streams.Assignment}} from a + * {{@link org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}}. + * + * @param record The record. + * @return A {{@link org.apache.kafka.coordinator.group.streams.Assignment}}. + */ + public static Assignment fromRecord( + StreamsGroupTargetAssignmentMemberValue record + ) { + return new Assignment( + record.activeTasks().stream() + .collect(Collectors.toMap( + StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId, + taskId -> new HashSet<>(taskId.partitions()) + ) + ), + record.standbyTasks().stream() + .collect(Collectors.toMap( + StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId, + taskId -> new HashSet<>(taskId.partitions()) + ) + ), + record.warmupTasks().stream() + .collect(Collectors.toMap( + StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId, + taskId -> new HashSet<>(taskId.partitions()) + ) + ) + ); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java new file mode 100644 index 00000000000..71914da48b2 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import java.util.HashMap; +import java.util.Map; + +/** + * The various states that a member can be in. For their definition, refer to the documentation of + * {{@link org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder}}. + */ +public enum MemberState { + + /** + * The member is fully reconciled with the desired target assignment. + */ + STABLE((byte) 1), + + /** + * The member must revoke some tasks in order to be able to transition to the next epoch. + */ + UNREVOKED_TASKS((byte) 2), + + /** + * The member transitioned to the last epoch but waits on some tasks which have not been revoked by their previous owners yet. + */ + UNRELEASED_TASKS((byte) 3), + + /** + * The member is in an unknown state. This can only happen if a future version of the software introduces a new state unknown by this + * version. + */ + UNKNOWN((byte) 127); + + private static final Map VALUES_TO_ENUMS = new HashMap<>(); + + static { + for (MemberState state : MemberState.values()) { + VALUES_TO_ENUMS.put(state.value(), state); + } + } + + private final byte value; + + MemberState(byte value) { + this.value = value; + } + + public byte value() { + return value; + } + + public static MemberState fromValue(byte value) { + MemberState state = VALUES_TO_ENUMS.get(value); + if (state == null) { + return UNKNOWN; + } + return state; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java new file mode 100644 index 00000000000..e23df3f5701 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Contains all information related to a member within a Streams group. + *

+ * This class is immutable and is fully backed by records stored in the __consumer_offsets topic. + * + * @param memberId The ID of the member. + * @param memberEpoch The current epoch of the member. + * @param previousMemberEpoch The previous epoch of the member. + * @param state The current state of the member. + * @param instanceId The instance ID of the member. + * @param rackId The rack ID of the member. + * @param clientId The client ID of the member. + * @param clientHost The host of the member. + * @param rebalanceTimeoutMs The rebalance timeout in milliseconds. + * @param topologyEpoch The epoch of the topology the member uses. + * @param processId The ID of the Streams client that contains the member. + * @param userEndpoint The user endpoint exposed for Interactive Queries by the Streams client that + * contains the member. + * @param clientTags Tags of the client of the member used for rack-aware assignment. + * @param assignedActiveTasks Active tasks assigned to the member. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param assignedStandbyTasks Standby tasks assigned to the member. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param assignedWarmupTasks Warm-up tasks assigned to the member. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param activeTasksPendingRevocation Active tasks assigned to the member pending revocation. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param standbyTasksPendingRevocation Standby tasks assigned to the member pending revocation. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param warmupTasksPendingRevocation Warm-up tasks assigned to the member pending revocation. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + */ +@SuppressWarnings("checkstyle:JavaNCSS") +public record StreamsGroupMember(String memberId, + Integer memberEpoch, + Integer previousMemberEpoch, + MemberState state, + Optional instanceId, + Optional rackId, + String clientId, + String clientHost, + Integer rebalanceTimeoutMs, + Integer topologyEpoch, + String processId, + Optional userEndpoint, + Map clientTags, + Map> assignedActiveTasks, + Map> assignedStandbyTasks, + Map> assignedWarmupTasks, + Map> activeTasksPendingRevocation, + Map> standbyTasksPendingRevocation, + Map> warmupTasksPendingRevocation) { + + public StreamsGroupMember { + Objects.requireNonNull(memberId, "memberId cannot be null"); + clientTags = clientTags != null ? Collections.unmodifiableMap(clientTags) : null; + assignedActiveTasks = assignedActiveTasks != null ? Collections.unmodifiableMap(assignedActiveTasks) : null; + assignedStandbyTasks = assignedStandbyTasks != null ? Collections.unmodifiableMap(assignedStandbyTasks) : null; + assignedWarmupTasks = assignedWarmupTasks != null ? Collections.unmodifiableMap(assignedWarmupTasks) : null; + activeTasksPendingRevocation = activeTasksPendingRevocation != null ? Collections.unmodifiableMap(activeTasksPendingRevocation) : null; + standbyTasksPendingRevocation = standbyTasksPendingRevocation != null ? Collections.unmodifiableMap(standbyTasksPendingRevocation) : null; + warmupTasksPendingRevocation = warmupTasksPendingRevocation != null ? Collections.unmodifiableMap(warmupTasksPendingRevocation) : null; + } + + /** + * A builder that facilitates the creation of a new member or the update of an existing one. + *

+ * Please refer to the javadoc of {{@link StreamsGroupMember}} for the definition of the fields. + */ + public static class Builder { + + private final String memberId; + private Integer memberEpoch = null; + private Integer previousMemberEpoch = null; + private MemberState state = null; + private Optional instanceId = null; + private Optional rackId = null; + private Integer rebalanceTimeoutMs = null; + private String clientId = null; + private String clientHost = null; + private Integer topologyEpoch = null; + private String processId = null; + private Optional userEndpoint = null; + private Map clientTags = null; + private Map> assignedActiveTasks = null; + private Map> assignedStandbyTasks = null; + private Map> assignedWarmupTasks = null; + private Map> activeTasksPendingRevocation = null; + private Map> standbyTasksPendingRevocation = null; + private Map> warmupTasksPendingRevocation = null; + + public Builder(String memberId) { + this.memberId = Objects.requireNonNull(memberId, "memberId cannot be null"); + } + + public Builder(StreamsGroupMember member) { + Objects.requireNonNull(member, "member cannot be null"); + + this.memberId = member.memberId; + this.memberEpoch = member.memberEpoch; + this.previousMemberEpoch = member.previousMemberEpoch; + this.instanceId = member.instanceId; + this.rackId = member.rackId; + this.rebalanceTimeoutMs = member.rebalanceTimeoutMs; + this.clientId = member.clientId; + this.clientHost = member.clientHost; + this.topologyEpoch = member.topologyEpoch; + this.processId = member.processId; + this.userEndpoint = member.userEndpoint; + this.clientTags = member.clientTags; + this.state = member.state; + this.assignedActiveTasks = member.assignedActiveTasks; + this.assignedStandbyTasks = member.assignedStandbyTasks; + this.assignedWarmupTasks = member.assignedWarmupTasks; + this.activeTasksPendingRevocation = member.activeTasksPendingRevocation; + this.standbyTasksPendingRevocation = member.standbyTasksPendingRevocation; + this.warmupTasksPendingRevocation = member.warmupTasksPendingRevocation; + } + + public Builder updateMemberEpoch(int memberEpoch) { + int currentMemberEpoch = this.memberEpoch; + this.memberEpoch = memberEpoch; + this.previousMemberEpoch = currentMemberEpoch; + return this; + } + + public Builder setMemberEpoch(int memberEpoch) { + this.memberEpoch = memberEpoch; + return this; + } + + public Builder setPreviousMemberEpoch(int previousMemberEpoch) { + this.previousMemberEpoch = previousMemberEpoch; + return this; + } + + public Builder setInstanceId(String instanceId) { + this.instanceId = Optional.ofNullable(instanceId); + return this; + } + + public Builder maybeUpdateInstanceId(Optional instanceId) { + instanceId.ifPresent(this::setInstanceId); + return this; + } + + public Builder setRackId(String rackId) { + this.rackId = Optional.ofNullable(rackId); + return this; + } + + public Builder maybeUpdateRackId(Optional rackId) { + rackId.ifPresent(this::setRackId); + return this; + } + + public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) { + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + return this; + } + + public Builder maybeUpdateRebalanceTimeoutMs(OptionalInt rebalanceTimeoutMs) { + this.rebalanceTimeoutMs = rebalanceTimeoutMs.orElse(this.rebalanceTimeoutMs); + return this; + } + + public Builder setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public Builder setClientHost(String clientHost) { + this.clientHost = clientHost; + return this; + } + + public Builder setState(MemberState state) { + this.state = state; + return this; + } + + public Builder setTopologyEpoch(int topologyEpoch) { + this.topologyEpoch = topologyEpoch; + return this; + } + + public Builder maybeUpdateTopologyEpoch(OptionalInt topologyEpoch) { + this.topologyEpoch = topologyEpoch.orElse(this.topologyEpoch); + return this; + } + + public Builder setProcessId(String processId) { + this.processId = processId; + return this; + } + + public Builder maybeUpdateProcessId(Optional processId) { + this.processId = processId.orElse(this.processId); + return this; + } + + public Builder setUserEndpoint(StreamsGroupMemberMetadataValue.Endpoint userEndpoint) { + this.userEndpoint = Optional.ofNullable(userEndpoint); + return this; + } + + public Builder maybeUpdateUserEndpoint(Optional userEndpoint) { + userEndpoint.ifPresent(this::setUserEndpoint); + return this; + } + + public Builder setClientTags(Map clientTags) { + this.clientTags = clientTags; + return this; + } + + public Builder maybeUpdateClientTags(Optional> clientTags) { + this.clientTags = clientTags.orElse(this.clientTags); + return this; + } + + public Builder setAssignment(Assignment assignment) { + this.assignedActiveTasks = assignment.activeTasks(); + this.assignedStandbyTasks = assignment.standbyTasks(); + this.assignedWarmupTasks = assignment.warmupTasks(); + return this; + } + + public Builder setAssignedActiveTasks(Map> assignedActiveTasks) { + this.assignedActiveTasks = assignedActiveTasks; + return this; + } + + public Builder setAssignedStandbyTasks(Map> assignedStandbyTasks) { + this.assignedStandbyTasks = assignedStandbyTasks; + return this; + } + + public Builder setAssignedWarmupTasks(Map> assignedWarmupTasks) { + this.assignedWarmupTasks = assignedWarmupTasks; + return this; + } + + public Builder setAssignmentPendingRevocation(Assignment assignment) { + this.activeTasksPendingRevocation = assignment.activeTasks(); + this.standbyTasksPendingRevocation = assignment.standbyTasks(); + this.warmupTasksPendingRevocation = assignment.warmupTasks(); + return this; + } + + public Builder setActiveTasksPendingRevocation( + Map> activeTasksPendingRevocation) { + this.activeTasksPendingRevocation = activeTasksPendingRevocation; + return this; + } + + public Builder setStandbyTasksPendingRevocation( + Map> standbyTasksPendingRevocation) { + this.standbyTasksPendingRevocation = standbyTasksPendingRevocation; + return this; + } + + public Builder setWarmupTasksPendingRevocation( + Map> warmupTasksPendingRevocation) { + this.warmupTasksPendingRevocation = warmupTasksPendingRevocation; + return this; + } + + public Builder updateWith(StreamsGroupMemberMetadataValue record) { + setInstanceId(record.instanceId()); + setRackId(record.rackId()); + setClientId(record.clientId()); + setClientHost(record.clientHost()); + setRebalanceTimeoutMs(record.rebalanceTimeoutMs()); + setTopologyEpoch(record.topologyEpoch()); + setProcessId(record.processId()); + setUserEndpoint(record.userEndpoint()); + setClientTags(record.clientTags().stream().collect(Collectors.toMap( + StreamsGroupMemberMetadataValue.KeyValue::key, + StreamsGroupMemberMetadataValue.KeyValue::value + ))); + return this; + } + + public Builder updateWith(StreamsGroupCurrentMemberAssignmentValue record) { + setMemberEpoch(record.memberEpoch()); + setPreviousMemberEpoch(record.previousMemberEpoch()); + setState(MemberState.fromValue(record.state())); + setAssignedActiveTasks(assignmentFromTaskIds(record.activeTasks())); + setAssignedStandbyTasks(assignmentFromTaskIds(record.standbyTasks())); + setAssignedWarmupTasks(assignmentFromTaskIds(record.warmupTasks())); + setActiveTasksPendingRevocation( + assignmentFromTaskIds(record.activeTasksPendingRevocation())); + setStandbyTasksPendingRevocation( + assignmentFromTaskIds(record.standbyTasksPendingRevocation())); + setWarmupTasksPendingRevocation( + assignmentFromTaskIds(record.warmupTasksPendingRevocation())); + return this; + } + + private static Map> assignmentFromTaskIds( + List topicPartitionsList + ) { + return topicPartitionsList.stream().collect(Collectors.toMap( + StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId, + taskIds -> Set.copyOf(taskIds.partitions()))); + } + + public StreamsGroupMember build() { + return new StreamsGroupMember( + memberId, + memberEpoch, + previousMemberEpoch, + state, + instanceId, + rackId, + clientId, + clientHost, + rebalanceTimeoutMs, + topologyEpoch, + processId, + userEndpoint, + clientTags, + assignedActiveTasks, + assignedStandbyTasks, + assignedWarmupTasks, + activeTasksPendingRevocation, + standbyTasksPendingRevocation, + warmupTasksPendingRevocation + ); + } + } + + /** + * @return True if the member is in the Stable state and at the desired epoch. + */ + public boolean isReconciledTo(int targetAssignmentEpoch) { + return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch; + } + + /** + * Creates a member description for the Streams group describe response from this member. + * + * @param targetAssignment The target assignment of this member in the corresponding group. + * + * @return The StreamsGroupMember mapped as StreamsGroupDescribeResponseData.Member. + */ + public StreamsGroupDescribeResponseData.Member asStreamsGroupDescribeMember( + Assignment targetAssignment + ) { + final StreamsGroupDescribeResponseData.Assignment describedTargetAssignment = + new StreamsGroupDescribeResponseData.Assignment(); + + if (targetAssignment != null) { + describedTargetAssignment + .setActiveTasks(taskIdsFromMap(targetAssignment.activeTasks())) + .setStandbyTasks(taskIdsFromMap(targetAssignment.standbyTasks())) + .setWarmupTasks(taskIdsFromMap(targetAssignment.warmupTasks())); + } + + return new StreamsGroupDescribeResponseData.Member() + .setMemberEpoch(memberEpoch) + .setMemberId(memberId) + .setAssignment( + new StreamsGroupDescribeResponseData.Assignment() + .setActiveTasks(taskIdsFromMap(assignedActiveTasks)) + .setStandbyTasks(taskIdsFromMap(assignedStandbyTasks)) + .setWarmupTasks(taskIdsFromMap(assignedWarmupTasks))) + .setTargetAssignment(describedTargetAssignment) + .setClientHost(clientHost) + .setClientId(clientId) + .setInstanceId(instanceId.orElse(null)) + .setRackId(rackId.orElse(null)) + .setClientTags(clientTags.entrySet().stream().map( + entry -> new StreamsGroupDescribeResponseData.KeyValue() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + ).collect(Collectors.toList())) + .setProcessId(processId) + .setTopologyEpoch(topologyEpoch) + .setUserEndpoint( + userEndpoint.map( + endpoint -> new StreamsGroupDescribeResponseData.Endpoint() + .setHost(endpoint.host()) + .setPort(endpoint.port()) + ).orElse(null) + ); + } + + private static List taskIdsFromMap( + Map> tasks + ) { + List taskIds = new ArrayList<>(); + tasks.forEach((subtopologyId, partitionSet) -> { + taskIds.add(new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopologyId(subtopologyId) + .setPartitions(new ArrayList<>(partitionSet))); + }); + return taskIds; + } + + /** + * @return True if the two provided members have different assigned active tasks. + */ + public static boolean hasAssignedActiveTasksChanged( + StreamsGroupMember member1, + StreamsGroupMember member2 + ) { + return !member1.assignedActiveTasks().equals(member2.assignedActiveTasks()); + } + + /** + * @return True if the two provided members have different assigned active tasks. + */ + public static boolean hasAssignedStandbyTasksChanged( + StreamsGroupMember member1, + StreamsGroupMember member2 + ) { + return !member1.assignedStandbyTasks().equals(member2.assignedStandbyTasks()); + } + + /** + * @return True if the two provided members have different assigned active tasks. + */ + public static boolean hasAssignedWarmupTasksChanged( + StreamsGroupMember member1, + StreamsGroupMember member2 + ) { + return !member1.assignedWarmupTasks().equals(member2.assignedWarmupTasks()); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java new file mode 100644 index 00000000000..7c0baf27364 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AssignmentTest { + + static final String SUBTOPOLOGY_1 = "subtopology1"; + static final String SUBTOPOLOGY_2 = "subtopology2"; + static final String SUBTOPOLOGY_3 = "subtopology3"; + + @Test + public void testTasksCannotBeNull() { + assertThrows(NullPointerException.class, () -> new Assignment(null, Collections.emptyMap(), Collections.emptyMap())); + assertThrows(NullPointerException.class, () -> new Assignment(Collections.emptyMap(), null, Collections.emptyMap())); + assertThrows(NullPointerException.class, () -> new Assignment(Collections.emptyMap(), Collections.emptyMap(), null)); + } + + @Test + public void testReturnUnmodifiableTaskAssignments() { + Map> activeTasks = mkTasksPerSubtopology( + mkTasks(SUBTOPOLOGY_1, 1, 2, 3) + ); + Map> standbyTasks = mkTasksPerSubtopology( + mkTasks(SUBTOPOLOGY_2, 9, 8, 7) + ); + Map> warmupTasks = mkTasksPerSubtopology( + mkTasks(SUBTOPOLOGY_3, 4, 5, 6) + ); + Assignment assignment = new Assignment(activeTasks, standbyTasks, warmupTasks); + + assertEquals(activeTasks, assignment.activeTasks()); + assertThrows(UnsupportedOperationException.class, () -> assignment.activeTasks().put("not allowed", Collections.emptySet())); + assertEquals(standbyTasks, assignment.standbyTasks()); + assertThrows(UnsupportedOperationException.class, () -> assignment.standbyTasks().put("not allowed", Collections.emptySet())); + assertEquals(warmupTasks, assignment.warmupTasks()); + assertThrows(UnsupportedOperationException.class, () -> assignment.warmupTasks().put("not allowed", Collections.emptySet())); + } + + @Test + public void testFromTargetAssignmentRecord() { + List activeTasks = new ArrayList<>(); + activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_1) + .setPartitions(Arrays.asList(1, 2, 3))); + activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_2) + .setPartitions(Arrays.asList(4, 5, 6))); + List standbyTasks = new ArrayList<>(); + standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_1) + .setPartitions(Arrays.asList(7, 8, 9))); + standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_2) + .setPartitions(Arrays.asList(1, 2, 3))); + List warmupTasks = new ArrayList<>(); + warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_1) + .setPartitions(Arrays.asList(4, 5, 6))); + warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_2) + .setPartitions(Arrays.asList(7, 8, 9))); + + StreamsGroupTargetAssignmentMemberValue record = new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTasks) + .setStandbyTasks(standbyTasks) + .setWarmupTasks(warmupTasks); + + Assignment assignment = Assignment.fromRecord(record); + + assertEquals( + mkTasksPerSubtopology( + mkTasks(SUBTOPOLOGY_1, 1, 2, 3), + mkTasks(SUBTOPOLOGY_2, 4, 5, 6) + ), + assignment.activeTasks() + ); + assertEquals( + mkTasksPerSubtopology( + mkTasks(SUBTOPOLOGY_1, 7, 8, 9), + mkTasks(SUBTOPOLOGY_2, 1, 2, 3) + ), + assignment.standbyTasks() + ); + assertEquals( + mkTasksPerSubtopology( + mkTasks(SUBTOPOLOGY_1, 4, 5, 6), + mkTasks(SUBTOPOLOGY_2, 7, 8, 9) + ), + assignment.warmupTasks() + ); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java new file mode 100644 index 00000000000..8c6d3d9088a --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue.TaskIds; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.KeyValue; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class StreamsGroupMemberTest { + + private static final String MEMBER_ID = "member-id"; + private static final int MEMBER_EPOCH = 10; + private static final int PREVIOUS_MEMBER_EPOCH = 9; + private static final MemberState STATE = MemberState.UNRELEASED_TASKS; + private static final String INSTANCE_ID = "instance-id"; + private static final String RACK_ID = "rack-id"; + private static final int REBALANCE_TIMEOUT = 5000; + private static final String CLIENT_ID = "client-id"; + private static final String HOSTNAME = "hostname"; + private static final int TOPOLOGY_EPOCH = 3; + private static final String PROCESS_ID = "process-id"; + private static final String SUBTOPOLOGY1 = "subtopology1"; + private static final String SUBTOPOLOGY2 = "subtopology2"; + private static final String SUBTOPOLOGY3 = "subtopology3"; + private static final StreamsGroupMemberMetadataValue.Endpoint USER_ENDPOINT = + new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090); + private static final String CLIENT_TAG_KEY = "client"; + private static final String CLIENT_TAG_VALUE = "tag"; + private static final Map CLIENT_TAGS = mkMap(mkEntry(CLIENT_TAG_KEY, CLIENT_TAG_VALUE)); + private static final List TASKS1 = List.of(1, 2, 3); + private static final List TASKS2 = List.of(4, 5, 6); + private static final List TASKS3 = List.of(7, 8); + private static final List TASKS4 = List.of(3, 2, 1); + private static final List TASKS5 = List.of(6, 5, 4); + private static final List TASKS6 = List.of(9, 7); + private static final Map> ASSIGNED_ACTIVE_TASKS = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS1.toArray(Integer[]::new))); + private static final Map> ASSIGNED_STANDBY_TASKS = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS2.toArray(Integer[]::new))); + private static final Map> ASSIGNED_WARMUP_TASKS = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS3.toArray(Integer[]::new))); + private static final Map> ACTIVE_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS4.toArray(Integer[]::new))); + private static final Map> STANDBY_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS5.toArray(Integer[]::new))); + private static final Map> WARMUP_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS6.toArray(Integer[]::new))); + + @Test + public void testBuilderWithMemberIdIsNull() { + final Exception exception = assertThrows( + NullPointerException.class, + () -> new StreamsGroupMember.Builder((String) null).build() + ); + assertEquals("memberId cannot be null", exception.getMessage()); + } + + @Test + public void testBuilderWithMemberIsNull() { + final Exception exception = assertThrows( + NullPointerException.class, + () -> new StreamsGroupMember.Builder((StreamsGroupMember) null).build() + ); + assertEquals("member cannot be null", exception.getMessage()); + } + + @Test + public void testBuilderWithDefaults() { + StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID).build(); + + assertEquals(MEMBER_ID, member.memberId()); + assertNull(member.memberEpoch()); + assertNull(member.previousMemberEpoch()); + assertNull(member.state()); + assertNull(member.instanceId()); + assertNull(member.rackId()); + assertNull(member.rebalanceTimeoutMs()); + assertNull(member.clientId()); + assertNull(member.clientHost()); + assertNull(member.topologyEpoch()); + assertNull(member.processId()); + assertNull(member.userEndpoint()); + assertNull(member.clientTags()); + assertNull(member.assignedActiveTasks()); + assertNull(member.assignedStandbyTasks()); + assertNull(member.assignedWarmupTasks()); + assertNull(member.activeTasksPendingRevocation()); + assertNull(member.standbyTasksPendingRevocation()); + assertNull(member.warmupTasksPendingRevocation()); + } + + @Test + public void testBuilderNewMember() { + StreamsGroupMember member = createStreamsGroupMember(); + + assertEquals(MEMBER_ID, member.memberId()); + assertEquals(MEMBER_EPOCH, member.memberEpoch()); + assertEquals(PREVIOUS_MEMBER_EPOCH, member.previousMemberEpoch()); + assertEquals(STATE, member.state()); + assertEquals(Optional.of(INSTANCE_ID), member.instanceId()); + assertEquals(Optional.of(RACK_ID), member.rackId()); + assertEquals(CLIENT_ID, member.clientId()); + assertEquals(HOSTNAME, member.clientHost()); + assertEquals(TOPOLOGY_EPOCH, member.topologyEpoch()); + assertEquals(PROCESS_ID, member.processId()); + assertEquals(Optional.of(USER_ENDPOINT), member.userEndpoint()); + assertEquals(CLIENT_TAGS, member.clientTags()); + assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks()); + assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks()); + assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks()); + assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, member.activeTasksPendingRevocation()); + assertEquals(STANDBY_TASKS_PENDING_REVOCATION, member.standbyTasksPendingRevocation()); + assertEquals(WARMUP_TASKS_PENDING_REVOCATION, member.warmupTasksPendingRevocation()); + } + + @Test + public void testBuilderUpdateWithStreamsGroupMemberMetadataValue() { + StreamsGroupMemberMetadataValue record = new StreamsGroupMemberMetadataValue() + .setClientId(CLIENT_ID) + .setClientHost(HOSTNAME) + .setInstanceId(INSTANCE_ID) + .setRackId(RACK_ID) + .setRebalanceTimeoutMs(REBALANCE_TIMEOUT) + .setTopologyEpoch(TOPOLOGY_EPOCH) + .setProcessId(PROCESS_ID) + .setUserEndpoint(USER_ENDPOINT) + .setClientTags(CLIENT_TAGS.entrySet().stream() + .map(e -> new KeyValue().setKey(e.getKey()).setValue(e.getValue())) + .collect(Collectors.toList())); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") + .updateWith(record) + .build(); + + assertEquals(record.clientId(), member.clientId()); + assertEquals(record.clientHost(), member.clientHost()); + assertEquals(Optional.of(record.instanceId()), member.instanceId()); + assertEquals(Optional.of(record.rackId()), member.rackId()); + assertEquals(record.rebalanceTimeoutMs(), member.rebalanceTimeoutMs()); + assertEquals(record.topologyEpoch(), member.topologyEpoch()); + assertEquals(record.processId(), member.processId()); + assertEquals(Optional.of(record.userEndpoint()), member.userEndpoint()); + assertEquals( + record.clientTags().stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value)), + member.clientTags() + ); + assertEquals(MEMBER_ID, member.memberId()); + assertNull(member.memberEpoch()); + assertNull(member.previousMemberEpoch()); + assertNull(member.state()); + assertNull(member.assignedActiveTasks()); + assertNull(member.assignedStandbyTasks()); + assertNull(member.assignedWarmupTasks()); + assertNull(member.activeTasksPendingRevocation()); + assertNull(member.standbyTasksPendingRevocation()); + assertNull(member.warmupTasksPendingRevocation()); + } + + @Test + public void testBuilderUpdateWithConsumerGroupCurrentMemberAssignmentValue() { + StreamsGroupCurrentMemberAssignmentValue record = new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(MEMBER_EPOCH) + .setPreviousMemberEpoch(PREVIOUS_MEMBER_EPOCH) + .setState(STATE.value()) + .setActiveTasks(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS1))) + .setStandbyTasks(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS2))) + .setWarmupTasks(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS3))) + .setActiveTasksPendingRevocation(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS4))) + .setStandbyTasksPendingRevocation(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS5))) + .setWarmupTasksPendingRevocation(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS6))); + + StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID) + .updateWith(record) + .build(); + + assertEquals(MEMBER_ID, member.memberId()); + assertEquals(record.memberEpoch(), member.memberEpoch()); + assertEquals(record.previousMemberEpoch(), member.previousMemberEpoch()); + assertEquals(MemberState.fromValue(record.state()), member.state()); + assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks()); + assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks()); + assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks()); + assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, member.activeTasksPendingRevocation()); + assertEquals(STANDBY_TASKS_PENDING_REVOCATION, member.standbyTasksPendingRevocation()); + assertEquals(WARMUP_TASKS_PENDING_REVOCATION, member.warmupTasksPendingRevocation()); + assertNull(member.instanceId()); + assertNull(member.rackId()); + assertNull(member.rebalanceTimeoutMs()); + assertNull(member.clientId()); + assertNull(member.clientHost()); + assertNull(member.topologyEpoch()); + assertNull(member.processId()); + assertNull(member.userEndpoint()); + assertNull(member.clientTags()); + } + + @Test + public void testBuilderMaybeUpdateMember() { + final StreamsGroupMember member = createStreamsGroupMember(); + + // This is a no-op. + StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member) + .maybeUpdateRackId(Optional.empty()) + .maybeUpdateInstanceId(Optional.empty()) + .maybeUpdateRebalanceTimeoutMs(OptionalInt.empty()) + .maybeUpdateProcessId(Optional.empty()) + .maybeUpdateTopologyEpoch(OptionalInt.empty()) + .maybeUpdateUserEndpoint(Optional.empty()) + .maybeUpdateClientTags(Optional.empty()) + .build(); + + assertEquals(member, updatedMember); + + final String newRackId = "new" + member.rackId(); + final String newInstanceId = "new" + member.instanceId(); + final Integer newRebalanceTimeout = member.rebalanceTimeoutMs() + 1000; + final String newProcessId = "new" + member.processId(); + final Integer newTopologyEpoch = member.topologyEpoch() + 1; + final StreamsGroupMemberMetadataValue.Endpoint newUserEndpoint = + new StreamsGroupMemberMetadataValue.Endpoint().setHost(member.userEndpoint().get().host() + "2").setPort(9090); + final Map newClientTags = new HashMap<>(member.clientTags()); + newClientTags.put("client2", "tag2"); + + updatedMember = new StreamsGroupMember.Builder(member) + .maybeUpdateRackId(Optional.of(newRackId)) + .maybeUpdateInstanceId(Optional.of(newInstanceId)) + .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(6000)) + .maybeUpdateProcessId(Optional.of(newProcessId)) + .maybeUpdateTopologyEpoch(OptionalInt.of(newTopologyEpoch)) + .maybeUpdateUserEndpoint(Optional.of(newUserEndpoint)) + .maybeUpdateClientTags(Optional.of(newClientTags)) + .build(); + + assertEquals(Optional.of(newRackId), updatedMember.rackId()); + assertEquals(Optional.of(newInstanceId), updatedMember.instanceId()); + assertEquals(newRebalanceTimeout, updatedMember.rebalanceTimeoutMs()); + assertEquals(newProcessId, updatedMember.processId()); + assertEquals(newTopologyEpoch, updatedMember.topologyEpoch()); + assertEquals(Optional.of(newUserEndpoint), updatedMember.userEndpoint()); + assertEquals(newClientTags, updatedMember.clientTags()); + assertEquals(member.memberId(), updatedMember.memberId()); + assertEquals(member.memberEpoch(), updatedMember.memberEpoch()); + assertEquals(member.previousMemberEpoch(), updatedMember.previousMemberEpoch()); + assertEquals(member.state(), updatedMember.state()); + assertEquals(member.clientId(), updatedMember.clientId()); + assertEquals(member.clientHost(), updatedMember.clientHost()); + assertEquals(member.assignedActiveTasks(), updatedMember.assignedActiveTasks()); + assertEquals(member.assignedStandbyTasks(), updatedMember.assignedStandbyTasks()); + assertEquals(member.assignedWarmupTasks(), updatedMember.assignedWarmupTasks()); + assertEquals(member.activeTasksPendingRevocation(), updatedMember.activeTasksPendingRevocation()); + assertEquals(member.standbyTasksPendingRevocation(), updatedMember.standbyTasksPendingRevocation()); + assertEquals(member.warmupTasksPendingRevocation(), updatedMember.warmupTasksPendingRevocation()); + } + + @Test + public void testBuilderUpdateMemberEpoch() { + final StreamsGroupMember member = createStreamsGroupMember(); + + final int newMemberEpoch = member.memberEpoch() + 1; + final StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member) + .updateMemberEpoch(newMemberEpoch) + .build(); + + assertEquals(member.memberId(), updatedMember.memberId()); + assertEquals(newMemberEpoch, updatedMember.memberEpoch()); + // The previous member epoch becomes the old current member epoch. + assertEquals(member.memberEpoch(), updatedMember.previousMemberEpoch()); + assertEquals(member.state(), updatedMember.state()); + assertEquals(member.instanceId(), updatedMember.instanceId()); + assertEquals(member.rackId(), updatedMember.rackId()); + assertEquals(member.rebalanceTimeoutMs(), updatedMember.rebalanceTimeoutMs()); + assertEquals(member.clientId(), updatedMember.clientId()); + assertEquals(member.clientHost(), updatedMember.clientHost()); + assertEquals(member.topologyEpoch(), updatedMember.topologyEpoch()); + assertEquals(member.processId(), updatedMember.processId()); + assertEquals(member.userEndpoint(), updatedMember.userEndpoint()); + assertEquals(member.clientTags(), updatedMember.clientTags()); + assertEquals(member.assignedActiveTasks(), updatedMember.assignedActiveTasks()); + assertEquals(member.assignedStandbyTasks(), updatedMember.assignedStandbyTasks()); + assertEquals(member.assignedWarmupTasks(), updatedMember.assignedWarmupTasks()); + assertEquals(member.activeTasksPendingRevocation(), updatedMember.activeTasksPendingRevocation()); + assertEquals(member.standbyTasksPendingRevocation(), updatedMember.standbyTasksPendingRevocation()); + assertEquals(member.warmupTasksPendingRevocation(), updatedMember.warmupTasksPendingRevocation()); + } + + @Test + public void testReturnUnmodifiableFields() { + final StreamsGroupMember member = createStreamsGroupMember(); + + assertThrows(UnsupportedOperationException.class, () -> member.clientTags().put("not allowed", "")); + assertThrows(UnsupportedOperationException.class, () -> member.assignedActiveTasks().put("not allowed", Collections.emptySet())); + assertThrows(UnsupportedOperationException.class, () -> member.assignedStandbyTasks().put("not allowed", Collections.emptySet())); + assertThrows(UnsupportedOperationException.class, () -> member.assignedWarmupTasks().put("not allowed", Collections.emptySet())); + assertThrows(UnsupportedOperationException.class, () -> member.activeTasksPendingRevocation().put("not allowed", Collections.emptySet())); + assertThrows(UnsupportedOperationException.class, () -> member.standbyTasksPendingRevocation().put("not allowed", Collections.emptySet())); + assertThrows(UnsupportedOperationException.class, () -> member.warmupTasksPendingRevocation().put("not allowed", Collections.emptySet())); + } + + @Test + public void testAsStreamsGroupDescribeMember() { + final StreamsGroupMember member = createStreamsGroupMember(); + List assignedTasks1 = Arrays.asList(10, 11, 12); + List assignedTasks2 = Arrays.asList(13, 14, 15); + List assignedTasks3 = Arrays.asList(16, 17, 18); + Assignment targetAssignment = new Assignment( + mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(assignedTasks3))), + mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(assignedTasks2))), + mkMap(mkEntry(SUBTOPOLOGY3, new HashSet<>(assignedTasks1))) + ); + + StreamsGroupDescribeResponseData.Member actual = member.asStreamsGroupDescribeMember(targetAssignment); + StreamsGroupDescribeResponseData.Member expected = new StreamsGroupDescribeResponseData.Member() + .setMemberId(MEMBER_ID) + .setMemberEpoch(MEMBER_EPOCH) + .setClientId(CLIENT_ID) + .setInstanceId(INSTANCE_ID) + .setRackId(RACK_ID) + .setClientHost(HOSTNAME) + .setProcessId(PROCESS_ID) + .setTopologyEpoch(TOPOLOGY_EPOCH) + .setClientTags(List.of( + new StreamsGroupDescribeResponseData.KeyValue().setKey(CLIENT_TAG_KEY).setValue(CLIENT_TAG_VALUE)) + ) + .setAssignment( + new StreamsGroupDescribeResponseData.Assignment() + .setActiveTasks(List.of( + new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY1) + .setPartitions(TASKS1)) + ) + .setStandbyTasks(List.of( + new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY2) + .setPartitions(TASKS2)) + ) + .setWarmupTasks(List.of( + new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY1) + .setPartitions(TASKS3)) + ) + ) + .setTargetAssignment( + new StreamsGroupDescribeResponseData.Assignment() + .setActiveTasks(List.of( + new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY1) + .setPartitions(assignedTasks3)) + ) + .setStandbyTasks(List.of( + new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY2) + .setPartitions(assignedTasks2)) + ) + .setWarmupTasks(List.of( + new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY3) + .setPartitions(assignedTasks1)) + ) + ) + .setUserEndpoint(new StreamsGroupDescribeResponseData.Endpoint() + .setHost(USER_ENDPOINT.host()) + .setPort(USER_ENDPOINT.port()) + ); + + assertEquals(expected, actual); + } + + @Test + public void testAsStreamsGroupDescribeWithTargetAssignmentNull() { + final StreamsGroupMember member = createStreamsGroupMember(); + StreamsGroupDescribeResponseData.Member streamsGroupDescribeMember = member.asStreamsGroupDescribeMember(null); + + assertEquals(new StreamsGroupDescribeResponseData.Assignment(), streamsGroupDescribeMember.targetAssignment()); + } + + private StreamsGroupMember createStreamsGroupMember() { + return new StreamsGroupMember.Builder(MEMBER_ID) + .setMemberEpoch(MEMBER_EPOCH) + .setPreviousMemberEpoch(PREVIOUS_MEMBER_EPOCH) + .setState(STATE) + .setInstanceId(INSTANCE_ID) + .setRackId(RACK_ID) + .setRebalanceTimeoutMs(REBALANCE_TIMEOUT) + .setClientId(CLIENT_ID) + .setClientHost(HOSTNAME) + .setTopologyEpoch(TOPOLOGY_EPOCH) + .setProcessId(PROCESS_ID) + .setUserEndpoint(USER_ENDPOINT) + .setClientTags(CLIENT_TAGS) + .setAssignedActiveTasks(ASSIGNED_ACTIVE_TASKS) + .setAssignedStandbyTasks(ASSIGNED_STANDBY_TASKS) + .setAssignedWarmupTasks(ASSIGNED_WARMUP_TASKS) + .setActiveTasksPendingRevocation(ACTIVE_TASKS_PENDING_REVOCATION) + .setStandbyTasksPendingRevocation(STANDBY_TASKS_PENDING_REVOCATION) + .setWarmupTasksPendingRevocation(WARMUP_TASKS_PENDING_REVOCATION) + .build(); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java new file mode 100644 index 00000000000..47668ec84c0 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class TaskAssignmentTestUtil { + + public static Assignment mkAssignment(final Map> activeTasks, + final Map> standbyTasks, + final Map> warmupTasks) { + return new Assignment( + Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)), + Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)), + Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)) + ); + } + + public static Map.Entry> mkTasks(String subtopologyId, + Integer... tasks) { + return new AbstractMap.SimpleEntry<>( + subtopologyId, + new HashSet<>(List.of(tasks)) + ); + } + + @SafeVarargs + public static Map> mkTasksPerSubtopology(Map.Entry>... entries) { + Map> assignment = new HashMap<>(); + for (Map.Entry> entry : entries) { + assignment.put(entry.getKey(), entry.getValue()); + } + return assignment; + } +}