KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance (#12561)

Reviewers: Yash Mayya <yash.mayya@gmail.com>, Luke Chen <showuon@gmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
vamossagar12 2022-11-16 02:56:21 +05:30 committed by GitHub
parent 46bee5bcf3
commit 09da44ed80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 613 additions and 308 deletions

View File

@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime.distributed;
import java.util.Arrays;
import java.util.Map.Entry;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
@ -64,26 +66,33 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
// Upper bound applied to the computed rebalance delay; supplied to the constructor
private final int maxDelay;
// Assignment remembered from the previous round; starts out as ConnectorsAndTasks.EMPTY
private ConnectorsAndTasks previousAssignment;
// Connectors and tasks recorded as revoked earlier; its collections are cleared in place once examined
private final ConnectorsAndTasks previousRevocation;
// Whether the leader may perform load-balancing revocations in the current round
private boolean canRevoke;
// visible for testing
// Whether load-balancing revocations were performed in the preceding round
private boolean revokedInPrevious;
protected final Set<String> candidateWorkersForReassignment;
// Timestamp, on the scale of time.milliseconds(), at which the next delayed rebalance is due
protected long scheduledRebalance;
// Delay currently in effect before revocations are allowed; 0 means no delayed rebalance is active
protected int delay;
protected int previousGenerationId;
protected Set<String> previousMembers;
// Converts the count of successive revoking rebalances into the delay to apply
private final ExponentialBackoff consecutiveRevokingRebalancesBackoff;
// Number of consecutive rounds that required load-balancing revocations; reset to 0 once a round needs none
private int numSuccessiveRevokingRebalances;
/**
 * Create an assignor with no previous assignment recorded, no delayed rebalance scheduled and
 * no successive revoking rebalances counted.
 *
 * @param logContext the context from which this assignor's logger is created
 * @param time the clock retained by this assignor
 * @param maxDelay the maximum rebalance delay (presumably in milliseconds -- confirm against the
 *                 caller); also handed to the exponential backoff used between successive
 *                 revoking rebalances
 */
public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
this.log = logContext.logger(IncrementalCooperativeAssignor.class);
this.time = time;
this.maxDelay = maxDelay;
this.previousAssignment = ConnectorsAndTasks.EMPTY;
// A fresh builder result rather than ConnectorsAndTasks.EMPTY, because its collections are cleared in place later
this.previousRevocation = new ConnectorsAndTasks.Builder().build();
this.canRevoke = true;
this.scheduledRebalance = 0;
this.revokedInPrevious = false;
this.candidateWorkersForReassignment = new LinkedHashSet<>();
this.delay = 0;
this.previousGenerationId = -1;
this.previousMembers = Collections.emptySet();
this.numSuccessiveRevokingRebalances = 0;
// The backoff normally starts from an initial interval of 1. The one corner case is maxDelay == 0:
// the initial interval is then set to 0 as well, so that the computed backoff delay is always 0.
// NOTE(review): arguments are presumably (initial interval, multiplier, max interval, jitter) -- confirm
// against the ExponentialBackoff constructor.
this.consecutiveRevokingRebalancesBackoff = new ExponentialBackoff(maxDelay == 0 ? 0 : 1, 40, maxDelay, 0);
}
@Override
@ -226,7 +235,6 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
if (previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c))
|| previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
previousAssignment = activeAssignments;
canRevoke = true;
}
previousRevocation.connectors().clear();
previousRevocation.tasks().clear();
@ -237,10 +245,9 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
ConnectorsAndTasks deleted = diff(previousAssignment, configured);
log.debug("Deleted assignments: {}", deleted);
// Derived set: The set of remaining active connectors-and-tasks is a derived set from the
// set difference of active - deleted
ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive);
// The connectors and tasks that are currently running on more than one worker each
ConnectorsAndTasks duplicated = duplicatedAssignments(memberAssignments);
log.trace("Duplicated assignments: {}", duplicated);
// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
// the set difference of previous - active - deleted
@ -249,128 +256,138 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
// Derived set: The set of new connectors-and-tasks is a derived set from the set
// difference of configured - previous - active
ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments);
log.debug("New assignments: {}", newSubmissions);
// A collection of the complete assignment
List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberAssignments, ConnectorsAndTasks.EMPTY);
log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment);
// Per worker connector assignments without removing deleted connectors yet
Map<String, Collection<String>> connectorAssignments =
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments);
// Per worker task assignments without removing deleted connectors yet
Map<String, Collection<ConnectorTaskId>> taskAssignments =
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments);
ConnectorsAndTasks created = diff(configured, previousAssignment, activeAssignments);
log.debug("Created: {}", created);
// A collection of the current assignment excluding the connectors-and-tasks to be deleted
List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberAssignments, deleted);
Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments);
log.debug("Connector and task to delete assignments: {}", toRevoke);
Map<String, ConnectorsAndTasks.Builder> toRevoke = new HashMap<>();
Map<String, ConnectorsAndTasks> deletedToRevoke = intersection(deleted, memberAssignments);
log.debug("Deleted connectors and tasks to revoke from each worker: {}", deletedToRevoke);
addAll(toRevoke, deletedToRevoke);
// Revoking redundant connectors/tasks if the workers have duplicate assignments
toRevoke.putAll(computeDuplicatedAssignments(memberAssignments, connectorAssignments, taskAssignments));
log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke);
Map<String, ConnectorsAndTasks> duplicatedToRevoke = intersection(duplicated, memberAssignments);
log.debug("Duplicated connectors and tasks to revoke from each worker: {}", duplicatedToRevoke);
addAll(toRevoke, duplicatedToRevoke);
// Recompute the complete assignment excluding the deleted connectors-and-tasks
completeWorkerAssignment = workerAssignment(memberAssignments, deleted);
connectorAssignments =
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
taskAssignments =
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
// Compute the assignment that will be applied across the cluster after this round of rebalance
// Later on, new submissions and lost-and-reassigned connectors and tasks will be added to these assignments,
// and load-balancing revocations will be removed from them.
List<WorkerLoad> nextWorkerAssignment = workerLoads(memberAssignments);
removeAll(nextWorkerAssignment, deletedToRevoke);
removeAll(nextWorkerAssignment, duplicatedToRevoke);
handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
// Collect the lost assignments that are ready to be reassigned because the workers that were
// originally responsible for them appear to have left the cluster instead of rejoining within
// the scheduled rebalance delay. These assignments will be re-allocated to the existing workers
// in the cluster later on
ConnectorsAndTasks.Builder lostAssignmentsToReassignBuilder = new ConnectorsAndTasks.Builder();
handleLostAssignments(lostAssignments, lostAssignmentsToReassignBuilder, nextWorkerAssignment);
ConnectorsAndTasks lostAssignmentsToReassign = lostAssignmentsToReassignBuilder.build();
// Do not revoke resources for re-assignment while a delayed rebalance is active
// Also we do not revoke in two consecutive rebalances by the same leader
canRevoke = delay == 0 && canRevoke;
if (delay == 0) {
Map<String, ConnectorsAndTasks> loadBalancingRevocations =
performLoadBalancingRevocations(configured, nextWorkerAssignment);
// Compute the connectors-and-tasks to be revoked for load balancing without taking into
// account the deleted ones.
log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
if (canRevoke) {
Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
performTaskRevocation(activeAssignments, currentWorkerAssignment);
log.debug("Connector and task to revoke assignments: {}", toRevoke);
toExplicitlyRevoke.forEach(
(worker, assignment) -> {
ConnectorsAndTasks existing = toRevoke.computeIfAbsent(
worker,
v -> new ConnectorsAndTasks.Builder().build());
existing.connectors().addAll(assignment.connectors());
existing.tasks().addAll(assignment.tasks());
// If this round and the previous round involved revocation, we will calculate a delay for
// the next round when revoking rebalance would be allowed. Note that delay could be 0, in which
// case we would always revoke.
if (revokedInPrevious && !loadBalancingRevocations.isEmpty()) {
numSuccessiveRevokingRebalances++; // Should we consider overflow for this?
log.debug("Consecutive revoking rebalances observed. Computing delay and next scheduled rebalance.");
delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
if (delay != 0) {
scheduledRebalance = time.milliseconds() + delay;
log.debug("Skipping revocations in the current round with a delay of {}ms. Next scheduled rebalance:{}",
delay, scheduledRebalance);
} else {
log.debug("Revoking assignments immediately since scheduled.rebalance.max.delay.ms is set to 0");
addAll(toRevoke, loadBalancingRevocations);
// Remove all newly-revoked connectors and tasks from the next assignment, both to
// ensure that they are not included in the assignments during this round, and to produce
// an accurate allocation of all newly-created and lost-and-reassigned connectors and tasks
// that will have to be distributed across the cluster during this round
removeAll(nextWorkerAssignment, loadBalancingRevocations);
}
);
canRevoke = toExplicitlyRevoke.size() == 0;
} else if (!loadBalancingRevocations.isEmpty()) {
// We had a revocation in this round but not in the previous round. Let's store that state.
log.debug("Performing allocation-balancing revocation immediately as no revocations took place during the previous rebalance");
addAll(toRevoke, loadBalancingRevocations);
removeAll(nextWorkerAssignment, loadBalancingRevocations);
revokedInPrevious = true;
} else if (revokedInPrevious) {
// No revocations in this round but the previous round had one. Probably the workers
// have converged to a balanced load. We can reset the rebalance clock
log.debug("Previous round had revocations but this round didn't. Probably, the cluster has reached a " +
"balanced load. Resetting the exponential backoff clock");
revokedInPrevious = false;
numSuccessiveRevokingRebalances = 0;
} else {
// no-op
log.debug("No revocations in previous and current round.");
}
} else {
canRevoke = delay == 0;
log.debug("Delayed rebalance is active. Delaying {}ms before revoking connectors and tasks: {}", delay, toRevoke);
revokedInPrevious = false;
}
assignConnectors(completeWorkerAssignment, newSubmissions.connectors());
assignTasks(completeWorkerAssignment, newSubmissions.tasks());
log.debug("Current complete assignments: {}", currentWorkerAssignment);
log.debug("New complete assignments: {}", completeWorkerAssignment);
// The complete set of connectors and tasks that should be newly-assigned during this round
ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
.addConnectors(created.connectors())
.addTasks(created.tasks())
.addConnectors(lostAssignmentsToReassign.connectors())
.addTasks(lostAssignmentsToReassign.tasks())
.build();
assignConnectors(nextWorkerAssignment, toAssign.connectors());
assignTasks(nextWorkerAssignment, toAssign.tasks());
Map<String, Collection<String>> nextConnectorAssignments = nextWorkerAssignment.stream()
.collect(Collectors.toMap(
WorkerLoad::worker,
WorkerLoad::connectors
));
Map<String, Collection<ConnectorTaskId>> nextTaskAssignments = nextWorkerAssignment.stream()
.collect(Collectors.toMap(
WorkerLoad::worker,
WorkerLoad::tasks
));
Map<String, Collection<String>> currentConnectorAssignments =
currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
Map<String, Collection<String>> incrementalConnectorAssignments =
diff(connectorAssignments, currentConnectorAssignments);
diff(nextConnectorAssignments, currentConnectorAssignments);
Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
diff(taskAssignments, currentTaskAssignments);
diff(nextTaskAssignments, currentTaskAssignments);
previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
Map<String, ConnectorsAndTasks> revoked = buildAll(toRevoke);
previousAssignment = computePreviousAssignment(revoked, nextConnectorAssignments, nextTaskAssignments, lostAssignments);
previousGenerationId = currentGenerationId;
previousMembers = memberAssignments.keySet();
log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
log.debug("Incremental task assignments: {}", incrementalTaskAssignments);
Map<String, Collection<String>> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors);
Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks);
Map<String, Collection<String>> revokedConnectors = transformValues(revoked, ConnectorsAndTasks::connectors);
Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(revoked, ConnectorsAndTasks::tasks);
return new ClusterAssignment(
incrementalConnectorAssignments,
incrementalTaskAssignments,
revokedConnectors,
revokedTasks,
diff(connectorAssignments, revokedConnectors),
diff(taskAssignments, revokedTasks)
diff(nextConnectorAssignments, revokedConnectors),
diff(nextTaskAssignments, revokedTasks)
);
}
/**
 * Group the deleted connectors and tasks by the worker that currently owns them.
 *
 * @param deleted the connectors and tasks that have been deleted
 * @param connectorAssignments the connectors assigned to each worker
 * @param taskAssignments the tasks assigned to each worker
 * @return for each owning worker, the deleted connectors and tasks to revoke from it
 */
private Map<String, ConnectorsAndTasks> computeDeleted(ConnectorsAndTasks deleted,
                                                       Map<String, Collection<String>> connectorAssignments,
                                                       Map<String, Collection<ConnectorTaskId>> taskAssignments) {
    // Reverse lookups: connector -> worker and task -> worker
    Map<String, String> workerByConnector = WorkerCoordinator.invertAssignment(connectorAssignments);
    Map<ConnectorTaskId, String> workerByTask = WorkerCoordinator.invertAssignment(taskAssignments);

    Map<String, ConnectorsAndTasks> revocations = new HashMap<>();

    // NOTE(review): an item absent from the reverse lookup is filed under a null worker key
    for (String connector : deleted.connectors()) {
        String owner = workerByConnector.get(connector);
        ConnectorsAndTasks ownerRevocations =
            revocations.computeIfAbsent(owner, w -> new ConnectorsAndTasks.Builder().build());
        ownerRevocations.connectors().add(connector);
    }

    for (ConnectorTaskId task : deleted.tasks()) {
        String owner = workerByTask.get(task);
        ConnectorsAndTasks ownerRevocations =
            revocations.computeIfAbsent(owner, w -> new ConnectorsAndTasks.Builder().build());
        ownerRevocations.tasks().add(task);
    }

    log.debug("Connectors and tasks to delete assignments: {}", revocations);
    return revocations;
}
private ConnectorsAndTasks computePreviousAssignment(Map<String, ConnectorsAndTasks> toRevoke,
Map<String, Collection<String>> connectorAssignments,
Map<String, Collection<ConnectorTaskId>> taskAssignments,
@ -421,47 +438,12 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
return new ConnectorsAndTasks.Builder().with(duplicatedConnectors, duplicatedTasks).build();
}
/**
 * Work out, per worker, which of its connectors and tasks are also running on another worker
 * and therefore have to be revoked.
 *
 * @param memberAssignments the assignments reported by each member, used to detect duplicates
 * @param connectorAssignments the connectors assigned to each worker
 * @param taskAssignment the tasks assigned to each worker
 * @return for each affected worker, the duplicated connectors and tasks it is running
 */
private Map<String, ConnectorsAndTasks> computeDuplicatedAssignments(Map<String, ConnectorsAndTasks> memberAssignments,
                                                                     Map<String, Collection<String>> connectorAssignments,
                                                                     Map<String, Collection<ConnectorTaskId>> taskAssignment) {
    ConnectorsAndTasks duplicates = duplicatedAssignments(memberAssignments);
    log.debug("Duplicated assignments: {}", duplicates);

    Map<String, ConnectorsAndTasks> revocations = new HashMap<>();

    if (!duplicates.connectors().isEmpty()) {
        for (Map.Entry<String, Collection<String>> workerConnectors : connectorAssignments.entrySet()) {
            // Duplicated connectors that this particular worker is running
            Set<String> ownedDuplicates = new HashSet<>(duplicates.connectors());
            ownedDuplicates.retainAll(workerConnectors.getValue());
            if (ownedDuplicates.isEmpty()) {
                continue;
            }
            revocations
                .computeIfAbsent(workerConnectors.getKey(), v -> new ConnectorsAndTasks.Builder().build())
                .connectors()
                .addAll(ownedDuplicates);
        }
    }

    if (!duplicates.tasks().isEmpty()) {
        for (Map.Entry<String, Collection<ConnectorTaskId>> workerTasks : taskAssignment.entrySet()) {
            // Duplicated tasks that this particular worker is running
            Set<ConnectorTaskId> ownedDuplicates = new HashSet<>(duplicates.tasks());
            ownedDuplicates.retainAll(workerTasks.getValue());
            if (ownedDuplicates.isEmpty()) {
                continue;
            }
            revocations
                .computeIfAbsent(workerTasks.getKey(), v -> new ConnectorsAndTasks.Builder().build())
                .tasks()
                .addAll(ownedDuplicates);
        }
    }

    return revocations;
}
// visible for testing
protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
ConnectorsAndTasks newSubmissions,
ConnectorsAndTasks.Builder lostAssignmentsToReassign,
List<WorkerLoad> completeWorkerAssignment) {
if (lostAssignments.isEmpty()) {
// There are no lost assignments and there have been no successive revoking rebalances
if (lostAssignments.isEmpty() && !revokedInPrevious) {
resetDelay();
return;
}
@ -478,8 +460,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
+ "missing assignments that the leader is detecting are probably due to some "
+ "workers failing to receive the new assignments in the previous rebalance. "
+ "Will reassign missing tasks as new tasks");
newSubmissions.connectors().addAll(lostAssignments.connectors());
newSubmissions.tasks().addAll(lostAssignments.tasks());
lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
return;
}
@ -516,10 +498,13 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
}
} else {
log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
newSubmissions.connectors().addAll(lostAssignments.connectors());
newSubmissions.tasks().addAll(lostAssignments.tasks());
lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
}
resetDelay();
// Reset the flag: the full rebalance delay has now elapsed, so successive revoking
// rebalances can be permitted again
revokedInPrevious = false;
} else {
candidateWorkersForReassignment
.addAll(candidateWorkersForReassignment(completeWorkerAssignment));
@ -566,89 +551,6 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
.collect(Collectors.toList());
}
/**
 * Compute the connectors and tasks to revoke from already-loaded workers so that workers
 * without any load can pick them up.
 *
 * <p>Workers whose load size is greater than zero count as "existing"; the remaining workers
 * count as "new". If there are no new workers, or no existing workers, nothing is revoked.
 * Otherwise the new per-worker floor and ceiling averages are computed over all workers, and
 * each existing worker gives up the connectors (and, separately, the tasks) it holds in excess
 * of the ceiling average.
 *
 * @param activeAssignments the connectors and tasks currently active; only their counts are used
 * @param completeWorkerAssignment the current load of every worker, including workers with no load
 * @return the connectors and tasks to revoke, keyed by worker; never null, and mutable even
 *         when empty
 */
private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks activeAssignments,
Collection<WorkerLoad> completeWorkerAssignment) {
int totalActiveConnectorsNum = activeAssignments.connectors().size();
int totalActiveTasksNum = activeAssignments.tasks().size();
// Workers that already run at least one connector or task
Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream()
.filter(wl -> wl.size() > 0)
.collect(Collectors.toList());
int existingWorkersNum = existingWorkers.size();
int totalWorkersNum = completeWorkerAssignment.size();
// Workers with no load at all are treated as newly joined
int newWorkersNum = totalWorkersNum - existingWorkersNum;
if (log.isDebugEnabled()) {
completeWorkerAssignment.forEach(wl -> log.debug(
"Per worker current load size; worker: {} connectors: {} tasks: {}",
wl.worker(), wl.connectorsSize(), wl.tasksSize()));
}
Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
// If there are no new workers, or no existing workers to revoke tasks from return early
// after logging the status
if (!(newWorkersNum > 0 && existingWorkersNum > 0)) {
log.debug("No task revocation required; workers with existing load: {} workers with "
+ "no load {} total workers {}",
existingWorkersNum, newWorkersNum, totalWorkersNum);
// This is intentionally empty but mutable, because the map is used to include deleted
// connectors and tasks as well
return revoking;
}
log.debug("Task revocation is required; workers with existing load: {} workers with "
+ "no load {} total workers {}",
existingWorkersNum, newWorkersNum, totalWorkersNum);
// We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
// The ceiling is one above the floor unless the connectors divide evenly among all workers
int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
int floorTasks = totalActiveTasksNum / totalWorkersNum;
// Same floor/ceiling computation, applied to tasks
int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
int numToRevoke;
// First pass: connectors. Revocations are taken in the iteration order of the worker's connectors.
for (WorkerLoad existing : existingWorkers) {
Iterator<String> connectors = existing.connectors().iterator();
// Only the excess over the ceiling is revoked; a non-positive value means nothing to revoke
numToRevoke = existing.connectorsSize() - ceilConnectors;
for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
ConnectorsAndTasks resources = revoking.computeIfAbsent(
existing.worker(),
w -> new ConnectorsAndTasks.Builder().build());
resources.connectors().add(connectors.next());
}
}
// Second pass: tasks, following the same rule as for connectors
for (WorkerLoad existing : existingWorkers) {
Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
numToRevoke = existing.tasksSize() - ceilTasks;
// Logged for every existing worker, even when numToRevoke is zero or negative
log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
ConnectorsAndTasks resources = revoking.computeIfAbsent(
existing.worker(),
w -> new ConnectorsAndTasks.Builder().build());
resources.tasks().add(tasks.next());
}
}
return revoking;
}
private Map<String, ExtendedAssignment> fillAssignments(Collection<String> members, short error,
String leaderId, String leaderUrl, long maxOffset,
ClusterAssignment clusterAssignment,
@ -715,6 +617,143 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
).build();
}
/**
 * Decide which connectors and tasks each worker has to give up so that no worker ends up
 * running more than its share of a perfectly-balanced assignment.
 *
 * @param configured every connector and task configured on the cluster
 * @param workers the workers in the cluster; their loads should already exclude any deleted or
 *                duplicated connectors and tasks that are due to be revoked in this rebalance
 * @return the connectors and tasks to revoke, keyed by worker; never null, but empty when no
 *         load-balancing revocations are necessary or possible
 */
private Map<String, ConnectorsAndTasks> performLoadBalancingRevocations(
        ConnectorsAndTasks configured,
        Collection<WorkerLoad> workers
) {
    if (log.isTraceEnabled()) {
        for (WorkerLoad load : workers) {
            log.trace(
                "Per worker current load size; worker: {} connectors: {} tasks: {}",
                load.worker(), load.connectorsSize(), load.tasksSize());
        }
    }

    boolean noWorkerHasLoad = workers.stream().allMatch(WorkerLoad::isEmpty);
    if (noWorkerHasLoad) {
        log.trace("No load-balancing revocations required; all workers are either new "
            + "or will have all currently-assigned connectors and tasks revoked during this round"
        );
        return Collections.emptyMap();
    }

    if (configured.isEmpty()) {
        log.trace("No load-balancing revocations required; no connectors are currently configured on this cluster");
        return Collections.emptyMap();
    }

    // Connectors and tasks are balanced independently of each other
    Map<String, Set<String>> connectorRevocations = loadBalancingRevocations(
        "connector",
        configured.connectors().size(),
        workers,
        WorkerLoad::connectors
    );
    Map<String, Set<ConnectorTaskId>> taskRevocations = loadBalancingRevocations(
        "task",
        configured.tasks().size(),
        workers,
        WorkerLoad::tasks
    );

    // Merge both kinds of revocation into a single per-worker view
    Map<String, ConnectorsAndTasks.Builder> revocationBuilders = new HashMap<>();
    for (Map.Entry<String, Set<String>> perWorker : connectorRevocations.entrySet()) {
        revocationBuilders
            .computeIfAbsent(perWorker.getKey(), w -> new ConnectorsAndTasks.Builder())
            .addConnectors(perWorker.getValue());
    }
    for (Map.Entry<String, Set<ConnectorTaskId>> perWorker : taskRevocations.entrySet()) {
        revocationBuilders
            .computeIfAbsent(perWorker.getKey(), w -> new ConnectorsAndTasks.Builder())
            .addTasks(perWorker.getValue());
    }

    return buildAll(revocationBuilders);
}
/**
 * Compute the load-balancing revocations for a single kind of resource (connectors or tasks).
 *
 * <p>Every worker is entitled to {@code totalToAllocate / workers.size()} instances, and the
 * remainder is handed out as one extra instance each to workers that are already above that
 * minimum, in iteration order. Whatever a worker holds beyond its entitlement is revoked,
 * taken in the iteration order of its current allocation.
 *
 * @param allocatedResourceName the resource name used in log messages
 * @param totalToAllocate the total number of instances of the resource to spread over the workers
 * @param workers the workers in the cluster; callers are expected to pass at least one
 * @param workerAllocation extracts a worker's current allocation of the resource
 * @param <E> the type of resource being balanced
 * @return the instances to revoke, keyed by worker name; empty if the allocation is already balanced
 */
private <E> Map<String, Set<E>> loadBalancingRevocations(
        String allocatedResourceName,
        int totalToAllocate,
        Collection<WorkerLoad> workers,
        Function<WorkerLoad, Collection<E>> workerAllocation
) {
    int workerCount = workers.size();
    // Every worker should end up with at least this many instances...
    int baseShare = totalToAllocate / workerCount;
    // ...and this many workers get exactly one more, to absorb the remainder
    int extraSlots = totalToAllocate % workerCount;

    long workersAtBaseShare = workers.stream()
        .filter(w -> workerAllocation.apply(w).size() == baseShare)
        .count();
    long workersAtBaseSharePlusOne = workers.stream()
        .filter(w -> workerAllocation.apply(w).size() == baseShare + 1)
        .count();

    boolean extrasAlreadyPlaced = workersAtBaseSharePlusOne == extraSlots;
    boolean everyWorkerWithinShare = workersAtBaseShare + workersAtBaseSharePlusOne == workerCount;
    if (extrasAlreadyPlaced && everyWorkerWithinShare) {
        log.trace(
            "No load-balancing {} revocations required; the current allocations, when combined with any newly-created {}s, should be balanced",
            allocatedResourceName,
            allocatedResourceName
        );
        return Collections.emptyMap();
    }

    Map<String, Set<E>> revocations = new HashMap<>();
    // Number of extra slots handed out so far
    int extraSlotsUsed = 0;

    for (WorkerLoad worker : workers) {
        int held = workerAllocation.apply(worker).size();
        if (held <= baseShare) {
            // Nothing above the minimum, so nothing to take away
            continue;
        }

        // Workers above the minimum claim the extra slots on a first-come basis
        boolean keepsExtra = extraSlotsUsed < extraSlots;
        if (keepsExtra) {
            extraSlotsUsed++;
            if (held == baseShare + 1) {
                // Exactly one over the minimum and entitled to an extra: already balanced
                continue;
            }
        }
        int allowed = keepsExtra ? baseShare + 1 : baseShare;

        Set<E> revokedFromWorker = new LinkedHashSet<>();
        revocations.put(worker.worker(), revokedFromWorker);

        Iterator<E> candidates = workerAllocation.apply(worker).iterator();
        int excess = held - allowed;
        while (excess > 0) {
            if (!candidates.hasNext()) {
                // Not expected to happen; warn and carry on rather than failing the whole rebalance
                log.warn(
                    "Unexpectedly ran out of {}s to revoke from worker {} while performing load-balancing revocations; " +
                        "worker appears to still be allocated {} instances, which is more than the intended allocation of {}",
                    allocatedResourceName,
                    worker.worker(),
                    workerAllocation.apply(worker).size(),
                    allowed
                );
                break;
            }
            revokedFromWorker.add(candidates.next());
            excess--;
        }
    }

    return revocations;
}
private int calculateDelay(long now) {
long diff = scheduledRebalance - now;
return diff > 0 ? (int) Math.min(diff, maxDelay) : 0;
@ -783,7 +822,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
private static List<WorkerLoad> workerAssignment(Map<String, ConnectorsAndTasks> memberAssignments,
ConnectorsAndTasks toExclude) {
ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder()
.with(new HashSet<>(toExclude.connectors()), new HashSet<>(toExclude.tasks()))
.with(toExclude.connectors(), toExclude.tasks())
.build();
return memberAssignments.entrySet().stream()
@ -798,6 +837,43 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
).collect(Collectors.toList());
}
/**
 * Merge the given per-worker connectors and tasks into the per-worker builders in {@code base},
 * creating a builder for any worker that does not have one yet.
 *
 * @param base the builders to add to, keyed by worker; modified in place
 * @param toAdd the connectors and tasks to add for each worker
 */
private static void addAll(Map<String, ConnectorsAndTasks.Builder> base, Map<String, ConnectorsAndTasks> toAdd) {
    for (Map.Entry<String, ConnectorsAndTasks> addition : toAdd.entrySet()) {
        ConnectorsAndTasks additions = addition.getValue();
        ConnectorsAndTasks.Builder builder =
            base.computeIfAbsent(addition.getKey(), w -> new ConnectorsAndTasks.Builder());
        builder.addConnectors(additions.connectors());
        builder.addTasks(additions.tasks());
    }
}
/**
 * Build every builder in the given map, keeping the keys unchanged.
 *
 * @param builders the builders to build, by key
 * @param <K> the key type
 * @return the built connectors and tasks, by key
 */
private static <K> Map<K, ConnectorsAndTasks> buildAll(Map<K, ConnectorsAndTasks.Builder> builders) {
    Map<K, ConnectorsAndTasks> built = transformValues(builders, builder -> builder.build());
    return built;
}
/**
 * Convert each member's assignment into a {@code WorkerLoad} for that member.
 *
 * @param memberAssignments the connectors and tasks assigned to each member
 * @return one worker load per member
 */
private static List<WorkerLoad> workerLoads(Map<String, ConnectorsAndTasks> memberAssignments) {
    return memberAssignments.entrySet().stream()
        .map(member -> {
            ConnectorsAndTasks assigned = member.getValue();
            return new WorkerLoad.Builder(member.getKey())
                .with(assigned.connectors(), assigned.tasks())
                .build();
        })
        .collect(Collectors.toList());
}
/**
 * Remove from each worker load the connectors and tasks listed for that worker in
 * {@code toRemove}. Workers without an entry in {@code toRemove} are left as they are.
 *
 * @param workerLoads the worker loads to prune; modified in place
 * @param toRemove the connectors and tasks to remove, keyed by worker
 */
private static void removeAll(List<WorkerLoad> workerLoads, Map<String, ConnectorsAndTasks> toRemove) {
    for (WorkerLoad load : workerLoads) {
        ConnectorsAndTasks removals = toRemove.getOrDefault(load.worker(), ConnectorsAndTasks.EMPTY);
        load.connectors().removeAll(removals.connectors());
        load.tasks().removeAll(removals.tasks());
    }
}
/**
 * Restrict every worker's assignment to the connectors and tasks that also appear in
 * {@code connectorsAndTasks}.
 *
 * @param connectorsAndTasks the connectors and tasks to look for
 * @param assignments the current assignment of each worker
 * @return for every worker in {@code assignments}, the part of its assignment that is also in
 *         {@code connectorsAndTasks}
 */
private static Map<String, ConnectorsAndTasks> intersection(ConnectorsAndTasks connectorsAndTasks, Map<String, ConnectorsAndTasks> assignments) {
    return transformValues(assignments, assigned -> {
        // Work on copies so that the worker's own assignment is left untouched
        Set<String> sharedConnectors = new HashSet<>(assigned.connectors());
        Set<ConnectorTaskId> sharedTasks = new HashSet<>(assigned.tasks());
        sharedConnectors.retainAll(connectorsAndTasks.connectors());
        sharedTasks.retainAll(connectorsAndTasks.tasks());
        return new ConnectorsAndTasks.Builder()
            .with(sharedConnectors, sharedTasks)
            .build();
    });
}
static class ClusterAssignment {
private final Map<String, Collection<String>> newlyAssignedConnectors;

View File

@ -37,9 +37,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
@ -455,30 +457,31 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
}
public static class Builder {
private Collection<String> withConnectors;
private Collection<ConnectorTaskId> withTasks;
private Set<String> withConnectors = new LinkedHashSet<>();
private Set<ConnectorTaskId> withTasks = new LinkedHashSet<>();
public Builder() {
}
public ConnectorsAndTasks.Builder withCopies(Collection<String> connectors,
Collection<ConnectorTaskId> tasks) {
withConnectors = new ArrayList<>(connectors);
withTasks = new ArrayList<>(tasks);
public ConnectorsAndTasks.Builder with(Collection<String> connectors,
Collection<ConnectorTaskId> tasks) {
withConnectors = new LinkedHashSet<>(connectors);
withTasks = new LinkedHashSet<>(tasks);
return this;
}
public ConnectorsAndTasks.Builder with(Collection<String> connectors,
Collection<ConnectorTaskId> tasks) {
withConnectors = new ArrayList<>(connectors);
withTasks = new ArrayList<>(tasks);
public ConnectorsAndTasks.Builder addConnectors(Collection<String> connectors) {
this.withConnectors.addAll(connectors);
return this;
}
public ConnectorsAndTasks.Builder addTasks(Collection<ConnectorTaskId> tasks) {
this.withTasks.addAll(tasks);
return this;
}
public ConnectorsAndTasks build() {
return new ConnectorsAndTasks(
withConnectors != null ? withConnectors : new ArrayList<>(),
withTasks != null ? withTasks : new ArrayList<>());
return new ConnectorsAndTasks(withConnectors, withTasks);
}
}

View File

@ -22,7 +22,6 @@ import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -188,7 +187,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
}
@Test
@Ignore // TODO: To be re-enabled once we can make it less flaky (KAFKA-8391)
public void testDeleteConnector() throws Exception {
// create test topic
connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
@ -270,7 +268,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
}
@Ignore // TODO: To be re-enabled once we can make it less flaky (KAFKA-12495, KAFKA-12283)
@Test
public void testMultipleWorkersRejoining() throws Exception {
// create test topic

View File

@ -89,6 +89,9 @@ public class IncrementalCooperativeAssignorTest {
@Test
public void testTaskAssignmentWhenWorkerJoins() {
// Customize assignor for this test case
time = new MockTime();
initAssignor();
// First assignment with 1 worker and 2 connectors configured but not yet assigned
performStandardRebalance();
assertDelay(0);
@ -107,17 +110,229 @@ public class IncrementalCooperativeAssignorTest {
// Third assignment after revocations
performStandardRebalance();
assertNoRevocations();
assertDelay(0);
assertConnectorAllocations(1, 1);
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();
// A fourth rebalance should not change assignments
time.sleep(assignor.delay);
// A fourth rebalance after delay should not change assignments
performStandardRebalance();
assertDelay(0);
assertEmptyAssignment();
}
/**
 * Adds one new empty worker before each of five consecutive rebalances (worker2 through worker6)
 * and checks how the assignor alternates between two kinds of rounds:
 * rounds that run right after a revoking round, where a positive {@code assignor.delay} is expected,
 * and rounds that run after sleeping for that delay, where the expected allocations drop again.
 * A final follow-up rebalance must leave a balanced and complete allocation.
 *
 * The numeric arguments to assertConnectorAllocations/assertTaskAllocations are presumably
 * per-worker counts in ascending order; those helpers are not visible here, so confirm.
 */
@Test
public void testAssignmentsWhenWorkersJoinAfterRevocations() {
// Customize assignor for this test case
// A MockTime lets the test advance the clock explicitly through time.sleep(...) below
time = new MockTime();
initAssignor();
addNewConnector("connector3", 4);
// First assignment with 1 worker and 3 connectors configured but not yet assigned
performStandardRebalance();
assertDelay(0);
assertWorkers("worker1");
assertConnectorAllocations(3);
assertTaskAllocations(12);
assertBalancedAndCompleteAllocation();
// Second assignment with a second worker joining and all connectors running on previous worker
// We should revoke.
addNewEmptyWorkers("worker2");
performStandardRebalance();
assertWorkers("worker1", "worker2");
assertConnectorAllocations(0, 2);
assertTaskAllocations(0, 6);
// Third assignment immediately after revocations, and a third worker joining.
// This is a successive revoking rebalance. We should not perform any revocations
// in this round
addNewEmptyWorkers("worker3");
performStandardRebalance();
assertTrue(assignor.delay > 0);
assertWorkers("worker1", "worker2", "worker3");
assertConnectorAllocations(0, 1, 2);
assertTaskAllocations(3, 3, 6);
// Fourth assignment and a fourth worker joining
// after first revoking rebalance is expired. We should revoke.
time.sleep(assignor.delay);
addNewEmptyWorkers("worker4");
performStandardRebalance();
assertWorkers("worker1", "worker2", "worker3", "worker4");
assertConnectorAllocations(0, 0, 1, 1);
assertTaskAllocations(0, 3, 3, 3);
// Fifth assignment and a fifth worker joining after a revoking rebalance.
// We shouldn't revoke and set a delay > initial interval
// NOTE(review): 40 presumably relates to the backoff's initial interval, which is not
// visible in this test - confirm against the assignor's backoff configuration
addNewEmptyWorkers("worker5");
performStandardRebalance();
assertTrue(assignor.delay > 40);
assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
assertConnectorAllocations(0, 0, 1, 1, 1);
assertTaskAllocations(1, 2, 3, 3, 3);
// Sixth assignment with sixth worker joining after the expiry.
// Should revoke
time.sleep(assignor.delay);
addNewEmptyWorkers("worker6");
performStandardRebalance();
assertDelay(0);
assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
assertConnectorAllocations(0, 0, 0, 1, 1, 1);
assertTaskAllocations(0, 1, 2, 2, 2, 2);
// Follow up rebalance since there were revocations
performStandardRebalance();
assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
assertConnectorAllocations(0, 0, 0, 1, 1, 1);
assertTaskAllocations(2, 2, 2, 2, 2, 2);
assertBalancedAndCompleteAllocation();
}
/**
 * Runs the assignor with {@code rebalanceDelay} set to 0 and checks that a rebalance directly
 * following a revoking rebalance still reports a delay of 0, and that one further follow-up
 * rebalance with no revocations ends in a balanced and complete allocation across three workers.
 */
@Test
public void testImmediateRevocationsWhenMaxDelayIs0() {
// Customize assignor for this test case
// rebalanceDelay must be set before initAssignor() so the new assignor is created with it
// (presumably passed as the assignor's max delay - initAssignor() is not visible here)
rebalanceDelay = 0;
time = new MockTime();
initAssignor();
addNewConnector("connector3", 4);
// First assignment with 1 worker and 3 connectors configured but not yet assigned
performStandardRebalance();
assertDelay(0);
assertWorkers("worker1");
assertConnectorAllocations(3);
assertTaskAllocations(12);
assertBalancedAndCompleteAllocation();
// Second assignment with a second worker joining and all connectors running on previous worker
// We should revoke.
addNewEmptyWorkers("worker2");
performStandardRebalance();
assertWorkers("worker1", "worker2");
assertConnectorAllocations(0, 2);
assertTaskAllocations(0, 6);
// Third assignment immediately after revocations, and a third worker joining.
// This is a successive revoking rebalance but we should still revoke as rebalance delay is 0
addNewEmptyWorkers("worker3");
performStandardRebalance();
assertDelay(0);
assertWorkers("worker1", "worker2", "worker3");
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(3, 3, 4);
// Follow up rebalance post revocations
performStandardRebalance();
assertWorkers("worker1", "worker2", "worker3");
assertNoRevocations();
assertConnectorAllocations(1, 1, 1);
assertTaskAllocations(4, 4, 4);
assertBalancedAndCompleteAllocation();
}
/**
 * Runs the assignor with {@code rebalanceDelay} set to 1 and checks that a rebalance directly
 * following a revoking rebalance reports a delay of exactly 1 and leaves the expected allocations
 * at (0, 1, 2) connectors and (3, 3, 6) tasks.
 *
 * NOTE(review): unlike the sibling tests, this one does not install a MockTime before
 * initAssignor(); the clock is whatever the test fixture set up. Confirm this is intended.
 */
@Test
public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
// Customize assignor for this test case
rebalanceDelay = 1;
initAssignor();
addNewConnector("connector3", 4);
// First assignment with 1 worker and 3 connectors configured but not yet assigned
performStandardRebalance();
assertDelay(0);
assertWorkers("worker1");
assertConnectorAllocations(3);
assertTaskAllocations(12);
assertBalancedAndCompleteAllocation();
// Second assignment with a second worker joining and all connectors running on previous worker
// We should revoke.
addNewEmptyWorkers("worker2");
performStandardRebalance();
assertWorkers("worker1", "worker2");
assertConnectorAllocations(0, 2);
assertTaskAllocations(0, 6);
// Third assignment immediately after revocations, and a third worker joining.
// This is a successive revoking rebalance. We shouldn't revoke as maxDelay is 1 ms
addNewEmptyWorkers("worker3");
performStandardRebalance();
assertDelay(1);
assertWorkers("worker1", "worker2", "worker3");
assertConnectorAllocations(0, 1, 2);
assertTaskAllocations(3, 3, 6);
}
/**
 * Checks the assignor when workers keep joining while a delayed rebalance is pending.
 * After an initial revoking round, two further workers join (the second one after only half of
 * the reported delay has elapsed); both rounds must report a positive delay and no revocations.
 * Once the clock has been advanced by the full delay, a fifth worker joins and the delay must be
 * back to 0, and a final follow-up round must yield a balanced and complete allocation.
 */
@Test
public void testWorkerJoiningDuringDelayedRebalance() {
// Customize assignor for this test case
// A MockTime lets the test advance the clock explicitly through time.sleep(...) below
time = new MockTime();
initAssignor();
addNewConnector("connector3", 4);
// First assignment with 1 worker and 3 connectors configured but not yet assigned
performStandardRebalance();
assertDelay(0);
assertWorkers("worker1");
assertConnectorAllocations(3);
assertTaskAllocations(12);
assertBalancedAndCompleteAllocation();
// Second assignment with a second worker joining and all connectors running on previous worker
// We should revoke.
addNewEmptyWorkers("worker2");
performStandardRebalance();
assertWorkers("worker1", "worker2");
assertConnectorAllocations(0, 2);
assertTaskAllocations(0, 6);
// Third assignment immediately after revocations, and a third worker joining.
// This is a successive revoking rebalance. We should not perform any revocations
// in this round, but can allocate the connectors and tasks revoked previously
addNewEmptyWorkers("worker3");
performStandardRebalance();
assertTrue(assignor.delay > 0);
assertWorkers("worker1", "worker2", "worker3");
assertNoRevocations();
assertConnectorAllocations(0, 1, 2);
assertTaskAllocations(3, 3, 6);
// Fourth assignment and a fourth worker joining
// while delayed rebalance is active. We should not revoke
// Only half of the reported delay is slept here, so the delay has not yet elapsed
time.sleep(assignor.delay / 2);
addNewEmptyWorkers("worker4");
performStandardRebalance();
assertTrue(assignor.delay > 0);
assertWorkers("worker1", "worker2", "worker3", "worker4");
assertNoRevocations();
assertConnectorAllocations(0, 0, 1, 2);
assertTaskAllocations(0, 3, 3, 6);
// Fifth assignment and a fifth worker joining
// after the delay has expired. We should perform load-balancing
// revocations
time.sleep(assignor.delay);
addNewEmptyWorkers("worker5");
performStandardRebalance();
assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
assertDelay(0);
assertConnectorAllocations(0, 0, 0, 1, 1);
assertTaskAllocations(0, 0, 2, 3, 3);
// Sixth and final rebalance, as a follow-up to the revocations in the previous round.
// Should allocate all previously-revoked connectors and tasks evenly across the cluster
performStandardRebalance();
assertDelay(0);
assertNoRevocations();
assertConnectorAllocations(0, 0, 1, 1, 1);
assertTaskAllocations(2, 2, 2, 3, 3);
assertBalancedAndCompleteAllocation();
}
@Test
public void testTaskAssignmentWhenWorkerLeavesPermanently() {
// Customize assignor for this test case
@ -204,10 +419,11 @@ public class IncrementalCooperativeAssignorTest {
time.sleep(rebalanceDelay / 4);
// Fifth assignment with the same two workers. The delay has expired, so the lost
// assignments ought to be assigned to the worker that has appeared as returned.
// Fifth assignment with the same two workers. The delay has expired, so there
// should be revocations giving back the assignments to the reappearing worker
performStandardRebalance();
assertDelay(0);
assertNoRevocations();
assertConnectorAllocations(1, 1);
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();
@ -215,10 +431,6 @@ public class IncrementalCooperativeAssignorTest {
@Test
public void testTaskAssignmentWhenLeaderLeavesPermanently() {
// Customize assignor for this test case
time = new MockTime();
initAssignor();
// First assignment with 3 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2", "worker3");
performStandardRebalance();
@ -251,10 +463,6 @@ public class IncrementalCooperativeAssignorTest {
@Test
public void testTaskAssignmentWhenLeaderBounces() {
// Customize assignor for this test case
time = new MockTime();
initAssignor();
// First assignment with 3 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2", "worker3");
performStandardRebalance();
@ -271,7 +479,6 @@ public class IncrementalCooperativeAssignorTest {
// The fact that the leader bounces means that the assignor starts from a clean slate
initAssignor();
// Capture needs to be reset to point to the new assignor
performStandardRebalance();
assertDelay(0);
assertWorkers("worker2", "worker3");
@ -292,6 +499,7 @@ public class IncrementalCooperativeAssignorTest {
// Fourth assignment after revocations
performStandardRebalance();
assertDelay(0);
assertNoRevocations();
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(2, 3, 3);
assertBalancedAndCompleteAllocation();
@ -299,10 +507,6 @@ public class IncrementalCooperativeAssignorTest {
@Test
public void testTaskAssignmentWhenFirstAssignmentAttemptFails() {
// Customize assignor for this test case
time = new MockTime();
initAssignor();
// First assignment with 2 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
performFailedRebalance();
@ -348,7 +552,16 @@ public class IncrementalCooperativeAssignorTest {
assertTaskAllocations(0, 4, 4);
// Third assignment happens with members returning the same assignments (memberConfigs)
// as the first time.
// as the first time. Since this is a consecutive revoking rebalance, delay should be non-zero
// but no revoking rebalances
performStandardRebalance();
assertTrue(assignor.delay > 0);
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(0, 4, 4);
// Wait for delay ms before triggering another rebalance
time.sleep(assignor.delay);
// Fourth assignment after revocations.
performStandardRebalance();
assertDelay(0);
assertConnectorAllocations(0, 1, 1);
@ -356,7 +569,6 @@ public class IncrementalCooperativeAssignorTest {
// Fourth assignment after revocations
performStandardRebalance();
assertDelay(0);
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(2, 3, 3);
assertBalancedAndCompleteAllocation();
@ -389,15 +601,22 @@ public class IncrementalCooperativeAssignorTest {
assertTaskAllocations(0, 4, 4);
// Third assignment happens with members returning the same assignments (memberConfigs)
// as the first time.
// as the first time. Since this is a consecutive revoking rebalance, there should be delay
// not leading to revocations.
performRebalanceWithMismatchedGeneration();
assertDelay(0);
assertTrue(assignor.delay > 0);
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(0, 4, 4);
// Wait for delay to get revoking rebalances
time.sleep(assignor.delay);
// Fourth assignment after revocations
performStandardRebalance();
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(0, 3, 3);
// Fourth assignment after revocations
// Fifth and final rebalance
performStandardRebalance();
assertDelay(0);
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(2, 3, 3);
assertBalancedAndCompleteAllocation();
@ -502,11 +721,9 @@ public class IncrementalCooperativeAssignorTest {
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
newSubmissions,
new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -520,10 +737,10 @@ public class IncrementalCooperativeAssignorTest {
String flakyWorker = "worker1";
WorkerLoad lostLoad = configuredAssignment.remove(flakyWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
.with(lostLoad.connectors(), lostLoad.tasks()).build();
// Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -538,7 +755,7 @@ public class IncrementalCooperativeAssignorTest {
// A new worker (probably returning worker) has joined
configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -551,7 +768,7 @@ public class IncrementalCooperativeAssignorTest {
time.sleep(rebalanceDelay);
// The new worker has still no assignments
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertTrue("Wrong assignment of lost connectors",
@ -584,11 +801,9 @@ public class IncrementalCooperativeAssignorTest {
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
newSubmissions,
new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -602,10 +817,10 @@ public class IncrementalCooperativeAssignorTest {
String removedWorker = "worker1";
WorkerLoad lostLoad = configuredAssignment.remove(removedWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
.with(lostLoad.connectors(), lostLoad.tasks()).build();
// Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -619,7 +834,7 @@ public class IncrementalCooperativeAssignorTest {
rebalanceDelay /= 2;
// No new worker has joined
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -630,13 +845,14 @@ public class IncrementalCooperativeAssignorTest {
time.sleep(rebalanceDelay);
assignor.handleLostAssignments(lostAssignments, newSubmissions,
ConnectorsAndTasks.Builder lostAssignmentsToReassign = new ConnectorsAndTasks.Builder();
assignor.handleLostAssignments(lostAssignments, lostAssignmentsToReassign,
new ArrayList<>(configuredAssignment.values()));
assertTrue("Wrong assignment of lost connectors",
newSubmissions.connectors().containsAll(lostAssignments.connectors()));
lostAssignmentsToReassign.build().connectors().containsAll(lostAssignments.connectors()));
assertTrue("Wrong assignment of lost tasks",
newSubmissions.tasks().containsAll(lostAssignments.tasks()));
lostAssignmentsToReassign.build().tasks().containsAll(lostAssignments.tasks()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
assignor.candidateWorkersForReassignment);
@ -659,11 +875,9 @@ public class IncrementalCooperativeAssignorTest {
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
newSubmissions,
new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -677,13 +891,13 @@ public class IncrementalCooperativeAssignorTest {
String flakyWorker = "worker1";
WorkerLoad lostLoad = configuredAssignment.remove(flakyWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
.with(lostLoad.connectors(), lostLoad.tasks()).build();
String newWorker = "worker3";
configuredAssignment.put(newWorker, new WorkerLoad.Builder(newWorker).build());
// Lost assignments detected - A new worker also has joined that is not the returning worker
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -698,7 +912,7 @@ public class IncrementalCooperativeAssignorTest {
// Now two new workers have joined
configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
Set<String> expectedWorkers = new HashSet<>();
@ -717,7 +931,7 @@ public class IncrementalCooperativeAssignorTest {
configuredAssignment.put(newWorker, workerLoad(newWorker, 8, 2, 12, 4));
// we don't reflect these new assignments in memberConfigs currently because they are not
// used in handleLostAssignments method
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
// both the newWorkers would need to be considered for re assignment of connectors and tasks
@ -757,11 +971,9 @@ public class IncrementalCooperativeAssignorTest {
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
newSubmissions,
new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -775,10 +987,10 @@ public class IncrementalCooperativeAssignorTest {
String veryFlakyWorker = "worker1";
WorkerLoad lostLoad = configuredAssignment.remove(veryFlakyWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
.with(lostLoad.connectors(), lostLoad.tasks()).build();
// Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -793,7 +1005,7 @@ public class IncrementalCooperativeAssignorTest {
// A new worker (probably returning worker) has joined
configuredAssignment.put(veryFlakyWorker, new WorkerLoad.Builder(veryFlakyWorker).build());
assignor.handleLostAssignments(lostAssignments, newSubmissions,
assignor.handleLostAssignments(lostAssignments, new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
@ -807,13 +1019,14 @@ public class IncrementalCooperativeAssignorTest {
// The returning worker leaves permanently after joining briefly during the delay
configuredAssignment.remove(veryFlakyWorker);
assignor.handleLostAssignments(lostAssignments, newSubmissions,
ConnectorsAndTasks.Builder lostAssignmentsToReassign = new ConnectorsAndTasks.Builder();
assignor.handleLostAssignments(lostAssignments, lostAssignmentsToReassign,
new ArrayList<>(configuredAssignment.values()));
assertTrue("Wrong assignment of lost connectors",
newSubmissions.connectors().containsAll(lostAssignments.connectors()));
lostAssignmentsToReassign.build().connectors().containsAll(lostAssignments.connectors()));
assertTrue("Wrong assignment of lost tasks",
newSubmissions.tasks().containsAll(lostAssignments.tasks()));
lostAssignmentsToReassign.build().tasks().containsAll(lostAssignments.tasks()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
assignor.candidateWorkersForReassignment);
@ -842,12 +1055,7 @@ public class IncrementalCooperativeAssignorTest {
// Third assignment after revocations
performStandardRebalance();
assertDelay(0);
assertConnectorAllocations(1, 1);
assertTaskAllocations(2, 4);
// fourth rebalance after revocations
performStandardRebalance();
assertDelay(0);
assertNoRevocations();
assertConnectorAllocations(1, 1);
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();
@ -860,6 +1068,10 @@ public class IncrementalCooperativeAssignorTest {
@Test
public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() {
// Customize assignor for this test case
time = new MockTime();
initAssignor();
// First assignment with 1 worker and 2 connectors configured but not yet assigned
performStandardRebalance();
assertDelay(0);
@ -877,22 +1089,18 @@ public class IncrementalCooperativeAssignorTest {
assertDelay(0);
assertWorkers("worker1", "worker2");
assertConnectorAllocations(0, 1);
assertTaskAllocations(0, 4);
assertTaskAllocations(0, 2);
// Third assignment after revocations
performStandardRebalance();
assertDelay(0);
assertConnectorAllocations(0, 1);
assertTaskAllocations(0, 2);
// fourth rebalance after revocations
performStandardRebalance();
assertDelay(0);
assertNoRevocations();
assertConnectorAllocations(0, 1);
assertTaskAllocations(2, 2);
assertBalancedAndCompleteAllocation();
// Fifth rebalance should not change assignments
time.sleep(assignor.delay);
// Fifth rebalance after delay should not change assignments
performStandardRebalance();
assertDelay(0);
assertEmptyAssignment();
@ -1015,7 +1223,11 @@ public class IncrementalCooperativeAssignorTest {
generationId++;
int lastCompletedGenerationId = generationMismatch ? generationId - 2 : generationId - 1;
try {
Map<String, ConnectorsAndTasks> memberAssignmentsCopy = new HashMap<>(memberAssignments);
Map<String, ConnectorsAndTasks> memberAssignmentsCopy = memberAssignments.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> new ConnectorsAndTasks.Builder().with(e.getValue().connectors(), e.getValue().tasks()).build()
));
returnedAssignments = assignor.performTaskAssignment(configState(), lastCompletedGenerationId, generationId, memberAssignmentsCopy);
} catch (RuntimeException e) {
if (assignmentFailure) {
@ -1037,7 +1249,7 @@ public class IncrementalCooperativeAssignorTest {
}
private void addNewWorker(String worker, List<String> connectors, List<ConnectorTaskId> tasks) {
ConnectorsAndTasks assignment = new ConnectorsAndTasks.Builder().withCopies(connectors, tasks).build();
ConnectorsAndTasks assignment = new ConnectorsAndTasks.Builder().with(connectors, tasks).build();
assertNull(
"Worker " + worker + " already exists",
memberAssignments.put(worker, assignment)
@ -1129,14 +1341,14 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(
"Complete connector assignment for worker " + worker + " does not match expectations " +
"based on prior assignment and new revocations and assignments",
workerAssignment.connectors(),
returnedAssignments.allAssignedConnectors().get(worker)
new HashSet<>(workerAssignment.connectors()),
new HashSet<>(returnedAssignments.allAssignedConnectors().get(worker))
);
assertEquals(
"Complete task assignment for worker " + worker + " does not match expectations " +
"based on prior assignment and new revocations and assignments",
workerAssignment.tasks(),
returnedAssignments.allAssignedTasks().get(worker)
new HashSet<>(workerAssignment.tasks()),
new HashSet<>(returnedAssignments.allAssignedTasks().get(worker))
);
});
}
@ -1213,6 +1425,23 @@ public class IncrementalCooperativeAssignorTest {
.collect(Collectors.toList());
}
/**
 * Asserts that the most recently returned assignments revoke nothing: for every worker, both the
 * newly revoked connectors and the newly revoked tasks must be empty.
 */
private void assertNoRevocations() {
    returnedAssignments.newlyRevokedConnectors().forEach((workerName, revokedConnectors) -> {
        String failureMessage =
                "Expected no revocations to take place during this round, but connector revocations were issued for worker " + workerName;
        // Compare as sets so the concrete collection type of the revocations does not matter
        assertEquals(failureMessage, Collections.emptySet(), new HashSet<>(revokedConnectors));
    });
    returnedAssignments.newlyRevokedTasks().forEach((workerName, revokedTasks) -> {
        String failureMessage =
                "Expected no revocations to take place during this round, but task revocations were issued for worker " + workerName;
        assertEquals(failureMessage, Collections.emptySet(), new HashSet<>(revokedTasks));
    });
}
private void assertDelay(int expectedDelay) {
assertEquals(
"Wrong rebalance delay",