KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)

Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Yash Mayya <yash.mayya@gmail.com>
This commit is contained in:
Chris Egerton 2023-11-09 10:48:43 -05:00 committed by GitHub
parent 81cceedf7e
commit 39c6170aa9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 10 deletions

View File

@ -48,6 +48,7 @@ import java.util.stream.IntStream;
import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
@ -337,10 +338,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
// The complete set of connectors and tasks that should be newly-assigned during this round
ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
.addConnectors(created.connectors())
.addTasks(created.tasks())
.addConnectors(lostAssignmentsToReassign.connectors())
.addTasks(lostAssignmentsToReassign.tasks())
.addAll(created)
.addAll(lostAssignmentsToReassign)
.build();
assignConnectors(nextWorkerAssignment, toAssign.connectors());
@ -460,8 +459,14 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
+ "missing assignments that the leader is detecting are probably due to some "
+ "workers failing to receive the new assignments in the previous rebalance. "
+ "Will reassign missing tasks as new tasks");
lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
lostAssignmentsToReassign.addAll(lostAssignments);
return;
} else if (maxDelay == 0) {
log.debug("Scheduled rebalance delays are disabled ({} = 0); "
+ "reassigning all lost connectors and tasks immediately",
SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG
);
lostAssignmentsToReassign.addAll(lostAssignments);
return;
}
@ -498,8 +503,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
}
} else {
log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
lostAssignmentsToReassign.addAll(lostAssignments);
}
resetDelay();
// Resetting the flag as now we can permit successive revoking rebalances.
@ -840,8 +844,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
private static void addAll(Map<String, ConnectorsAndTasks.Builder> base, Map<String, ConnectorsAndTasks> toAdd) {
toAdd.forEach((worker, assignment) -> base
.computeIfAbsent(worker, w -> new ConnectorsAndTasks.Builder())
.addConnectors(assignment.connectors())
.addTasks(assignment.tasks())
.addAll(assignment)
);
}

View File

@ -480,6 +480,12 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
return this;
}
public ConnectorsAndTasks.Builder addAll(ConnectorsAndTasks connectorsAndTasks) {
return this
.addConnectors(connectorsAndTasks.connectors())
.addTasks(connectorsAndTasks.tasks());
}
public ConnectorsAndTasks build() {
return new ConnectorsAndTasks(withConnectors, withTasks);
}

View File

@ -1037,6 +1037,83 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(0, assignor.delay);
}
@Test
public void testLostAssignmentHandlingWhenScheduledDelayIsDisabled() {
// Customize assignor for this test case
rebalanceDelay = 0;
time = new MockTime();
initAssignor();
assertTrue(assignor.candidateWorkersForReassignment.isEmpty());
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
assignor.candidateWorkersForReassignment);
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
String veryFlakyWorker = "worker1";
WorkerLoad lostLoad = configuredAssignment.remove(veryFlakyWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.with(lostLoad.connectors(), lostLoad.tasks()).build();
// Lost assignments detected - Immediately reassigned
ConnectorsAndTasks.Builder lostAssignmentsToReassign = new ConnectorsAndTasks.Builder();
assignor.handleLostAssignments(lostAssignments, lostAssignmentsToReassign,
new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
assignor.candidateWorkersForReassignment);
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
assertEquals("Wrong assignment of lost connectors",
lostAssignments.connectors(), lostAssignmentsToReassign.build().connectors());
assertEquals("Wrong assignment of lost tasks",
lostAssignments.tasks(), lostAssignmentsToReassign.build().tasks());
}
@Test
public void testScheduledDelayIsDisabled() {
// Customize assignor for this test case
rebalanceDelay = 0;
time = new MockTime();
initAssignor();
// First assignment with 2 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
performStandardRebalance();
assertDelay(0);
assertWorkers("worker1", "worker2");
assertConnectorAllocations(1, 1);
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();
// Second assignment with only one worker remaining in the group. The worker that left the
// group was a follower. Re-assignments take place immediately
removeWorkers("worker2");
performStandardRebalance();
assertDelay(rebalanceDelay);
assertWorkers("worker1");
assertConnectorAllocations(2);
assertTaskAllocations(8);
assertBalancedAndCompleteAllocation();
}
@Test
public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
// First assignment with 1 worker and 2 connectors configured but not yet assigned