KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes (#18276)

* KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes

This commit adds the classes to represent a Streams group member in the
consumer coordinator.

Reviewers: Bill Bejeck <bill@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Bruno Cadonna 2025-01-08 17:26:41 +01:00 committed by GitHub
parent aa22676c48
commit 624dd45809
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 1233 additions and 0 deletions

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* An immutable assignment for a member.
*
* @param activeTasks Active tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param standbyTasks Standby tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param warmupTasks Warm-up tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
*/
public record Assignment(Map<String, Set<Integer>> activeTasks,
Map<String, Set<Integer>> standbyTasks,
Map<String, Set<Integer>> warmupTasks) {
public Assignment {
activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
}
/**
* An empty assignment.
*/
public static final Assignment EMPTY = new Assignment(
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);
/**
* Creates a {{@link org.apache.kafka.coordinator.group.streams.Assignment}} from a
* {{@link org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}}.
*
* @param record The record.
* @return A {{@link org.apache.kafka.coordinator.group.streams.Assignment}}.
*/
public static Assignment fromRecord(
StreamsGroupTargetAssignmentMemberValue record
) {
return new Assignment(
record.activeTasks().stream()
.collect(Collectors.toMap(
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
),
record.standbyTasks().stream()
.collect(Collectors.toMap(
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
),
record.warmupTasks().stream()
.collect(Collectors.toMap(
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
)
);
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import java.util.HashMap;
import java.util.Map;
/**
* The various states that a member can be in. For their definition, refer to the documentation of
* {{@link org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder}}.
*/
public enum MemberState {
/**
* The member is fully reconciled with the desired target assignment.
*/
STABLE((byte) 1),
/**
* The member must revoke some tasks in order to be able to transition to the next epoch.
*/
UNREVOKED_TASKS((byte) 2),
/**
* The member transitioned to the last epoch but waits on some tasks which have not been revoked by their previous owners yet.
*/
UNRELEASED_TASKS((byte) 3),
/**
* The member is in an unknown state. This can only happen if a future version of the software introduces a new state unknown by this
* version.
*/
UNKNOWN((byte) 127);
private static final Map<Byte, MemberState> VALUES_TO_ENUMS = new HashMap<>();
static {
for (MemberState state : MemberState.values()) {
VALUES_TO_ENUMS.put(state.value(), state);
}
}
private final byte value;
MemberState(byte value) {
this.value = value;
}
public byte value() {
return value;
}
public static MemberState fromValue(byte value) {
MemberState state = VALUES_TO_ENUMS.get(value);
if (state == null) {
return UNKNOWN;
}
return state;
}
}

View File

@ -0,0 +1,463 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Contains all information related to a member within a Streams group.
* <p>
* This class is immutable and is fully backed by records stored in the __consumer_offsets topic.
*
* @param memberId The ID of the member.
* @param memberEpoch The current epoch of the member.
* @param previousMemberEpoch The previous epoch of the member.
* @param state The current state of the member.
* @param instanceId The instance ID of the member.
* @param rackId The rack ID of the member.
* @param clientId The client ID of the member.
* @param clientHost The host of the member.
* @param rebalanceTimeoutMs The rebalance timeout in milliseconds.
* @param topologyEpoch The epoch of the topology the member uses.
* @param processId The ID of the Streams client that contains the member.
* @param userEndpoint The user endpoint exposed for Interactive Queries by the Streams client that
* contains the member.
* @param clientTags Tags of the client of the member used for rack-aware assignment.
* @param assignedActiveTasks Active tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param assignedStandbyTasks Standby tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param assignedWarmupTasks Warm-up tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param activeTasksPendingRevocation Active tasks assigned to the member pending revocation.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param standbyTasksPendingRevocation Standby tasks assigned to the member pending revocation.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param warmupTasksPendingRevocation Warm-up tasks assigned to the member pending revocation.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
*/
@SuppressWarnings("checkstyle:JavaNCSS")
public record StreamsGroupMember(String memberId,
Integer memberEpoch,
Integer previousMemberEpoch,
MemberState state,
Optional<String> instanceId,
Optional<String> rackId,
String clientId,
String clientHost,
Integer rebalanceTimeoutMs,
Integer topologyEpoch,
String processId,
Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint,
Map<String, String> clientTags,
Map<String, Set<Integer>> assignedActiveTasks,
Map<String, Set<Integer>> assignedStandbyTasks,
Map<String, Set<Integer>> assignedWarmupTasks,
Map<String, Set<Integer>> activeTasksPendingRevocation,
Map<String, Set<Integer>> standbyTasksPendingRevocation,
Map<String, Set<Integer>> warmupTasksPendingRevocation) {
public StreamsGroupMember {
Objects.requireNonNull(memberId, "memberId cannot be null");
clientTags = clientTags != null ? Collections.unmodifiableMap(clientTags) : null;
assignedActiveTasks = assignedActiveTasks != null ? Collections.unmodifiableMap(assignedActiveTasks) : null;
assignedStandbyTasks = assignedStandbyTasks != null ? Collections.unmodifiableMap(assignedStandbyTasks) : null;
assignedWarmupTasks = assignedWarmupTasks != null ? Collections.unmodifiableMap(assignedWarmupTasks) : null;
activeTasksPendingRevocation = activeTasksPendingRevocation != null ? Collections.unmodifiableMap(activeTasksPendingRevocation) : null;
standbyTasksPendingRevocation = standbyTasksPendingRevocation != null ? Collections.unmodifiableMap(standbyTasksPendingRevocation) : null;
warmupTasksPendingRevocation = warmupTasksPendingRevocation != null ? Collections.unmodifiableMap(warmupTasksPendingRevocation) : null;
}
/**
* A builder that facilitates the creation of a new member or the update of an existing one.
* <p>
* Please refer to the javadoc of {{@link StreamsGroupMember}} for the definition of the fields.
*/
public static class Builder {
private final String memberId;
private Integer memberEpoch = null;
private Integer previousMemberEpoch = null;
private MemberState state = null;
private Optional<String> instanceId = null;
private Optional<String> rackId = null;
private Integer rebalanceTimeoutMs = null;
private String clientId = null;
private String clientHost = null;
private Integer topologyEpoch = null;
private String processId = null;
private Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint = null;
private Map<String, String> clientTags = null;
private Map<String, Set<Integer>> assignedActiveTasks = null;
private Map<String, Set<Integer>> assignedStandbyTasks = null;
private Map<String, Set<Integer>> assignedWarmupTasks = null;
private Map<String, Set<Integer>> activeTasksPendingRevocation = null;
private Map<String, Set<Integer>> standbyTasksPendingRevocation = null;
private Map<String, Set<Integer>> warmupTasksPendingRevocation = null;
public Builder(String memberId) {
this.memberId = Objects.requireNonNull(memberId, "memberId cannot be null");
}
public Builder(StreamsGroupMember member) {
Objects.requireNonNull(member, "member cannot be null");
this.memberId = member.memberId;
this.memberEpoch = member.memberEpoch;
this.previousMemberEpoch = member.previousMemberEpoch;
this.instanceId = member.instanceId;
this.rackId = member.rackId;
this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
this.clientId = member.clientId;
this.clientHost = member.clientHost;
this.topologyEpoch = member.topologyEpoch;
this.processId = member.processId;
this.userEndpoint = member.userEndpoint;
this.clientTags = member.clientTags;
this.state = member.state;
this.assignedActiveTasks = member.assignedActiveTasks;
this.assignedStandbyTasks = member.assignedStandbyTasks;
this.assignedWarmupTasks = member.assignedWarmupTasks;
this.activeTasksPendingRevocation = member.activeTasksPendingRevocation;
this.standbyTasksPendingRevocation = member.standbyTasksPendingRevocation;
this.warmupTasksPendingRevocation = member.warmupTasksPendingRevocation;
}
public Builder updateMemberEpoch(int memberEpoch) {
int currentMemberEpoch = this.memberEpoch;
this.memberEpoch = memberEpoch;
this.previousMemberEpoch = currentMemberEpoch;
return this;
}
public Builder setMemberEpoch(int memberEpoch) {
this.memberEpoch = memberEpoch;
return this;
}
public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
this.previousMemberEpoch = previousMemberEpoch;
return this;
}
public Builder setInstanceId(String instanceId) {
this.instanceId = Optional.ofNullable(instanceId);
return this;
}
public Builder maybeUpdateInstanceId(Optional<String> instanceId) {
instanceId.ifPresent(this::setInstanceId);
return this;
}
public Builder setRackId(String rackId) {
this.rackId = Optional.ofNullable(rackId);
return this;
}
public Builder maybeUpdateRackId(Optional<String> rackId) {
rackId.ifPresent(this::setRackId);
return this;
}
public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
return this;
}
public Builder maybeUpdateRebalanceTimeoutMs(OptionalInt rebalanceTimeoutMs) {
this.rebalanceTimeoutMs = rebalanceTimeoutMs.orElse(this.rebalanceTimeoutMs);
return this;
}
public Builder setClientId(String clientId) {
this.clientId = clientId;
return this;
}
public Builder setClientHost(String clientHost) {
this.clientHost = clientHost;
return this;
}
public Builder setState(MemberState state) {
this.state = state;
return this;
}
public Builder setTopologyEpoch(int topologyEpoch) {
this.topologyEpoch = topologyEpoch;
return this;
}
public Builder maybeUpdateTopologyEpoch(OptionalInt topologyEpoch) {
this.topologyEpoch = topologyEpoch.orElse(this.topologyEpoch);
return this;
}
public Builder setProcessId(String processId) {
this.processId = processId;
return this;
}
public Builder maybeUpdateProcessId(Optional<String> processId) {
this.processId = processId.orElse(this.processId);
return this;
}
public Builder setUserEndpoint(StreamsGroupMemberMetadataValue.Endpoint userEndpoint) {
this.userEndpoint = Optional.ofNullable(userEndpoint);
return this;
}
public Builder maybeUpdateUserEndpoint(Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint) {
userEndpoint.ifPresent(this::setUserEndpoint);
return this;
}
public Builder setClientTags(Map<String, String> clientTags) {
this.clientTags = clientTags;
return this;
}
public Builder maybeUpdateClientTags(Optional<Map<String, String>> clientTags) {
this.clientTags = clientTags.orElse(this.clientTags);
return this;
}
public Builder setAssignment(Assignment assignment) {
this.assignedActiveTasks = assignment.activeTasks();
this.assignedStandbyTasks = assignment.standbyTasks();
this.assignedWarmupTasks = assignment.warmupTasks();
return this;
}
public Builder setAssignedActiveTasks(Map<String, Set<Integer>> assignedActiveTasks) {
this.assignedActiveTasks = assignedActiveTasks;
return this;
}
public Builder setAssignedStandbyTasks(Map<String, Set<Integer>> assignedStandbyTasks) {
this.assignedStandbyTasks = assignedStandbyTasks;
return this;
}
public Builder setAssignedWarmupTasks(Map<String, Set<Integer>> assignedWarmupTasks) {
this.assignedWarmupTasks = assignedWarmupTasks;
return this;
}
public Builder setAssignmentPendingRevocation(Assignment assignment) {
this.activeTasksPendingRevocation = assignment.activeTasks();
this.standbyTasksPendingRevocation = assignment.standbyTasks();
this.warmupTasksPendingRevocation = assignment.warmupTasks();
return this;
}
public Builder setActiveTasksPendingRevocation(
Map<String, Set<Integer>> activeTasksPendingRevocation) {
this.activeTasksPendingRevocation = activeTasksPendingRevocation;
return this;
}
public Builder setStandbyTasksPendingRevocation(
Map<String, Set<Integer>> standbyTasksPendingRevocation) {
this.standbyTasksPendingRevocation = standbyTasksPendingRevocation;
return this;
}
public Builder setWarmupTasksPendingRevocation(
Map<String, Set<Integer>> warmupTasksPendingRevocation) {
this.warmupTasksPendingRevocation = warmupTasksPendingRevocation;
return this;
}
public Builder updateWith(StreamsGroupMemberMetadataValue record) {
setInstanceId(record.instanceId());
setRackId(record.rackId());
setClientId(record.clientId());
setClientHost(record.clientHost());
setRebalanceTimeoutMs(record.rebalanceTimeoutMs());
setTopologyEpoch(record.topologyEpoch());
setProcessId(record.processId());
setUserEndpoint(record.userEndpoint());
setClientTags(record.clientTags().stream().collect(Collectors.toMap(
StreamsGroupMemberMetadataValue.KeyValue::key,
StreamsGroupMemberMetadataValue.KeyValue::value
)));
return this;
}
public Builder updateWith(StreamsGroupCurrentMemberAssignmentValue record) {
setMemberEpoch(record.memberEpoch());
setPreviousMemberEpoch(record.previousMemberEpoch());
setState(MemberState.fromValue(record.state()));
setAssignedActiveTasks(assignmentFromTaskIds(record.activeTasks()));
setAssignedStandbyTasks(assignmentFromTaskIds(record.standbyTasks()));
setAssignedWarmupTasks(assignmentFromTaskIds(record.warmupTasks()));
setActiveTasksPendingRevocation(
assignmentFromTaskIds(record.activeTasksPendingRevocation()));
setStandbyTasksPendingRevocation(
assignmentFromTaskIds(record.standbyTasksPendingRevocation()));
setWarmupTasksPendingRevocation(
assignmentFromTaskIds(record.warmupTasksPendingRevocation()));
return this;
}
private static Map<String, Set<Integer>> assignmentFromTaskIds(
List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> topicPartitionsList
) {
return topicPartitionsList.stream().collect(Collectors.toMap(
StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId,
taskIds -> Set.copyOf(taskIds.partitions())));
}
public StreamsGroupMember build() {
return new StreamsGroupMember(
memberId,
memberEpoch,
previousMemberEpoch,
state,
instanceId,
rackId,
clientId,
clientHost,
rebalanceTimeoutMs,
topologyEpoch,
processId,
userEndpoint,
clientTags,
assignedActiveTasks,
assignedStandbyTasks,
assignedWarmupTasks,
activeTasksPendingRevocation,
standbyTasksPendingRevocation,
warmupTasksPendingRevocation
);
}
}
/**
* @return True if the member is in the Stable state and at the desired epoch.
*/
public boolean isReconciledTo(int targetAssignmentEpoch) {
return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch;
}
/**
* Creates a member description for the Streams group describe response from this member.
*
* @param targetAssignment The target assignment of this member in the corresponding group.
*
* @return The StreamsGroupMember mapped as StreamsGroupDescribeResponseData.Member.
*/
public StreamsGroupDescribeResponseData.Member asStreamsGroupDescribeMember(
Assignment targetAssignment
) {
final StreamsGroupDescribeResponseData.Assignment describedTargetAssignment =
new StreamsGroupDescribeResponseData.Assignment();
if (targetAssignment != null) {
describedTargetAssignment
.setActiveTasks(taskIdsFromMap(targetAssignment.activeTasks()))
.setStandbyTasks(taskIdsFromMap(targetAssignment.standbyTasks()))
.setWarmupTasks(taskIdsFromMap(targetAssignment.warmupTasks()));
}
return new StreamsGroupDescribeResponseData.Member()
.setMemberEpoch(memberEpoch)
.setMemberId(memberId)
.setAssignment(
new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(taskIdsFromMap(assignedActiveTasks))
.setStandbyTasks(taskIdsFromMap(assignedStandbyTasks))
.setWarmupTasks(taskIdsFromMap(assignedWarmupTasks)))
.setTargetAssignment(describedTargetAssignment)
.setClientHost(clientHost)
.setClientId(clientId)
.setInstanceId(instanceId.orElse(null))
.setRackId(rackId.orElse(null))
.setClientTags(clientTags.entrySet().stream().map(
entry -> new StreamsGroupDescribeResponseData.KeyValue()
.setKey(entry.getKey())
.setValue(entry.getValue())
).collect(Collectors.toList()))
.setProcessId(processId)
.setTopologyEpoch(topologyEpoch)
.setUserEndpoint(
userEndpoint.map(
endpoint -> new StreamsGroupDescribeResponseData.Endpoint()
.setHost(endpoint.host())
.setPort(endpoint.port())
).orElse(null)
);
}
private static List<StreamsGroupDescribeResponseData.TaskIds> taskIdsFromMap(
Map<String, Set<Integer>> tasks
) {
List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new ArrayList<>();
tasks.forEach((subtopologyId, partitionSet) -> {
taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(subtopologyId)
.setPartitions(new ArrayList<>(partitionSet)));
});
return taskIds;
}
/**
* @return True if the two provided members have different assigned active tasks.
*/
public static boolean hasAssignedActiveTasksChanged(
StreamsGroupMember member1,
StreamsGroupMember member2
) {
return !member1.assignedActiveTasks().equals(member2.assignedActiveTasks());
}
/**
* @return True if the two provided members have different assigned active tasks.
*/
public static boolean hasAssignedStandbyTasksChanged(
StreamsGroupMember member1,
StreamsGroupMember member2
) {
return !member1.assignedStandbyTasks().equals(member2.assignedStandbyTasks());
}
/**
* @return True if the two provided members have different assigned active tasks.
*/
public static boolean hasAssignedWarmupTasksChanged(
StreamsGroupMember member1,
StreamsGroupMember member2
) {
return !member1.assignedWarmupTasks().equals(member2.assignedWarmupTasks());
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class AssignmentTest {
static final String SUBTOPOLOGY_1 = "subtopology1";
static final String SUBTOPOLOGY_2 = "subtopology2";
static final String SUBTOPOLOGY_3 = "subtopology3";
@Test
public void testTasksCannotBeNull() {
assertThrows(NullPointerException.class, () -> new Assignment(null, Collections.emptyMap(), Collections.emptyMap()));
assertThrows(NullPointerException.class, () -> new Assignment(Collections.emptyMap(), null, Collections.emptyMap()));
assertThrows(NullPointerException.class, () -> new Assignment(Collections.emptyMap(), Collections.emptyMap(), null));
}
@Test
public void testReturnUnmodifiableTaskAssignments() {
Map<String, Set<Integer>> activeTasks = mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 1, 2, 3)
);
Map<String, Set<Integer>> standbyTasks = mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_2, 9, 8, 7)
);
Map<String, Set<Integer>> warmupTasks = mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_3, 4, 5, 6)
);
Assignment assignment = new Assignment(activeTasks, standbyTasks, warmupTasks);
assertEquals(activeTasks, assignment.activeTasks());
assertThrows(UnsupportedOperationException.class, () -> assignment.activeTasks().put("not allowed", Collections.emptySet()));
assertEquals(standbyTasks, assignment.standbyTasks());
assertThrows(UnsupportedOperationException.class, () -> assignment.standbyTasks().put("not allowed", Collections.emptySet()));
assertEquals(warmupTasks, assignment.warmupTasks());
assertThrows(UnsupportedOperationException.class, () -> assignment.warmupTasks().put("not allowed", Collections.emptySet()));
}
@Test
public void testFromTargetAssignmentRecord() {
List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTasks = new ArrayList<>();
activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_1)
.setPartitions(Arrays.asList(1, 2, 3)));
activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_2)
.setPartitions(Arrays.asList(4, 5, 6)));
List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTasks = new ArrayList<>();
standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_1)
.setPartitions(Arrays.asList(7, 8, 9)));
standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_2)
.setPartitions(Arrays.asList(1, 2, 3)));
List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTasks = new ArrayList<>();
warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_1)
.setPartitions(Arrays.asList(4, 5, 6)));
warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_2)
.setPartitions(Arrays.asList(7, 8, 9)));
StreamsGroupTargetAssignmentMemberValue record = new StreamsGroupTargetAssignmentMemberValue()
.setActiveTasks(activeTasks)
.setStandbyTasks(standbyTasks)
.setWarmupTasks(warmupTasks);
Assignment assignment = Assignment.fromRecord(record);
assertEquals(
mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 1, 2, 3),
mkTasks(SUBTOPOLOGY_2, 4, 5, 6)
),
assignment.activeTasks()
);
assertEquals(
mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 7, 8, 9),
mkTasks(SUBTOPOLOGY_2, 1, 2, 3)
),
assignment.standbyTasks()
);
assertEquals(
mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 4, 5, 6),
mkTasks(SUBTOPOLOGY_2, 7, 8, 9)
),
assignment.warmupTasks()
);
}
}

View File

@ -0,0 +1,429 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue.TaskIds;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.KeyValue;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class StreamsGroupMemberTest {
private static final String MEMBER_ID = "member-id";
private static final int MEMBER_EPOCH = 10;
private static final int PREVIOUS_MEMBER_EPOCH = 9;
private static final MemberState STATE = MemberState.UNRELEASED_TASKS;
private static final String INSTANCE_ID = "instance-id";
private static final String RACK_ID = "rack-id";
private static final int REBALANCE_TIMEOUT = 5000;
private static final String CLIENT_ID = "client-id";
private static final String HOSTNAME = "hostname";
private static final int TOPOLOGY_EPOCH = 3;
private static final String PROCESS_ID = "process-id";
private static final String SUBTOPOLOGY1 = "subtopology1";
private static final String SUBTOPOLOGY2 = "subtopology2";
private static final String SUBTOPOLOGY3 = "subtopology3";
private static final StreamsGroupMemberMetadataValue.Endpoint USER_ENDPOINT =
new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090);
private static final String CLIENT_TAG_KEY = "client";
private static final String CLIENT_TAG_VALUE = "tag";
private static final Map<String, String> CLIENT_TAGS = mkMap(mkEntry(CLIENT_TAG_KEY, CLIENT_TAG_VALUE));
private static final List<Integer> TASKS1 = List.of(1, 2, 3);
private static final List<Integer> TASKS2 = List.of(4, 5, 6);
private static final List<Integer> TASKS3 = List.of(7, 8);
private static final List<Integer> TASKS4 = List.of(3, 2, 1);
private static final List<Integer> TASKS5 = List.of(6, 5, 4);
private static final List<Integer> TASKS6 = List.of(9, 7);
private static final Map<String, Set<Integer>> ASSIGNED_ACTIVE_TASKS = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS1.toArray(Integer[]::new)));
private static final Map<String, Set<Integer>> ASSIGNED_STANDBY_TASKS = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS2.toArray(Integer[]::new)));
private static final Map<String, Set<Integer>> ASSIGNED_WARMUP_TASKS = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS3.toArray(Integer[]::new)));
private static final Map<String, Set<Integer>> ACTIVE_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS4.toArray(Integer[]::new)));
private static final Map<String, Set<Integer>> STANDBY_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS5.toArray(Integer[]::new)));
private static final Map<String, Set<Integer>> WARMUP_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS6.toArray(Integer[]::new)));
@Test
public void testBuilderWithMemberIdIsNull() {
final Exception exception = assertThrows(
NullPointerException.class,
() -> new StreamsGroupMember.Builder((String) null).build()
);
assertEquals("memberId cannot be null", exception.getMessage());
}
@Test
public void testBuilderWithMemberIsNull() {
final Exception exception = assertThrows(
NullPointerException.class,
() -> new StreamsGroupMember.Builder((StreamsGroupMember) null).build()
);
assertEquals("member cannot be null", exception.getMessage());
}
@Test
public void testBuilderWithDefaults() {
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID).build();
assertEquals(MEMBER_ID, member.memberId());
assertNull(member.memberEpoch());
assertNull(member.previousMemberEpoch());
assertNull(member.state());
assertNull(member.instanceId());
assertNull(member.rackId());
assertNull(member.rebalanceTimeoutMs());
assertNull(member.clientId());
assertNull(member.clientHost());
assertNull(member.topologyEpoch());
assertNull(member.processId());
assertNull(member.userEndpoint());
assertNull(member.clientTags());
assertNull(member.assignedActiveTasks());
assertNull(member.assignedStandbyTasks());
assertNull(member.assignedWarmupTasks());
assertNull(member.activeTasksPendingRevocation());
assertNull(member.standbyTasksPendingRevocation());
assertNull(member.warmupTasksPendingRevocation());
}
@Test
public void testBuilderNewMember() {
StreamsGroupMember member = createStreamsGroupMember();
assertEquals(MEMBER_ID, member.memberId());
assertEquals(MEMBER_EPOCH, member.memberEpoch());
assertEquals(PREVIOUS_MEMBER_EPOCH, member.previousMemberEpoch());
assertEquals(STATE, member.state());
assertEquals(Optional.of(INSTANCE_ID), member.instanceId());
assertEquals(Optional.of(RACK_ID), member.rackId());
assertEquals(CLIENT_ID, member.clientId());
assertEquals(HOSTNAME, member.clientHost());
assertEquals(TOPOLOGY_EPOCH, member.topologyEpoch());
assertEquals(PROCESS_ID, member.processId());
assertEquals(Optional.of(USER_ENDPOINT), member.userEndpoint());
assertEquals(CLIENT_TAGS, member.clientTags());
assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks());
assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks());
assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, member.activeTasksPendingRevocation());
assertEquals(STANDBY_TASKS_PENDING_REVOCATION, member.standbyTasksPendingRevocation());
assertEquals(WARMUP_TASKS_PENDING_REVOCATION, member.warmupTasksPendingRevocation());
}
@Test
public void testBuilderUpdateWithStreamsGroupMemberMetadataValue() {
StreamsGroupMemberMetadataValue record = new StreamsGroupMemberMetadataValue()
.setClientId(CLIENT_ID)
.setClientHost(HOSTNAME)
.setInstanceId(INSTANCE_ID)
.setRackId(RACK_ID)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT)
.setTopologyEpoch(TOPOLOGY_EPOCH)
.setProcessId(PROCESS_ID)
.setUserEndpoint(USER_ENDPOINT)
.setClientTags(CLIENT_TAGS.entrySet().stream()
.map(e -> new KeyValue().setKey(e.getKey()).setValue(e.getValue()))
.collect(Collectors.toList()));
StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
.updateWith(record)
.build();
assertEquals(record.clientId(), member.clientId());
assertEquals(record.clientHost(), member.clientHost());
assertEquals(Optional.of(record.instanceId()), member.instanceId());
assertEquals(Optional.of(record.rackId()), member.rackId());
assertEquals(record.rebalanceTimeoutMs(), member.rebalanceTimeoutMs());
assertEquals(record.topologyEpoch(), member.topologyEpoch());
assertEquals(record.processId(), member.processId());
assertEquals(Optional.of(record.userEndpoint()), member.userEndpoint());
assertEquals(
record.clientTags().stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value)),
member.clientTags()
);
assertEquals(MEMBER_ID, member.memberId());
assertNull(member.memberEpoch());
assertNull(member.previousMemberEpoch());
assertNull(member.state());
assertNull(member.assignedActiveTasks());
assertNull(member.assignedStandbyTasks());
assertNull(member.assignedWarmupTasks());
assertNull(member.activeTasksPendingRevocation());
assertNull(member.standbyTasksPendingRevocation());
assertNull(member.warmupTasksPendingRevocation());
}
@Test
public void testBuilderUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
StreamsGroupCurrentMemberAssignmentValue record = new StreamsGroupCurrentMemberAssignmentValue()
.setMemberEpoch(MEMBER_EPOCH)
.setPreviousMemberEpoch(PREVIOUS_MEMBER_EPOCH)
.setState(STATE.value())
.setActiveTasks(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS1)))
.setStandbyTasks(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS2)))
.setWarmupTasks(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS3)))
.setActiveTasksPendingRevocation(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS4)))
.setStandbyTasksPendingRevocation(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS5)))
.setWarmupTasksPendingRevocation(List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS6)));
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
.updateWith(record)
.build();
assertEquals(MEMBER_ID, member.memberId());
assertEquals(record.memberEpoch(), member.memberEpoch());
assertEquals(record.previousMemberEpoch(), member.previousMemberEpoch());
assertEquals(MemberState.fromValue(record.state()), member.state());
assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks());
assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks());
assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, member.activeTasksPendingRevocation());
assertEquals(STANDBY_TASKS_PENDING_REVOCATION, member.standbyTasksPendingRevocation());
assertEquals(WARMUP_TASKS_PENDING_REVOCATION, member.warmupTasksPendingRevocation());
assertNull(member.instanceId());
assertNull(member.rackId());
assertNull(member.rebalanceTimeoutMs());
assertNull(member.clientId());
assertNull(member.clientHost());
assertNull(member.topologyEpoch());
assertNull(member.processId());
assertNull(member.userEndpoint());
assertNull(member.clientTags());
}
@Test
public void testBuilderMaybeUpdateMember() {
final StreamsGroupMember member = createStreamsGroupMember();
// This is a no-op.
StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
.maybeUpdateRackId(Optional.empty())
.maybeUpdateInstanceId(Optional.empty())
.maybeUpdateRebalanceTimeoutMs(OptionalInt.empty())
.maybeUpdateProcessId(Optional.empty())
.maybeUpdateTopologyEpoch(OptionalInt.empty())
.maybeUpdateUserEndpoint(Optional.empty())
.maybeUpdateClientTags(Optional.empty())
.build();
assertEquals(member, updatedMember);
final String newRackId = "new" + member.rackId();
final String newInstanceId = "new" + member.instanceId();
final Integer newRebalanceTimeout = member.rebalanceTimeoutMs() + 1000;
final String newProcessId = "new" + member.processId();
final Integer newTopologyEpoch = member.topologyEpoch() + 1;
final StreamsGroupMemberMetadataValue.Endpoint newUserEndpoint =
new StreamsGroupMemberMetadataValue.Endpoint().setHost(member.userEndpoint().get().host() + "2").setPort(9090);
final Map<String, String> newClientTags = new HashMap<>(member.clientTags());
newClientTags.put("client2", "tag2");
updatedMember = new StreamsGroupMember.Builder(member)
.maybeUpdateRackId(Optional.of(newRackId))
.maybeUpdateInstanceId(Optional.of(newInstanceId))
.maybeUpdateRebalanceTimeoutMs(OptionalInt.of(6000))
.maybeUpdateProcessId(Optional.of(newProcessId))
.maybeUpdateTopologyEpoch(OptionalInt.of(newTopologyEpoch))
.maybeUpdateUserEndpoint(Optional.of(newUserEndpoint))
.maybeUpdateClientTags(Optional.of(newClientTags))
.build();
assertEquals(Optional.of(newRackId), updatedMember.rackId());
assertEquals(Optional.of(newInstanceId), updatedMember.instanceId());
assertEquals(newRebalanceTimeout, updatedMember.rebalanceTimeoutMs());
assertEquals(newProcessId, updatedMember.processId());
assertEquals(newTopologyEpoch, updatedMember.topologyEpoch());
assertEquals(Optional.of(newUserEndpoint), updatedMember.userEndpoint());
assertEquals(newClientTags, updatedMember.clientTags());
assertEquals(member.memberId(), updatedMember.memberId());
assertEquals(member.memberEpoch(), updatedMember.memberEpoch());
assertEquals(member.previousMemberEpoch(), updatedMember.previousMemberEpoch());
assertEquals(member.state(), updatedMember.state());
assertEquals(member.clientId(), updatedMember.clientId());
assertEquals(member.clientHost(), updatedMember.clientHost());
assertEquals(member.assignedActiveTasks(), updatedMember.assignedActiveTasks());
assertEquals(member.assignedStandbyTasks(), updatedMember.assignedStandbyTasks());
assertEquals(member.assignedWarmupTasks(), updatedMember.assignedWarmupTasks());
assertEquals(member.activeTasksPendingRevocation(), updatedMember.activeTasksPendingRevocation());
assertEquals(member.standbyTasksPendingRevocation(), updatedMember.standbyTasksPendingRevocation());
assertEquals(member.warmupTasksPendingRevocation(), updatedMember.warmupTasksPendingRevocation());
}
@Test
public void testBuilderUpdateMemberEpoch() {
final StreamsGroupMember member = createStreamsGroupMember();
final int newMemberEpoch = member.memberEpoch() + 1;
final StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
.updateMemberEpoch(newMemberEpoch)
.build();
assertEquals(member.memberId(), updatedMember.memberId());
assertEquals(newMemberEpoch, updatedMember.memberEpoch());
// The previous member epoch becomes the old current member epoch.
assertEquals(member.memberEpoch(), updatedMember.previousMemberEpoch());
assertEquals(member.state(), updatedMember.state());
assertEquals(member.instanceId(), updatedMember.instanceId());
assertEquals(member.rackId(), updatedMember.rackId());
assertEquals(member.rebalanceTimeoutMs(), updatedMember.rebalanceTimeoutMs());
assertEquals(member.clientId(), updatedMember.clientId());
assertEquals(member.clientHost(), updatedMember.clientHost());
assertEquals(member.topologyEpoch(), updatedMember.topologyEpoch());
assertEquals(member.processId(), updatedMember.processId());
assertEquals(member.userEndpoint(), updatedMember.userEndpoint());
assertEquals(member.clientTags(), updatedMember.clientTags());
assertEquals(member.assignedActiveTasks(), updatedMember.assignedActiveTasks());
assertEquals(member.assignedStandbyTasks(), updatedMember.assignedStandbyTasks());
assertEquals(member.assignedWarmupTasks(), updatedMember.assignedWarmupTasks());
assertEquals(member.activeTasksPendingRevocation(), updatedMember.activeTasksPendingRevocation());
assertEquals(member.standbyTasksPendingRevocation(), updatedMember.standbyTasksPendingRevocation());
assertEquals(member.warmupTasksPendingRevocation(), updatedMember.warmupTasksPendingRevocation());
}
@Test
public void testReturnUnmodifiableFields() {
final StreamsGroupMember member = createStreamsGroupMember();
assertThrows(UnsupportedOperationException.class, () -> member.clientTags().put("not allowed", ""));
assertThrows(UnsupportedOperationException.class, () -> member.assignedActiveTasks().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.assignedStandbyTasks().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.assignedWarmupTasks().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.activeTasksPendingRevocation().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.standbyTasksPendingRevocation().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.warmupTasksPendingRevocation().put("not allowed", Collections.emptySet()));
}
@Test
public void testAsStreamsGroupDescribeMember() {
final StreamsGroupMember member = createStreamsGroupMember();
List<Integer> assignedTasks1 = Arrays.asList(10, 11, 12);
List<Integer> assignedTasks2 = Arrays.asList(13, 14, 15);
List<Integer> assignedTasks3 = Arrays.asList(16, 17, 18);
Assignment targetAssignment = new Assignment(
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(assignedTasks3))),
mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(assignedTasks2))),
mkMap(mkEntry(SUBTOPOLOGY3, new HashSet<>(assignedTasks1)))
);
StreamsGroupDescribeResponseData.Member actual = member.asStreamsGroupDescribeMember(targetAssignment);
StreamsGroupDescribeResponseData.Member expected = new StreamsGroupDescribeResponseData.Member()
.setMemberId(MEMBER_ID)
.setMemberEpoch(MEMBER_EPOCH)
.setClientId(CLIENT_ID)
.setInstanceId(INSTANCE_ID)
.setRackId(RACK_ID)
.setClientHost(HOSTNAME)
.setProcessId(PROCESS_ID)
.setTopologyEpoch(TOPOLOGY_EPOCH)
.setClientTags(List.of(
new StreamsGroupDescribeResponseData.KeyValue().setKey(CLIENT_TAG_KEY).setValue(CLIENT_TAG_VALUE))
)
.setAssignment(
new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(List.of(
new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY1)
.setPartitions(TASKS1))
)
.setStandbyTasks(List.of(
new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY2)
.setPartitions(TASKS2))
)
.setWarmupTasks(List.of(
new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY1)
.setPartitions(TASKS3))
)
)
.setTargetAssignment(
new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(List.of(
new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY1)
.setPartitions(assignedTasks3))
)
.setStandbyTasks(List.of(
new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY2)
.setPartitions(assignedTasks2))
)
.setWarmupTasks(List.of(
new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY3)
.setPartitions(assignedTasks1))
)
)
.setUserEndpoint(new StreamsGroupDescribeResponseData.Endpoint()
.setHost(USER_ENDPOINT.host())
.setPort(USER_ENDPOINT.port())
);
assertEquals(expected, actual);
}
@Test
public void testAsStreamsGroupDescribeWithTargetAssignmentNull() {
final StreamsGroupMember member = createStreamsGroupMember();
StreamsGroupDescribeResponseData.Member streamsGroupDescribeMember = member.asStreamsGroupDescribeMember(null);
assertEquals(new StreamsGroupDescribeResponseData.Assignment(), streamsGroupDescribeMember.targetAssignment());
}
private StreamsGroupMember createStreamsGroupMember() {
return new StreamsGroupMember.Builder(MEMBER_ID)
.setMemberEpoch(MEMBER_EPOCH)
.setPreviousMemberEpoch(PREVIOUS_MEMBER_EPOCH)
.setState(STATE)
.setInstanceId(INSTANCE_ID)
.setRackId(RACK_ID)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT)
.setClientId(CLIENT_ID)
.setClientHost(HOSTNAME)
.setTopologyEpoch(TOPOLOGY_EPOCH)
.setProcessId(PROCESS_ID)
.setUserEndpoint(USER_ENDPOINT)
.setClientTags(CLIENT_TAGS)
.setAssignedActiveTasks(ASSIGNED_ACTIVE_TASKS)
.setAssignedStandbyTasks(ASSIGNED_STANDBY_TASKS)
.setAssignedWarmupTasks(ASSIGNED_WARMUP_TASKS)
.setActiveTasksPendingRevocation(ACTIVE_TASKS_PENDING_REVOCATION)
.setStandbyTasksPendingRevocation(STANDBY_TASKS_PENDING_REVOCATION)
.setWarmupTasksPendingRevocation(WARMUP_TASKS_PENDING_REVOCATION)
.build();
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public class TaskAssignmentTestUtil {
public static Assignment mkAssignment(final Map<String, Set<Integer>> activeTasks,
final Map<String, Set<Integer>> standbyTasks,
final Map<String, Set<Integer>> warmupTasks) {
return new Assignment(
Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)),
Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)),
Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks))
);
}
public static Map.Entry<String, Set<Integer>> mkTasks(String subtopologyId,
Integer... tasks) {
return new AbstractMap.SimpleEntry<>(
subtopologyId,
new HashSet<>(List.of(tasks))
);
}
@SafeVarargs
public static Map<String, Set<Integer>> mkTasksPerSubtopology(Map.Entry<String,
Set<Integer>>... entries) {
Map<String, Set<Integer>> assignment = new HashMap<>();
for (Map.Entry<String, Set<Integer>> entry : entries) {
assignment.put(entry.getKey(), entry.getValue());
}
return assignment;
}
}