KAFKA-18324: Add CurrentAssignmentBuilder (#18476)

Implements the current assignment builder, analogous to the current assignment builder of consumer groups. The main difference is the underlying assigned resource, and slightly different logic around process IDs: We make sure to move a task only to a new client, once the task is not owned anymore by any client with the same process ID (sharing the same state directory) - in any role (active, standby or warm-up).

Compared to the feature branch, the main difference is that I refactored the separate treatment of active, standby and warm-up tasks into a compound datatype called TaskTuple (which is used in place of the more specific Assignment class). This also has effects on StreamsGroupMember.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Lucas Brutschy 2025-01-23 17:35:03 +01:00 committed by GitHub
parent 7e46087570
commit aea699bdef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1529 additions and 236 deletions

View File

@ -0,0 +1,451 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
/**
* The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a
* member and a desired or target assignment state, the state machine takes the necessary steps to converge them.
*/
public class CurrentAssignmentBuilder {
/**
* The streams group member which is reconciled.
*/
private final StreamsGroupMember member;
/**
* The target assignment epoch.
*/
private int targetAssignmentEpoch;
/**
* The target assignment.
*/
private TasksTuple targetAssignment;
/**
* A function which returns the current process ID of an active task or null if the active task
* is not assigned. The current process ID is the process ID of the current owner.
*/
private BiFunction<String, Integer, String> currentActiveTaskProcessId;
/**
* A function which returns the current process IDs of a standby task or null if the standby
* task is not assigned. The current process IDs are the process IDs of all current owners.
*/
private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds;
/**
* A function which returns the current process IDs of a warmup task or null if the warmup task
* is not assigned. The current process IDs are the process IDs of all current owners.
*/
private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds;
/**
* The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request.
*/
private Optional<TasksTuple> ownedTasks = Optional.empty();
/**
* Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member.
*
* @param member The streams group member that must be reconciled.
*/
public CurrentAssignmentBuilder(StreamsGroupMember member) {
this.member = Objects.requireNonNull(member);
}
/**
* Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to.
*
* @param targetAssignmentEpoch The target assignment epoch.
* @param targetAssignment The target assignment.
* @return This object.
*/
public CurrentAssignmentBuilder withTargetAssignment(int targetAssignmentEpoch,
TasksTuple targetAssignment) {
this.targetAssignmentEpoch = targetAssignmentEpoch;
this.targetAssignment = Objects.requireNonNull(targetAssignment);
return this;
}
/**
* Sets a BiFunction which allows to retrieve the current process ID of an active task. This is
* used by the state machine to determine if an active task is free or still used by another
* member, and if there is still a task on a specific process that is not yet revoked.
*
* @param currentActiveTaskProcessId A BiFunction which gets the process ID of a subtopology ID /
* partition ID pair.
* @return This object.
*/
public CurrentAssignmentBuilder withCurrentActiveTaskProcessId(BiFunction<String, Integer, String> currentActiveTaskProcessId) {
this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId);
return this;
}
/**
* Sets a BiFunction which allows to retrieve the current process IDs of a standby task. This is
* used by the state machine to determine if there is still a task on a specific process that is
* not yet revoked.
*
* @param currentStandbyTaskProcessIds A BiFunction which gets the process IDs of a subtopology
* ID / partition ID pair.
* @return This object.
*/
public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds(
BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds
) {
this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds);
return this;
}
/**
* Sets a BiFunction which allows to retrieve the current process IDs of a warmup task. This is
* used by the state machine to determine if there is still a task on a specific process that is
* not yet revoked.
*
* @param currentWarmupTaskProcessIds A BiFunction which gets the process IDs of a subtopology ID
* / partition ID pair.
* @return This object.
*/
public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds(BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds) {
this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds);
return this;
}
/**
* Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to
* determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide
* its owned tasks in this heartbeat.
*
* @param ownedAssignment A collection of active, standby and warm-up tasks
* @return This object.
*/
protected CurrentAssignmentBuilder withOwnedAssignment(TasksTuple ownedAssignment) {
this.ownedTasks = Optional.ofNullable(ownedAssignment);
return this;
}
/**
* Builds the next state for the member or keep the current one if it is not possible to move forward with the current state.
*
* @return A new StreamsGroupMember or the current one.
*/
public StreamsGroupMember build() {
switch (member.state()) {
case STABLE:
// When the member is in the STABLE state, we verify if a newer
// epoch (or target assignment) is available. If it is, we can
// reconcile the member towards it. Otherwise, we return.
if (member.memberEpoch() != targetAssignmentEpoch) {
return computeNextAssignment(
member.memberEpoch(),
member.assignedTasks()
);
} else {
return member;
}
case UNREVOKED_TASKS:
// When the member is in the UNREVOKED_TASKS state, we wait
// until the member has revoked the necessary tasks. They are
// considered revoked when they are not anymore reported in the
// owned tasks set in the StreamsGroupHeartbeat API.
// If the member provides its owned tasks, we verify if it still
// owns any of the revoked tasks. If it did not provide it's
// owned tasks, or we still own some of the revoked tasks, we
// cannot progress.
if (
ownedTasks.isEmpty() || ownedTasks.get().containsAny(member.tasksPendingRevocation())
) {
return member;
}
// When the member has revoked all the pending tasks, it can
// transition to the next epoch (current + 1) and we can reconcile
// its state towards the latest target assignment.
return computeNextAssignment(
member.memberEpoch() + 1,
member.assignedTasks()
);
case UNRELEASED_TASKS:
// When the member is in the UNRELEASED_TASKS, we reconcile the
// member towards the latest target assignment. This will assign any
// of the unreleased tasks when they become available.
return computeNextAssignment(
member.memberEpoch(),
member.assignedTasks()
);
case UNKNOWN:
// We could only end up in this state if a new state is added in the
// future and the group coordinator is downgraded. In this case, the
// best option is to fence the member to force it to rejoin the group
// without any tasks and to reconcile it again from scratch.
if ((ownedTasks.isEmpty() || !ownedTasks.get().isEmpty())) {
throw new FencedMemberEpochException(
"The streams group member is in a unknown state. "
+ "The member must abandon all its tasks and rejoin.");
}
return computeNextAssignment(
targetAssignmentEpoch,
member.assignedTasks()
);
}
return member;
}
/**
* Takes the current currentAssignment and the targetAssignment, and generates three
* collections:
*
* - the resultAssignedTasks: the tasks that are assigned in both the current and target
* assignments.
* - the resultTasksPendingRevocation: the tasks that are assigned in the current
* assignment but not in the target assignment.
* - the resultTasksPendingAssignment: the tasks that are assigned in the target assignment but
* not in the current assignment, and can be assigned currently (i.e., they are not owned by
* another member, as defined by the `isUnreleasedTask` predicate).
*/
private boolean computeAssignmentDifference(Map<String, Set<Integer>> currentAssignment,
Map<String, Set<Integer>> targetAssignment,
Map<String, Set<Integer>> resultAssignedTasks,
Map<String, Set<Integer>> resultTasksPendingRevocation,
Map<String, Set<Integer>> resultTasksPendingAssignment,
BiPredicate<String, Integer> isUnreleasedTask) {
boolean hasUnreleasedTasks = false;
Set<String> allSubtopologyIds = new HashSet<>(targetAssignment.keySet());
allSubtopologyIds.addAll(currentAssignment.keySet());
for (String subtopologyId : allSubtopologyIds) {
hasUnreleasedTasks |= computeAssignmentDifferenceForOneSubtopology(
subtopologyId,
currentAssignment.getOrDefault(subtopologyId, Collections.emptySet()),
targetAssignment.getOrDefault(subtopologyId, Collections.emptySet()),
resultAssignedTasks,
resultTasksPendingRevocation,
resultTasksPendingAssignment,
isUnreleasedTask
);
}
return hasUnreleasedTasks;
}
private static boolean computeAssignmentDifferenceForOneSubtopology(final String subtopologyId,
final Set<Integer> currentTasksForThisSubtopology,
final Set<Integer> targetTasksForThisSubtopology,
final Map<String, Set<Integer>> resultAssignedTasks,
final Map<String, Set<Integer>> resultTasksPendingRevocation,
final Map<String, Set<Integer>> resultTasksPendingAssignment,
final BiPredicate<String, Integer> isUnreleasedTask) {
// Result Assigned Tasks = Current Tasks Target Tasks
// i.e. we remove all tasks from the current assignment that are not in the target
// assignment
Set<Integer> resultAssignedTasksForThisSubtopology = new HashSet<>(currentTasksForThisSubtopology);
resultAssignedTasksForThisSubtopology.retainAll(targetTasksForThisSubtopology);
// Result Tasks Pending Revocation = Current Tasks - Result Assigned Tasks
// i.e. we will ask the member to revoke all tasks in its current assignment that
// are not in the target assignment
Set<Integer> resultTasksPendingRevocationForThisSubtopology = new HashSet<>(currentTasksForThisSubtopology);
resultTasksPendingRevocationForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
// Result Tasks Pending Assignment = Target Tasks - Result Assigned Tasks - Unreleased Tasks
// i.e. we will ask the member to assign all tasks in its target assignment,
// except those that are already assigned, and those that are unreleased
Set<Integer> resultTasksPendingAssignmentForThisSubtopology = new HashSet<>(targetTasksForThisSubtopology);
resultTasksPendingAssignmentForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
boolean hasUnreleasedTasks = resultTasksPendingAssignmentForThisSubtopology.removeIf(taskId ->
isUnreleasedTask.test(subtopologyId, taskId)
);
if (!resultAssignedTasksForThisSubtopology.isEmpty()) {
resultAssignedTasks.put(subtopologyId, resultAssignedTasksForThisSubtopology);
}
if (!resultTasksPendingRevocationForThisSubtopology.isEmpty()) {
resultTasksPendingRevocation.put(subtopologyId, resultTasksPendingRevocationForThisSubtopology);
}
if (!resultTasksPendingAssignmentForThisSubtopology.isEmpty()) {
resultTasksPendingAssignment.put(subtopologyId, resultTasksPendingAssignmentForThisSubtopology);
}
return hasUnreleasedTasks;
}
/**
* Computes the next assignment.
*
* @param memberEpoch The epoch of the member to use. This may be different from
* the epoch in {@link CurrentAssignmentBuilder#member}.
* @param memberAssignedTasks The assigned tasks of the member to use.
* @return A new StreamsGroupMember.
*/
private StreamsGroupMember computeNextAssignment(int memberEpoch,
TasksTuple memberAssignedTasks) {
Map<String, Set<Integer>> newActiveAssignedTasks = new HashMap<>();
Map<String, Set<Integer>> newActiveTasksPendingRevocation = new HashMap<>();
Map<String, Set<Integer>> newActiveTasksPendingAssignment = new HashMap<>();
Map<String, Set<Integer>> newStandbyAssignedTasks = new HashMap<>();
Map<String, Set<Integer>> newStandbyTasksPendingRevocation = new HashMap<>();
Map<String, Set<Integer>> newStandbyTasksPendingAssignment = new HashMap<>();
Map<String, Set<Integer>> newWarmupAssignedTasks = new HashMap<>();
Map<String, Set<Integer>> newWarmupTasksPendingRevocation = new HashMap<>();
Map<String, Set<Integer>> newWarmupTasksPendingAssignment = new HashMap<>();
boolean hasUnreleasedActiveTasks = computeAssignmentDifference(
memberAssignedTasks.activeTasks(),
targetAssignment.activeTasks(),
newActiveAssignedTasks,
newActiveTasksPendingRevocation,
newActiveTasksPendingAssignment,
(subtopologyId, partitionId) ->
currentActiveTaskProcessId.apply(subtopologyId, partitionId) != null ||
currentStandbyTaskProcessIds.apply(subtopologyId, partitionId)
.contains(member.processId()) ||
currentWarmupTaskProcessIds.apply(subtopologyId, partitionId)
.contains(member.processId())
);
boolean hasUnreleasedStandbyTasks = computeAssignmentDifference(
memberAssignedTasks.standbyTasks(),
targetAssignment.standbyTasks(),
newStandbyAssignedTasks,
newStandbyTasksPendingRevocation,
newStandbyTasksPendingAssignment,
(subtopologyId, partitionId) ->
Objects.equals(currentActiveTaskProcessId.apply(subtopologyId, partitionId),
member.processId()) ||
currentStandbyTaskProcessIds.apply(subtopologyId, partitionId)
.contains(member.processId()) ||
currentWarmupTaskProcessIds.apply(subtopologyId, partitionId)
.contains(member.processId())
);
boolean hasUnreleasedWarmupTasks = computeAssignmentDifference(
memberAssignedTasks.warmupTasks(),
targetAssignment.warmupTasks(),
newWarmupAssignedTasks,
newWarmupTasksPendingRevocation,
newWarmupTasksPendingAssignment,
(subtopologyId, partitionId) ->
Objects.equals(currentActiveTaskProcessId.apply(subtopologyId, partitionId),
member.processId()) ||
currentStandbyTaskProcessIds.apply(subtopologyId, partitionId)
.contains(member.processId()) ||
currentWarmupTaskProcessIds.apply(subtopologyId, partitionId)
.contains(member.processId())
);
return buildNewMember(
memberEpoch,
new TasksTuple(
newActiveTasksPendingRevocation,
newStandbyTasksPendingRevocation,
newWarmupTasksPendingRevocation
),
new TasksTuple(
newActiveAssignedTasks,
newStandbyAssignedTasks,
newWarmupAssignedTasks
),
new TasksTuple(
newActiveTasksPendingAssignment,
newStandbyTasksPendingAssignment,
newWarmupTasksPendingAssignment
),
hasUnreleasedActiveTasks || hasUnreleasedStandbyTasks || hasUnreleasedWarmupTasks
);
}
private StreamsGroupMember buildNewMember(final int memberEpoch,
final TasksTuple newTasksPendingRevocation,
final TasksTuple newAssignedTasks,
final TasksTuple newTasksPendingAssignment,
final boolean hasUnreleasedTasks) {
final boolean hasTasksToBeRevoked =
(!newTasksPendingRevocation.isEmpty())
&& (ownedTasks.isEmpty() || ownedTasks.get().containsAny(newTasksPendingRevocation));
if (hasTasksToBeRevoked) {
// If there are tasks to be revoked, the member remains in its current
// epoch and requests the revocation of those tasks. It transitions to
// the UNREVOKED_TASKS state to wait until the client acknowledges the
// revocation of the tasks.
return new StreamsGroupMember.Builder(member)
.setState(MemberState.UNREVOKED_TASKS)
.updateMemberEpoch(memberEpoch)
.setAssignedTasks(newAssignedTasks)
.setTasksPendingRevocation(newTasksPendingRevocation)
.build();
} else if (!newTasksPendingAssignment.isEmpty()) {
// If there are tasks to be assigned, the member transitions to the
// target epoch and requests the assignment of those tasks. Note that
// the tasks are directly added to the assigned tasks set. The
// member transitions to the STABLE state or to the UNRELEASED_TASKS
// state depending on whether there are unreleased tasks or not.
MemberState newState =
hasUnreleasedTasks
? MemberState.UNRELEASED_TASKS
: MemberState.STABLE;
return new StreamsGroupMember.Builder(member)
.setState(newState)
.updateMemberEpoch(targetAssignmentEpoch)
.setAssignedTasks(newAssignedTasks.merge(newTasksPendingAssignment))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
} else if (hasUnreleasedTasks) {
// If there are no tasks to be revoked nor to be assigned but some
// tasks are not available yet, the member transitions to the target
// epoch, to the UNRELEASED_TASKS state and waits.
return new StreamsGroupMember.Builder(member)
.setState(MemberState.UNRELEASED_TASKS)
.updateMemberEpoch(targetAssignmentEpoch)
.setAssignedTasks(newAssignedTasks)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
} else {
// Otherwise, the member transitions to the target epoch and to the
// STABLE state.
return new StreamsGroupMember.Builder(member)
.setState(MemberState.STABLE)
.updateMemberEpoch(targetAssignmentEpoch)
.setAssignedTasks(newAssignedTasks)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
}
}
}

View File

@ -49,18 +49,8 @@ import java.util.stream.Collectors;
* @param userEndpoint The user endpoint exposed for Interactive Queries by the Streams client that * @param userEndpoint The user endpoint exposed for Interactive Queries by the Streams client that
* contains the member. * contains the member.
* @param clientTags Tags of the client of the member used for rack-aware assignment. * @param clientTags Tags of the client of the member used for rack-aware assignment.
* @param assignedActiveTasks Active tasks assigned to the member. * @param assignedTasks Tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs. * @param tasksPendingRevocation Tasks owned by the member pending revocation.
* @param assignedStandbyTasks Standby tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param assignedWarmupTasks Warm-up tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param activeTasksPendingRevocation Active tasks assigned to the member pending revocation.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param standbyTasksPendingRevocation Standby tasks assigned to the member pending revocation.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param warmupTasksPendingRevocation Warm-up tasks assigned to the member pending revocation.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
*/ */
@SuppressWarnings("checkstyle:JavaNCSS") @SuppressWarnings("checkstyle:JavaNCSS")
public record StreamsGroupMember(String memberId, public record StreamsGroupMember(String memberId,
@ -76,22 +66,12 @@ public record StreamsGroupMember(String memberId,
String processId, String processId,
Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint, Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint,
Map<String, String> clientTags, Map<String, String> clientTags,
Map<String, Set<Integer>> assignedActiveTasks, TasksTuple assignedTasks,
Map<String, Set<Integer>> assignedStandbyTasks, TasksTuple tasksPendingRevocation) {
Map<String, Set<Integer>> assignedWarmupTasks,
Map<String, Set<Integer>> activeTasksPendingRevocation,
Map<String, Set<Integer>> standbyTasksPendingRevocation,
Map<String, Set<Integer>> warmupTasksPendingRevocation) {
public StreamsGroupMember { public StreamsGroupMember {
Objects.requireNonNull(memberId, "memberId cannot be null"); Objects.requireNonNull(memberId, "memberId cannot be null");
clientTags = clientTags != null ? Collections.unmodifiableMap(clientTags) : null; clientTags = clientTags != null ? Collections.unmodifiableMap(clientTags) : null;
assignedActiveTasks = assignedActiveTasks != null ? Collections.unmodifiableMap(assignedActiveTasks) : null;
assignedStandbyTasks = assignedStandbyTasks != null ? Collections.unmodifiableMap(assignedStandbyTasks) : null;
assignedWarmupTasks = assignedWarmupTasks != null ? Collections.unmodifiableMap(assignedWarmupTasks) : null;
activeTasksPendingRevocation = activeTasksPendingRevocation != null ? Collections.unmodifiableMap(activeTasksPendingRevocation) : null;
standbyTasksPendingRevocation = standbyTasksPendingRevocation != null ? Collections.unmodifiableMap(standbyTasksPendingRevocation) : null;
warmupTasksPendingRevocation = warmupTasksPendingRevocation != null ? Collections.unmodifiableMap(warmupTasksPendingRevocation) : null;
} }
/** /**
@ -114,12 +94,8 @@ public record StreamsGroupMember(String memberId,
private String processId = null; private String processId = null;
private Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint = null; private Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint = null;
private Map<String, String> clientTags = null; private Map<String, String> clientTags = null;
private Map<String, Set<Integer>> assignedActiveTasks = null; private TasksTuple assignedTasks = null;
private Map<String, Set<Integer>> assignedStandbyTasks = null; private TasksTuple tasksPendingRevocation = null;
private Map<String, Set<Integer>> assignedWarmupTasks = null;
private Map<String, Set<Integer>> activeTasksPendingRevocation = null;
private Map<String, Set<Integer>> standbyTasksPendingRevocation = null;
private Map<String, Set<Integer>> warmupTasksPendingRevocation = null;
public Builder(String memberId) { public Builder(String memberId) {
this.memberId = Objects.requireNonNull(memberId, "memberId cannot be null"); this.memberId = Objects.requireNonNull(memberId, "memberId cannot be null");
@ -141,12 +117,8 @@ public record StreamsGroupMember(String memberId,
this.userEndpoint = member.userEndpoint; this.userEndpoint = member.userEndpoint;
this.clientTags = member.clientTags; this.clientTags = member.clientTags;
this.state = member.state; this.state = member.state;
this.assignedActiveTasks = member.assignedActiveTasks; this.assignedTasks = member.assignedTasks;
this.assignedStandbyTasks = member.assignedStandbyTasks; this.tasksPendingRevocation = member.tasksPendingRevocation;
this.assignedWarmupTasks = member.assignedWarmupTasks;
this.activeTasksPendingRevocation = member.activeTasksPendingRevocation;
this.standbyTasksPendingRevocation = member.standbyTasksPendingRevocation;
this.warmupTasksPendingRevocation = member.warmupTasksPendingRevocation;
} }
public Builder updateMemberEpoch(int memberEpoch) { public Builder updateMemberEpoch(int memberEpoch) {
@ -251,50 +223,13 @@ public record StreamsGroupMember(String memberId,
return this; return this;
} }
public Builder setAssignment(Assignment assignment) { public Builder setAssignedTasks(TasksTuple assignedTasks) {
this.assignedActiveTasks = assignment.activeTasks(); this.assignedTasks = assignedTasks;
this.assignedStandbyTasks = assignment.standbyTasks();
this.assignedWarmupTasks = assignment.warmupTasks();
return this; return this;
} }
public Builder setAssignedActiveTasks(Map<String, Set<Integer>> assignedActiveTasks) { public Builder setTasksPendingRevocation(TasksTuple tasksPendingRevocation) {
this.assignedActiveTasks = assignedActiveTasks; this.tasksPendingRevocation = tasksPendingRevocation;
return this;
}
public Builder setAssignedStandbyTasks(Map<String, Set<Integer>> assignedStandbyTasks) {
this.assignedStandbyTasks = assignedStandbyTasks;
return this;
}
public Builder setAssignedWarmupTasks(Map<String, Set<Integer>> assignedWarmupTasks) {
this.assignedWarmupTasks = assignedWarmupTasks;
return this;
}
public Builder setAssignmentPendingRevocation(Assignment assignment) {
this.activeTasksPendingRevocation = assignment.activeTasks();
this.standbyTasksPendingRevocation = assignment.standbyTasks();
this.warmupTasksPendingRevocation = assignment.warmupTasks();
return this;
}
public Builder setActiveTasksPendingRevocation(
Map<String, Set<Integer>> activeTasksPendingRevocation) {
this.activeTasksPendingRevocation = activeTasksPendingRevocation;
return this;
}
public Builder setStandbyTasksPendingRevocation(
Map<String, Set<Integer>> standbyTasksPendingRevocation) {
this.standbyTasksPendingRevocation = standbyTasksPendingRevocation;
return this;
}
public Builder setWarmupTasksPendingRevocation(
Map<String, Set<Integer>> warmupTasksPendingRevocation) {
this.warmupTasksPendingRevocation = warmupTasksPendingRevocation;
return this; return this;
} }
@ -318,15 +253,20 @@ public record StreamsGroupMember(String memberId,
setMemberEpoch(record.memberEpoch()); setMemberEpoch(record.memberEpoch());
setPreviousMemberEpoch(record.previousMemberEpoch()); setPreviousMemberEpoch(record.previousMemberEpoch());
setState(MemberState.fromValue(record.state())); setState(MemberState.fromValue(record.state()));
setAssignedActiveTasks(assignmentFromTaskIds(record.activeTasks())); setAssignedTasks(
setAssignedStandbyTasks(assignmentFromTaskIds(record.standbyTasks())); new TasksTuple(
setAssignedWarmupTasks(assignmentFromTaskIds(record.warmupTasks())); assignmentFromTaskIds(record.activeTasks()),
setActiveTasksPendingRevocation( assignmentFromTaskIds(record.standbyTasks()),
assignmentFromTaskIds(record.activeTasksPendingRevocation())); assignmentFromTaskIds(record.warmupTasks())
setStandbyTasksPendingRevocation( )
assignmentFromTaskIds(record.standbyTasksPendingRevocation())); );
setWarmupTasksPendingRevocation( setTasksPendingRevocation(
assignmentFromTaskIds(record.warmupTasksPendingRevocation())); new TasksTuple(
assignmentFromTaskIds(record.activeTasksPendingRevocation()),
assignmentFromTaskIds(record.standbyTasksPendingRevocation()),
assignmentFromTaskIds(record.warmupTasksPendingRevocation())
)
);
return this; return this;
} }
@ -353,12 +293,8 @@ public record StreamsGroupMember(String memberId,
processId, processId,
userEndpoint, userEndpoint,
clientTags, clientTags,
assignedActiveTasks, assignedTasks,
assignedStandbyTasks, tasksPendingRevocation
assignedWarmupTasks,
activeTasksPendingRevocation,
standbyTasksPendingRevocation,
warmupTasksPendingRevocation
); );
} }
} }
@ -377,9 +313,7 @@ public record StreamsGroupMember(String memberId,
* *
* @return The StreamsGroupMember mapped as StreamsGroupDescribeResponseData.Member. * @return The StreamsGroupMember mapped as StreamsGroupDescribeResponseData.Member.
*/ */
public StreamsGroupDescribeResponseData.Member asStreamsGroupDescribeMember( public StreamsGroupDescribeResponseData.Member asStreamsGroupDescribeMember(TasksTuple targetAssignment) {
Assignment targetAssignment
) {
final StreamsGroupDescribeResponseData.Assignment describedTargetAssignment = final StreamsGroupDescribeResponseData.Assignment describedTargetAssignment =
new StreamsGroupDescribeResponseData.Assignment(); new StreamsGroupDescribeResponseData.Assignment();
@ -395,9 +329,9 @@ public record StreamsGroupMember(String memberId,
.setMemberId(memberId) .setMemberId(memberId)
.setAssignment( .setAssignment(
new StreamsGroupDescribeResponseData.Assignment() new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(taskIdsFromMap(assignedActiveTasks)) .setActiveTasks(taskIdsFromMap(assignedTasks.activeTasks()))
.setStandbyTasks(taskIdsFromMap(assignedStandbyTasks)) .setStandbyTasks(taskIdsFromMap(assignedTasks.standbyTasks()))
.setWarmupTasks(taskIdsFromMap(assignedWarmupTasks))) .setWarmupTasks(taskIdsFromMap(assignedTasks.warmupTasks())))
.setTargetAssignment(describedTargetAssignment) .setTargetAssignment(describedTargetAssignment)
.setClientHost(clientHost) .setClientHost(clientHost)
.setClientId(clientId) .setClientId(clientId)
@ -419,9 +353,7 @@ public record StreamsGroupMember(String memberId,
); );
} }
private static List<StreamsGroupDescribeResponseData.TaskIds> taskIdsFromMap( private static List<StreamsGroupDescribeResponseData.TaskIds> taskIdsFromMap(Map<String, Set<Integer>> tasks) {
Map<String, Set<Integer>> tasks
) {
List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new ArrayList<>(); List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new ArrayList<>();
tasks.forEach((subtopologyId, partitionSet) -> { tasks.forEach((subtopologyId, partitionSet) -> {
taskIds.add(new StreamsGroupDescribeResponseData.TaskIds() taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
@ -432,32 +364,9 @@ public record StreamsGroupMember(String memberId,
} }
/** /**
* @return True if the two provided members have different assigned active tasks. * @return True if the two provided members have different assigned tasks.
*/ */
public static boolean hasAssignedActiveTasksChanged( public static boolean hasAssignedTasksChanged(StreamsGroupMember member1, StreamsGroupMember member2) {
StreamsGroupMember member1, return !member1.assignedTasks().equals(member2.assignedTasks());
StreamsGroupMember member2
) {
return !member1.assignedActiveTasks().equals(member2.assignedActiveTasks());
}
/**
* @return True if the two provided members have different assigned active tasks.
*/
public static boolean hasAssignedStandbyTasksChanged(
StreamsGroupMember member1,
StreamsGroupMember member2
) {
return !member1.assignedStandbyTasks().equals(member2.assignedStandbyTasks());
}
/**
* @return True if the two provided members have different assigned active tasks.
*/
public static boolean hasAssignedWarmupTasksChanged(
StreamsGroupMember member1,
StreamsGroupMember member2
) {
return !member1.assignedWarmupTasks().equals(member2.assignedWarmupTasks());
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -26,45 +27,89 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* An immutable assignment for a member. * An immutable tuple containing active, standby and warm-up tasks.
* *
* @param activeTasks Active tasks assigned to the member. * @param activeTasks Active tasks.
* The key of the map is the subtopology ID and the value is the set of partition IDs. * The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param standbyTasks Standby tasks assigned to the member. * @param standbyTasks Standby tasks.
* The key of the map is the subtopology ID and the value is the set of partition IDs. * The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param warmupTasks Warm-up tasks assigned to the member. * @param warmupTasks Warm-up tasks.
* The key of the map is the subtopology ID and the value is the set of partition IDs. * The key of the map is the subtopology ID and the value is the set of partition IDs.
*/ */
public record Assignment(Map<String, Set<Integer>> activeTasks, public record TasksTuple(Map<String, Set<Integer>> activeTasks,
Map<String, Set<Integer>> standbyTasks, Map<String, Set<Integer>> standbyTasks,
Map<String, Set<Integer>> warmupTasks) { Map<String, Set<Integer>> warmupTasks) {
public Assignment { public TasksTuple {
activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)); activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)); standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)); warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
} }
/** /**
* An empty assignment. * An empty task tuple.
*/ */
public static final Assignment EMPTY = new Assignment( public static final TasksTuple EMPTY = new TasksTuple(
Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap() Collections.emptyMap()
); );
/** /**
* Creates a {{@link org.apache.kafka.coordinator.group.streams.Assignment}} from a * @return true if all collections in the tuple are empty.
*/
public boolean isEmpty() {
return activeTasks.isEmpty() && standbyTasks.isEmpty() && warmupTasks.isEmpty();
}
/**
* Merges this task tuple with another task tuple.
*
* @param other The other task tuple.
* @return A new task tuple, containing all active tasks, standby tasks and warm-up tasks from both tuples.
*/
public TasksTuple merge(TasksTuple other) {
Map<String, Set<Integer>> mergedActiveTasks = merge(activeTasks, other.activeTasks);
Map<String, Set<Integer>> mergedStandbyTasks = merge(standbyTasks, other.standbyTasks);
Map<String, Set<Integer>> mergedWarmupTasks = merge(warmupTasks, other.warmupTasks);
return new TasksTuple(mergedActiveTasks, mergedStandbyTasks, mergedWarmupTasks);
}
private static Map<String, Set<Integer>> merge(final Map<String, Set<Integer>> tasks1, final Map<String, Set<Integer>> tasks2) {
HashMap<String, Set<Integer>> result = new HashMap<>();
tasks1.forEach((subtopologyId, tasks) ->
result.put(subtopologyId, new HashSet<>(tasks)));
tasks2.forEach((subtopologyId, tasks) -> result
.computeIfAbsent(subtopologyId, __ -> new HashSet<>())
.addAll(tasks));
return result;
}
/**
* Checks if this task tuple contains any of the tasks in another task tuple.
*
* @param other The other task tuple.
* @return true if there is at least one active, standby or warm-up task that is present in both tuples.
*/
public boolean containsAny(TasksTuple other) {
return activeTasks.entrySet().stream().anyMatch(
entry -> other.activeTasks.containsKey(entry.getKey()) && !Collections.disjoint(entry.getValue(), other.activeTasks.get(entry.getKey()))
) || standbyTasks.entrySet().stream().anyMatch(
entry -> other.standbyTasks.containsKey(entry.getKey()) && !Collections.disjoint(entry.getValue(), other.standbyTasks.get(entry.getKey()))
) || warmupTasks.entrySet().stream().anyMatch(
entry -> other.warmupTasks.containsKey(entry.getKey()) && !Collections.disjoint(entry.getValue(), other.warmupTasks.get(entry.getKey()))
);
}
/**
* Creates a {{@link TasksTuple}} from a
* {{@link org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}}. * {{@link org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}}.
* *
* @param record The record. * @param record The record.
* @return A {{@link org.apache.kafka.coordinator.group.streams.Assignment}}. * @return A {{@link TasksTuple}}.
*/ */
public static Assignment fromRecord( public static TasksTuple fromTargetAssignmentRecord(StreamsGroupTargetAssignmentMemberValue record) {
StreamsGroupTargetAssignmentMemberValue record return new TasksTuple(
) {
return new Assignment(
record.activeTasks().stream() record.activeTasks().stream()
.collect(Collectors.toMap( .collect(Collectors.toMap(
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId, StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,

View File

@ -22,7 +22,7 @@ import java.util.Objects;
/** /**
* The task assignment for a streams group. * The task assignment for a streams group.
* *
* @param members The member assignments keyed by member id. * @param members The member assignments keyed by member ID.
*/ */
public record GroupAssignment(Map<String, MemberAssignment> members) { public record GroupAssignment(Map<String, MemberAssignment> members) {

View File

@ -0,0 +1,825 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.Collections;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class CurrentAssignmentBuilderTest {
private static final String SUBTOPOLOGY_ID1 = Uuid.randomUuid().toString();
private static final String SUBTOPOLOGY_ID2 = Uuid.randomUuid().toString();
private static final String PROCESS_ID = "process_id";
private static final String MEMBER_NAME = "member";
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testStableToStable(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member =
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(
mkTasksTuple(
taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch + 1)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(
taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testStableToStableAtTargetEpoch(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member =
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(
mkTasksTuple(
taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(
taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testStableToStableWithNewTasks(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2, 4),
mkTasks(SUBTOPOLOGY_ID2, 3, 4, 7)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch + 1)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2, 4),
mkTasks(SUBTOPOLOGY_ID2, 3, 4, 7)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testStableToUnrevokedTasks(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 4, 5)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNREVOKED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2),
mkTasks(SUBTOPOLOGY_ID2, 4)))
.setTasksPendingRevocation(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1),
mkTasks(SUBTOPOLOGY_ID2, 3)))
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testStableToUnrevokedWithEmptyAssignment(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member =
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(
mkTasksTuple(
taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, TasksTuple.EMPTY)
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNREVOKED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(TasksTuple.EMPTY)
.setTasksPendingRevocation(
mkTasksTuple(
taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testStableToUnreleasedTasks(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2, 4),
mkTasks(SUBTOPOLOGY_ID2, 3, 4, 7)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch + 1)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testStableToUnreleasedTasksWithOwnedTasksNotHavingRevokedTasks(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3, 5)))
.withCurrentActiveTaskProcessId((subtopologyId, __) ->
SUBTOPOLOGY_ID2.equals(subtopologyId) ? PROCESS_ID : null
)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.withOwnedAssignment(mkTasksTuple(taskRole))
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch + 1)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
mkTasks(SUBTOPOLOGY_ID2, 3)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUnrevokedTasksToStable(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNREVOKED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1),
mkTasks(SUBTOPOLOGY_ID2, 4)))
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.withOwnedAssignment(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch + 1)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testRemainsInUnrevokedTasks(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNREVOKED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1),
mkTasks(SUBTOPOLOGY_ID2, 4)))
.build();
CurrentAssignmentBuilder currentAssignmentBuilder = new CurrentAssignmentBuilder(
member)
.withTargetAssignment(memberEpoch + 2, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 3),
mkTasks(SUBTOPOLOGY_ID2, 6)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet());
assertEquals(
member,
currentAssignmentBuilder
.withOwnedAssignment(null)
.build()
);
assertEquals(
member,
currentAssignmentBuilder
.withOwnedAssignment(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.build()
);
assertEquals(
member,
currentAssignmentBuilder
.withOwnedAssignment(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 4, 5, 6)))
.build()
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUnrevokedTasksToUnrevokedTasks(TaskRole taskRole) {
final int memberEpoch = 10;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNREVOKED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1),
mkTasks(SUBTOPOLOGY_ID2, 4)))
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 2, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 3),
mkTasks(SUBTOPOLOGY_ID2, 6)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
.withOwnedAssignment(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNREVOKED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch + 1)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 3),
mkTasks(SUBTOPOLOGY_ID2, 6)))
.setTasksPendingRevocation(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2),
mkTasks(SUBTOPOLOGY_ID2, 5)))
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUnrevokedTasksToUnreleasedTasks(TaskRole taskRole) {
final int memberEpoch = 11;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNREVOKED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch - 1)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 1),
mkTasks(SUBTOPOLOGY_ID2, 4)))
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.withOwnedAssignment(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6))
)
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUnreleasedTasksToStable(TaskRole taskRole) {
final int memberEpoch = 11;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId("process1")
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.singleton(PROCESS_ID))
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.singleton(PROCESS_ID))
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId("process1")
.setMemberEpoch(memberEpoch + 1)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUnreleasedTasksToStableWithNewTasks(TaskRole taskRole) {
int memberEpoch = 11;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId("process1")
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId("process1")
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUnreleasedTasksToUnreleasedTasks(TaskRole taskRole) {
int memberEpoch = 11;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.singleton(PROCESS_ID))
.withCurrentWarmupTaskProcessIds(
(subtopologyId, partitionId) -> Collections.singleton(PROCESS_ID))
.build();
assertEquals(member, updatedMember);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUnreleasedTasksToUnreleasedTasksOtherUnreleasedTaskRole(TaskRole taskRole) {
int memberEpoch = 11;
// The unreleased task is owned by a task of a different role on the same process.
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> (taskRole == TaskRole.STANDBY)
? Collections.emptySet() : Collections.singleton(PROCESS_ID))
.withCurrentWarmupTaskProcessIds(
(subtopologyId, partitionId) -> (taskRole == TaskRole.STANDBY)
? Collections.singleton(PROCESS_ID) : Collections.emptySet())
.build();
assertEquals(member, updatedMember);
}
@Test
public void testUnreleasedTasksToUnreleasedTasksAnyActiveOwner() {
int memberEpoch = 11;
// The unreleased task remains unreleased, because it is owned by any other instance in
// an active role, no matter the process.
// The task that is not unreleased can be assigned.
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(TaskRole.ACTIVE,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.build();
StreamsGroupMember expectedMember = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(TaskRole.ACTIVE,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch, mkTasksTuple(TaskRole.ACTIVE,
mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
(subtopologyId.equals(SUBTOPOLOGY_ID1) && partitionId == 4) ? "anyOtherProcess"
: null)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.build();
assertEquals(expectedMember, updatedMember);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUnreleasedTasksToUnrevokedTasks(TaskRole taskRole) {
int memberEpoch = 11;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNRELEASED_TASKS)
.setProcessId("process1")
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2, 3),
mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
.setTasksPendingRevocation(mkTasksTuple(TaskRole.ACTIVE,
mkTasks(SUBTOPOLOGY_ID1, 4),
mkTasks(SUBTOPOLOGY_ID2, 7)))
.build();
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 3),
mkTasks(SUBTOPOLOGY_ID2, 6)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNREVOKED_TASKS)
.setProcessId("process1")
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 3),
mkTasks(SUBTOPOLOGY_ID2, 6)))
.setTasksPendingRevocation(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2),
mkTasks(SUBTOPOLOGY_ID2, 5)))
.build(),
updatedMember
);
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUnknownState(TaskRole taskRole) {
int memberEpoch = 11;
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.UNKNOWN)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setProcessId(PROCESS_ID)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 3),
mkTasks(SUBTOPOLOGY_ID2, 6)))
.setTasksPendingRevocation(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 2),
mkTasks(SUBTOPOLOGY_ID2, 5)))
.build();
// When the member is in an unknown state, the member is first to force
// a reset of the client side member state.
assertThrows(FencedMemberEpochException.class, () -> new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 3),
mkTasks(SUBTOPOLOGY_ID2, 6)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.build());
// Then the member rejoins with no owned tasks.
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 3),
mkTasks(SUBTOPOLOGY_ID2, 6)))
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
.withCurrentStandbyTaskProcessIds(
(subtopologyId, partitionId) -> Collections.emptySet())
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
.withOwnedAssignment(mkTasksTuple(taskRole))
.build();
assertEquals(
new StreamsGroupMember.Builder(MEMBER_NAME)
.setState(MemberState.STABLE)
.setProcessId(PROCESS_ID)
.setMemberEpoch(memberEpoch + 1)
.setPreviousMemberEpoch(memberEpoch)
.setAssignedTasks(mkTasksTuple(taskRole,
mkTasks(SUBTOPOLOGY_ID1, 3),
mkTasks(SUBTOPOLOGY_ID2, 6)))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.build(),
updatedMember
);
}
}

View File

@ -25,14 +25,12 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalInt; import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -40,8 +38,10 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology; import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class StreamsGroupMemberTest { public class StreamsGroupMemberTest {
@ -70,12 +70,18 @@ public class StreamsGroupMemberTest {
private static final List<Integer> TASKS4 = List.of(3, 2, 1); private static final List<Integer> TASKS4 = List.of(3, 2, 1);
private static final List<Integer> TASKS5 = List.of(6, 5, 4); private static final List<Integer> TASKS5 = List.of(6, 5, 4);
private static final List<Integer> TASKS6 = List.of(9, 7); private static final List<Integer> TASKS6 = List.of(9, 7);
private static final Map<String, Set<Integer>> ASSIGNED_ACTIVE_TASKS = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS1.toArray(Integer[]::new))); private static final TasksTuple ASSIGNED_TASKS =
private static final Map<String, Set<Integer>> ASSIGNED_STANDBY_TASKS = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS2.toArray(Integer[]::new))); new TasksTuple(
private static final Map<String, Set<Integer>> ASSIGNED_WARMUP_TASKS = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS3.toArray(Integer[]::new))); mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS1.toArray(Integer[]::new))),
private static final Map<String, Set<Integer>> ACTIVE_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS4.toArray(Integer[]::new))); mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS2.toArray(Integer[]::new))),
private static final Map<String, Set<Integer>> STANDBY_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS5.toArray(Integer[]::new))); mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS3.toArray(Integer[]::new)))
private static final Map<String, Set<Integer>> WARMUP_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS6.toArray(Integer[]::new))); );
private static final TasksTuple TASKS_PENDING_REVOCATION =
new TasksTuple(
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS4.toArray(Integer[]::new))),
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS5.toArray(Integer[]::new))),
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS6.toArray(Integer[]::new)))
);
@Test @Test
public void testBuilderWithMemberIdIsNull() { public void testBuilderWithMemberIdIsNull() {
@ -112,12 +118,8 @@ public class StreamsGroupMemberTest {
assertNull(member.processId()); assertNull(member.processId());
assertNull(member.userEndpoint()); assertNull(member.userEndpoint());
assertNull(member.clientTags()); assertNull(member.clientTags());
assertNull(member.assignedActiveTasks()); assertNull(member.assignedTasks());
assertNull(member.assignedStandbyTasks()); assertNull(member.tasksPendingRevocation());
assertNull(member.assignedWarmupTasks());
assertNull(member.activeTasksPendingRevocation());
assertNull(member.standbyTasksPendingRevocation());
assertNull(member.warmupTasksPendingRevocation());
} }
@Test @Test
@ -136,12 +138,8 @@ public class StreamsGroupMemberTest {
assertEquals(PROCESS_ID, member.processId()); assertEquals(PROCESS_ID, member.processId());
assertEquals(Optional.of(USER_ENDPOINT), member.userEndpoint()); assertEquals(Optional.of(USER_ENDPOINT), member.userEndpoint());
assertEquals(CLIENT_TAGS, member.clientTags()); assertEquals(CLIENT_TAGS, member.clientTags());
assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks()); assertEquals(ASSIGNED_TASKS, member.assignedTasks());
assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks()); assertEquals(TASKS_PENDING_REVOCATION, member.tasksPendingRevocation());
assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, member.activeTasksPendingRevocation());
assertEquals(STANDBY_TASKS_PENDING_REVOCATION, member.standbyTasksPendingRevocation());
assertEquals(WARMUP_TASKS_PENDING_REVOCATION, member.warmupTasksPendingRevocation());
} }
@Test @Test
@ -179,12 +177,8 @@ public class StreamsGroupMemberTest {
assertNull(member.memberEpoch()); assertNull(member.memberEpoch());
assertNull(member.previousMemberEpoch()); assertNull(member.previousMemberEpoch());
assertNull(member.state()); assertNull(member.state());
assertNull(member.assignedActiveTasks()); assertNull(member.assignedTasks());
assertNull(member.assignedStandbyTasks()); assertNull(member.tasksPendingRevocation());
assertNull(member.assignedWarmupTasks());
assertNull(member.activeTasksPendingRevocation());
assertNull(member.standbyTasksPendingRevocation());
assertNull(member.warmupTasksPendingRevocation());
} }
@Test @Test
@ -208,12 +202,8 @@ public class StreamsGroupMemberTest {
assertEquals(record.memberEpoch(), member.memberEpoch()); assertEquals(record.memberEpoch(), member.memberEpoch());
assertEquals(record.previousMemberEpoch(), member.previousMemberEpoch()); assertEquals(record.previousMemberEpoch(), member.previousMemberEpoch());
assertEquals(MemberState.fromValue(record.state()), member.state()); assertEquals(MemberState.fromValue(record.state()), member.state());
assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks()); assertEquals(ASSIGNED_TASKS, member.assignedTasks());
assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks()); assertEquals(TASKS_PENDING_REVOCATION, member.tasksPendingRevocation());
assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, member.activeTasksPendingRevocation());
assertEquals(STANDBY_TASKS_PENDING_REVOCATION, member.standbyTasksPendingRevocation());
assertEquals(WARMUP_TASKS_PENDING_REVOCATION, member.warmupTasksPendingRevocation());
assertNull(member.instanceId()); assertNull(member.instanceId());
assertNull(member.rackId()); assertNull(member.rackId());
assertNull(member.rebalanceTimeoutMs()); assertNull(member.rebalanceTimeoutMs());
@ -275,12 +265,8 @@ public class StreamsGroupMemberTest {
assertEquals(member.state(), updatedMember.state()); assertEquals(member.state(), updatedMember.state());
assertEquals(member.clientId(), updatedMember.clientId()); assertEquals(member.clientId(), updatedMember.clientId());
assertEquals(member.clientHost(), updatedMember.clientHost()); assertEquals(member.clientHost(), updatedMember.clientHost());
assertEquals(member.assignedActiveTasks(), updatedMember.assignedActiveTasks()); assertEquals(member.assignedTasks(), updatedMember.assignedTasks());
assertEquals(member.assignedStandbyTasks(), updatedMember.assignedStandbyTasks()); assertEquals(member.tasksPendingRevocation(), updatedMember.tasksPendingRevocation());
assertEquals(member.assignedWarmupTasks(), updatedMember.assignedWarmupTasks());
assertEquals(member.activeTasksPendingRevocation(), updatedMember.activeTasksPendingRevocation());
assertEquals(member.standbyTasksPendingRevocation(), updatedMember.standbyTasksPendingRevocation());
assertEquals(member.warmupTasksPendingRevocation(), updatedMember.warmupTasksPendingRevocation());
} }
@Test @Test
@ -306,25 +292,8 @@ public class StreamsGroupMemberTest {
assertEquals(member.processId(), updatedMember.processId()); assertEquals(member.processId(), updatedMember.processId());
assertEquals(member.userEndpoint(), updatedMember.userEndpoint()); assertEquals(member.userEndpoint(), updatedMember.userEndpoint());
assertEquals(member.clientTags(), updatedMember.clientTags()); assertEquals(member.clientTags(), updatedMember.clientTags());
assertEquals(member.assignedActiveTasks(), updatedMember.assignedActiveTasks()); assertEquals(member.assignedTasks(), updatedMember.assignedTasks());
assertEquals(member.assignedStandbyTasks(), updatedMember.assignedStandbyTasks()); assertEquals(member.tasksPendingRevocation(), updatedMember.tasksPendingRevocation());
assertEquals(member.assignedWarmupTasks(), updatedMember.assignedWarmupTasks());
assertEquals(member.activeTasksPendingRevocation(), updatedMember.activeTasksPendingRevocation());
assertEquals(member.standbyTasksPendingRevocation(), updatedMember.standbyTasksPendingRevocation());
assertEquals(member.warmupTasksPendingRevocation(), updatedMember.warmupTasksPendingRevocation());
}
@Test
public void testReturnUnmodifiableFields() {
final StreamsGroupMember member = createStreamsGroupMember();
assertThrows(UnsupportedOperationException.class, () -> member.clientTags().put("not allowed", ""));
assertThrows(UnsupportedOperationException.class, () -> member.assignedActiveTasks().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.assignedStandbyTasks().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.assignedWarmupTasks().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.activeTasksPendingRevocation().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.standbyTasksPendingRevocation().put("not allowed", Collections.emptySet()));
assertThrows(UnsupportedOperationException.class, () -> member.warmupTasksPendingRevocation().put("not allowed", Collections.emptySet()));
} }
@Test @Test
@ -333,7 +302,7 @@ public class StreamsGroupMemberTest {
List<Integer> assignedTasks1 = Arrays.asList(10, 11, 12); List<Integer> assignedTasks1 = Arrays.asList(10, 11, 12);
List<Integer> assignedTasks2 = Arrays.asList(13, 14, 15); List<Integer> assignedTasks2 = Arrays.asList(13, 14, 15);
List<Integer> assignedTasks3 = Arrays.asList(16, 17, 18); List<Integer> assignedTasks3 = Arrays.asList(16, 17, 18);
Assignment targetAssignment = new Assignment( TasksTuple targetAssignment = new TasksTuple(
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(assignedTasks3))), mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(assignedTasks3))),
mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(assignedTasks2))), mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(assignedTasks2))),
mkMap(mkEntry(SUBTOPOLOGY3, new HashSet<>(assignedTasks1))) mkMap(mkEntry(SUBTOPOLOGY3, new HashSet<>(assignedTasks1)))
@ -404,6 +373,45 @@ public class StreamsGroupMemberTest {
assertEquals(new StreamsGroupDescribeResponseData.Assignment(), streamsGroupDescribeMember.targetAssignment()); assertEquals(new StreamsGroupDescribeResponseData.Assignment(), streamsGroupDescribeMember.targetAssignment());
} }
@Test
public void testHasAssignedTasksChanged() {
StreamsGroupMember member1 = new StreamsGroupMember.Builder(MEMBER_ID)
.setAssignedTasks(new TasksTuple(
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS1))),
mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS2))),
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS3)))
))
.build();
StreamsGroupMember member2 = new StreamsGroupMember.Builder(MEMBER_ID)
.setAssignedTasks(new TasksTuple(
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS4))),
mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS5))),
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS6)))
))
.build();
assertTrue(StreamsGroupMember.hasAssignedTasksChanged(member1, member2));
StreamsGroupMember member3 = new StreamsGroupMember.Builder(MEMBER_ID)
.setAssignedTasks(new TasksTuple(
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS1))),
mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS2))),
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS3)))
))
.build();
StreamsGroupMember member4 = new StreamsGroupMember.Builder(MEMBER_ID)
.setAssignedTasks(new TasksTuple(
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS1))),
mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS2))),
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS3)))
))
.build();
assertFalse(StreamsGroupMember.hasAssignedTasksChanged(member3, member4));
}
private StreamsGroupMember createStreamsGroupMember() { private StreamsGroupMember createStreamsGroupMember() {
return new StreamsGroupMember.Builder(MEMBER_ID) return new StreamsGroupMember.Builder(MEMBER_ID)
.setMemberEpoch(MEMBER_EPOCH) .setMemberEpoch(MEMBER_EPOCH)
@ -418,12 +426,8 @@ public class StreamsGroupMemberTest {
.setProcessId(PROCESS_ID) .setProcessId(PROCESS_ID)
.setUserEndpoint(USER_ENDPOINT) .setUserEndpoint(USER_ENDPOINT)
.setClientTags(CLIENT_TAGS) .setClientTags(CLIENT_TAGS)
.setAssignedActiveTasks(ASSIGNED_ACTIVE_TASKS) .setAssignedTasks(ASSIGNED_TASKS)
.setAssignedStandbyTasks(ASSIGNED_STANDBY_TASKS) .setTasksPendingRevocation(TASKS_PENDING_REVOCATION)
.setAssignedWarmupTasks(ASSIGNED_WARMUP_TASKS)
.setActiveTasksPendingRevocation(ACTIVE_TASKS_PENDING_REVOCATION)
.setStandbyTasksPendingRevocation(STANDBY_TASKS_PENDING_REVOCATION)
.setWarmupTasksPendingRevocation(WARMUP_TASKS_PENDING_REVOCATION)
.build(); .build();
} }
} }

View File

@ -17,24 +17,27 @@
package org.apache.kafka.coordinator.group.streams; package org.apache.kafka.coordinator.group.streams;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
public class TaskAssignmentTestUtil { public class TaskAssignmentTestUtil {
public static Assignment mkAssignment(final Map<String, Set<Integer>> activeTasks, public enum TaskRole {
final Map<String, Set<Integer>> standbyTasks, ACTIVE,
final Map<String, Set<Integer>> warmupTasks) { STANDBY,
return new Assignment( WARMUP
Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)), }
Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)),
Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)) @SafeVarargs
); public static TasksTuple mkTasksTuple(TaskRole taskRole, Map.Entry<String, Set<Integer>>... entries) {
return switch (taskRole) {
case ACTIVE -> new TasksTuple(mkTasksPerSubtopology(entries), new HashMap<>(), new HashMap<>());
case STANDBY -> new TasksTuple(new HashMap<>(), mkTasksPerSubtopology(entries), new HashMap<>());
case WARMUP -> new TasksTuple(new HashMap<>(), new HashMap<>(), mkTasksPerSubtopology(entries));
};
} }
public static Map.Entry<String, Set<Integer>> mkTasks(String subtopologyId, public static Map.Entry<String, Set<Integer>> mkTasks(String subtopologyId,
@ -46,8 +49,7 @@ public class TaskAssignmentTestUtil {
} }
@SafeVarargs @SafeVarargs
public static Map<String, Set<Integer>> mkTasksPerSubtopology(Map.Entry<String, public static Map<String, Set<Integer>> mkTasksPerSubtopology(Map.Entry<String, Set<Integer>>... entries) {
Set<Integer>>... entries) {
Map<String, Set<Integer>> assignment = new HashMap<>(); Map<String, Set<Integer>> assignment = new HashMap<>();
for (Map.Entry<String, Set<Integer>> entry : entries) { for (Map.Entry<String, Set<Integer>> entry : entries) {
assignment.put(entry.getKey(), entry.getValue()); assignment.put(entry.getKey(), entry.getValue());

View File

@ -30,19 +30,21 @@ import java.util.Set;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology; import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AssignmentTest { public class TasksTupleTest {
static final String SUBTOPOLOGY_1 = "subtopology1"; private static final String SUBTOPOLOGY_1 = "subtopology1";
static final String SUBTOPOLOGY_2 = "subtopology2"; private static final String SUBTOPOLOGY_2 = "subtopology2";
static final String SUBTOPOLOGY_3 = "subtopology3"; private static final String SUBTOPOLOGY_3 = "subtopology3";
@Test @Test
public void testTasksCannotBeNull() { public void testTasksCannotBeNull() {
assertThrows(NullPointerException.class, () -> new Assignment(null, Collections.emptyMap(), Collections.emptyMap())); assertThrows(NullPointerException.class, () -> new TasksTuple(null, Collections.emptyMap(), Collections.emptyMap()));
assertThrows(NullPointerException.class, () -> new Assignment(Collections.emptyMap(), null, Collections.emptyMap())); assertThrows(NullPointerException.class, () -> new TasksTuple(Collections.emptyMap(), null, Collections.emptyMap()));
assertThrows(NullPointerException.class, () -> new Assignment(Collections.emptyMap(), Collections.emptyMap(), null)); assertThrows(NullPointerException.class, () -> new TasksTuple(Collections.emptyMap(), Collections.emptyMap(), null));
} }
@Test @Test
@ -56,14 +58,14 @@ public class AssignmentTest {
Map<String, Set<Integer>> warmupTasks = mkTasksPerSubtopology( Map<String, Set<Integer>> warmupTasks = mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_3, 4, 5, 6) mkTasks(SUBTOPOLOGY_3, 4, 5, 6)
); );
Assignment assignment = new Assignment(activeTasks, standbyTasks, warmupTasks); TasksTuple tuple = new TasksTuple(activeTasks, standbyTasks, warmupTasks);
assertEquals(activeTasks, assignment.activeTasks()); assertEquals(activeTasks, tuple.activeTasks());
assertThrows(UnsupportedOperationException.class, () -> assignment.activeTasks().put("not allowed", Collections.emptySet())); assertThrows(UnsupportedOperationException.class, () -> tuple.activeTasks().put("not allowed", Collections.emptySet()));
assertEquals(standbyTasks, assignment.standbyTasks()); assertEquals(standbyTasks, tuple.standbyTasks());
assertThrows(UnsupportedOperationException.class, () -> assignment.standbyTasks().put("not allowed", Collections.emptySet())); assertThrows(UnsupportedOperationException.class, () -> tuple.standbyTasks().put("not allowed", Collections.emptySet()));
assertEquals(warmupTasks, assignment.warmupTasks()); assertEquals(warmupTasks, tuple.warmupTasks());
assertThrows(UnsupportedOperationException.class, () -> assignment.warmupTasks().put("not allowed", Collections.emptySet())); assertThrows(UnsupportedOperationException.class, () -> tuple.warmupTasks().put("not allowed", Collections.emptySet()));
} }
@Test @Test
@ -95,28 +97,83 @@ public class AssignmentTest {
.setStandbyTasks(standbyTasks) .setStandbyTasks(standbyTasks)
.setWarmupTasks(warmupTasks); .setWarmupTasks(warmupTasks);
Assignment assignment = Assignment.fromRecord(record); TasksTuple tuple = TasksTuple.fromTargetAssignmentRecord(record);
assertEquals( assertEquals(
mkTasksPerSubtopology( mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 1, 2, 3), mkTasks(SUBTOPOLOGY_1, 1, 2, 3),
mkTasks(SUBTOPOLOGY_2, 4, 5, 6) mkTasks(SUBTOPOLOGY_2, 4, 5, 6)
), ),
assignment.activeTasks() tuple.activeTasks()
); );
assertEquals( assertEquals(
mkTasksPerSubtopology( mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 7, 8, 9), mkTasks(SUBTOPOLOGY_1, 7, 8, 9),
mkTasks(SUBTOPOLOGY_2, 1, 2, 3) mkTasks(SUBTOPOLOGY_2, 1, 2, 3)
), ),
assignment.standbyTasks() tuple.standbyTasks()
); );
assertEquals( assertEquals(
mkTasksPerSubtopology( mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 4, 5, 6), mkTasks(SUBTOPOLOGY_1, 4, 5, 6),
mkTasks(SUBTOPOLOGY_2, 7, 8, 9) mkTasks(SUBTOPOLOGY_2, 7, 8, 9)
), ),
assignment.warmupTasks() tuple.warmupTasks()
); );
} }
@Test
public void testMerge() {
TasksTuple tuple1 = new TasksTuple(
Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3)),
Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6)),
Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9))
);
TasksTuple tuple2 = new TasksTuple(
Map.of(SUBTOPOLOGY_1, Set.of(10, 11)),
Map.of(SUBTOPOLOGY_2, Set.of(12, 13)),
Map.of(SUBTOPOLOGY_3, Set.of(14, 15))
);
TasksTuple mergedTuple = tuple1.merge(tuple2);
assertEquals(Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3, 10, 11)), mergedTuple.activeTasks());
assertEquals(Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6, 12, 13)), mergedTuple.standbyTasks());
assertEquals(Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9, 14, 15)), mergedTuple.warmupTasks());
}
@Test
public void testContainsAny() {
TasksTuple tuple1 = new TasksTuple(
Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3)),
Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6)),
Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9))
);
TasksTuple tuple2 = new TasksTuple(
Map.of(SUBTOPOLOGY_1, Set.of(3, 10, 11)),
Map.of(SUBTOPOLOGY_2, Set.of(12, 13)),
Map.of(SUBTOPOLOGY_3, Set.of(14, 15))
);
assertTrue(tuple1.containsAny(tuple2));
TasksTuple tuple3 = new TasksTuple(
Map.of(SUBTOPOLOGY_1, Set.of(10, 11)),
Map.of(SUBTOPOLOGY_2, Set.of(12, 13)),
Map.of(SUBTOPOLOGY_3, Set.of(14, 15))
);
assertFalse(tuple1.containsAny(tuple3));
}
@Test
public void testIsEmpty() {
TasksTuple emptyTuple = new TasksTuple(Map.of(), Map.of(), Map.of());
assertTrue(emptyTuple.isEmpty());
TasksTuple nonEmptyTuple = new TasksTuple(Map.of(SUBTOPOLOGY_1, Set.of(1)), Map.of(), Map.of());
assertFalse(nonEmptyTuple.isEmpty());
}
} }