mirror of https://github.com/apache/kafka.git
KAFKA-10413: Allow for even distribution of lost/new tasks when multiple Connect workers join at the same time (#9319)
First issue: When more than one workers join the Connect group the incremental cooperative assignor revokes and reassigns at most average number of tasks per worker. Side-effect: This results in the additional workers joining the group stay idle and would require more future rebalances to happen to have even distribution of tasks. Fix: As part of task assignment calculation following a deployment, the reassignment of tasks are calculated by revoking all the tasks above the rounded up (ceil) average number of tasks. Second issue: When more than one worker is lost and rejoins the group at most one worker will be re assigned with the lost tasks from all the workers that left the group. Side-effect: In scenarios where more than one worker is lost and rejoins the group only one among them gets assigned all the partitions that were lost in the past. The additional workers that have joined would not get any task assigned to them until a rebalance that happens in future. Fix: As part fo lost task re assignment all the new workers that have joined the group would be considered for task assignment and would be assigned in a round robin fashion with the new tasks. Testing strategy : * System testing in a Kubernetes environment completed * New integration tests to test for balanced tasks * Updated unit tests. Co-authored-by: Rameshkrishnan Muthusamy <rameshkrishnan_muthusamy@apple.com> Co-authored-by: Randall Hauch <rhauch@gmail.com> Co-authored-by: Konstantine Karantasis <konstantine@confluent.io> Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
This commit is contained in:
parent
1711cfa4eb
commit
e260f64a9c
|
@ -34,7 +34,6 @@ import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
@ -445,16 +444,34 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
|
||||||
if (scheduledRebalance > 0 && now >= scheduledRebalance) {
|
if (scheduledRebalance > 0 && now >= scheduledRebalance) {
|
||||||
// delayed rebalance expired and it's time to assign resources
|
// delayed rebalance expired and it's time to assign resources
|
||||||
log.debug("Delayed rebalance expired. Reassigning lost tasks");
|
log.debug("Delayed rebalance expired. Reassigning lost tasks");
|
||||||
Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
|
List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
|
||||||
if (!candidateWorkersForReassignment.isEmpty()) {
|
if (!candidateWorkersForReassignment.isEmpty()) {
|
||||||
candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment);
|
candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (candidateWorkerLoad.isPresent()) {
|
if (!candidateWorkerLoad.isEmpty()) {
|
||||||
WorkerLoad workerLoad = candidateWorkerLoad.get();
|
log.debug("Assigning lost tasks to {} candidate workers: {}",
|
||||||
log.debug("A candidate worker has been found to assign lost tasks: {}", workerLoad.worker());
|
candidateWorkerLoad.size(),
|
||||||
lostAssignments.connectors().forEach(workerLoad::assign);
|
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
|
||||||
lostAssignments.tasks().forEach(workerLoad::assign);
|
Iterator<WorkerLoad> candidateWorkerIterator = candidateWorkerLoad.iterator();
|
||||||
|
for (String connector : lostAssignments.connectors()) {
|
||||||
|
// Loop over the the candidate workers as many times as it takes
|
||||||
|
if (!candidateWorkerIterator.hasNext()) {
|
||||||
|
candidateWorkerIterator = candidateWorkerLoad.iterator();
|
||||||
|
}
|
||||||
|
WorkerLoad worker = candidateWorkerIterator.next();
|
||||||
|
log.debug("Assigning connector id {} to member {}", connector, worker.worker());
|
||||||
|
worker.assign(connector);
|
||||||
|
}
|
||||||
|
candidateWorkerIterator = candidateWorkerLoad.iterator();
|
||||||
|
for (ConnectorTaskId task : lostAssignments.tasks()) {
|
||||||
|
if (!candidateWorkerIterator.hasNext()) {
|
||||||
|
candidateWorkerIterator = candidateWorkerLoad.iterator();
|
||||||
|
}
|
||||||
|
WorkerLoad worker = candidateWorkerIterator.next();
|
||||||
|
log.debug("Assigning task id {} to member {}", task, worker.worker());
|
||||||
|
worker.assign(task);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
|
log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
|
||||||
newSubmissions.connectors().addAll(lostAssignments.connectors());
|
newSubmissions.connectors().addAll(lostAssignments.connectors());
|
||||||
|
@ -498,13 +515,13 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Optional<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
|
private List<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
|
||||||
Map<String, WorkerLoad> activeWorkers = completeWorkerAssignment.stream()
|
Map<String, WorkerLoad> activeWorkers = completeWorkerAssignment.stream()
|
||||||
.collect(Collectors.toMap(WorkerLoad::worker, Function.identity()));
|
.collect(Collectors.toMap(WorkerLoad::worker, Function.identity()));
|
||||||
return candidateWorkersForReassignment.stream()
|
return candidateWorkersForReassignment.stream()
|
||||||
.map(activeWorkers::get)
|
.map(activeWorkers::get)
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.findFirst();
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -554,38 +571,37 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
|
||||||
// We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
|
// We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
|
||||||
log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
|
log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
|
||||||
int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
|
int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
|
||||||
log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors);
|
int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
|
||||||
|
log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
|
||||||
|
|
||||||
|
|
||||||
log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
|
log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
|
||||||
int floorTasks = totalActiveTasksNum / totalWorkersNum;
|
int floorTasks = totalActiveTasksNum / totalWorkersNum;
|
||||||
log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks);
|
int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
|
||||||
|
log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
|
||||||
|
int numToRevoke;
|
||||||
|
|
||||||
int numToRevoke = floorConnectors;
|
|
||||||
for (WorkerLoad existing : existingWorkers) {
|
for (WorkerLoad existing : existingWorkers) {
|
||||||
Iterator<String> connectors = existing.connectors().iterator();
|
Iterator<String> connectors = existing.connectors().iterator();
|
||||||
|
numToRevoke = existing.connectorsSize() - ceilConnectors;
|
||||||
for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
|
for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
|
||||||
ConnectorsAndTasks resources = revoking.computeIfAbsent(
|
ConnectorsAndTasks resources = revoking.computeIfAbsent(
|
||||||
existing.worker(),
|
existing.worker(),
|
||||||
w -> new ConnectorsAndTasks.Builder().build());
|
w -> new ConnectorsAndTasks.Builder().build());
|
||||||
resources.connectors().add(connectors.next());
|
resources.connectors().add(connectors.next());
|
||||||
}
|
}
|
||||||
if (numToRevoke == 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
numToRevoke = floorTasks;
|
|
||||||
for (WorkerLoad existing : existingWorkers) {
|
for (WorkerLoad existing : existingWorkers) {
|
||||||
Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
|
Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
|
||||||
|
numToRevoke = existing.tasksSize() - ceilTasks;
|
||||||
|
log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
|
||||||
for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
|
for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
|
||||||
ConnectorsAndTasks resources = revoking.computeIfAbsent(
|
ConnectorsAndTasks resources = revoking.computeIfAbsent(
|
||||||
existing.worker(),
|
existing.worker(),
|
||||||
w -> new ConnectorsAndTasks.Builder().build());
|
w -> new ConnectorsAndTasks.Builder().build());
|
||||||
resources.tasks().add(tasks.next());
|
resources.tasks().add(tasks.next());
|
||||||
}
|
}
|
||||||
if (numToRevoke == 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return revoking;
|
return revoking;
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
@ -208,7 +209,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
|
||||||
connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME + 3,
|
connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME + 3,
|
||||||
"Connector tasks did not stop in time.");
|
"Connector tasks did not stop in time.");
|
||||||
|
|
||||||
waitForCondition(this::assertConnectorAndTasksAreUnique,
|
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
|
||||||
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
|
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,7 +238,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
|
||||||
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
|
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
|
||||||
"Connector tasks did not start in time.");
|
"Connector tasks did not start in time.");
|
||||||
|
|
||||||
waitForCondition(this::assertConnectorAndTasksAreUnique,
|
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
|
||||||
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
|
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,7 +264,53 @@ public class RebalanceSourceConnectorsIntegrationTest {
|
||||||
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1,
|
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1,
|
||||||
"Connect workers did not start in time.");
|
"Connect workers did not start in time.");
|
||||||
|
|
||||||
waitForCondition(this::assertConnectorAndTasksAreUnique,
|
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
|
||||||
|
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleWorkersRejoining() throws Exception {
|
||||||
|
// create test topic
|
||||||
|
connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
|
||||||
|
|
||||||
|
// setup up props for the source connector
|
||||||
|
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
|
||||||
|
|
||||||
|
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
|
||||||
|
"Connect workers did not start in time.");
|
||||||
|
|
||||||
|
// start a source connector
|
||||||
|
IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props));
|
||||||
|
|
||||||
|
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
|
||||||
|
"Connector tasks did not start in time.");
|
||||||
|
|
||||||
|
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
|
||||||
|
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
|
||||||
|
|
||||||
|
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
|
||||||
|
|
||||||
|
connect.removeWorker();
|
||||||
|
connect.removeWorker();
|
||||||
|
|
||||||
|
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 2,
|
||||||
|
"Connect workers did not stop in time.");
|
||||||
|
|
||||||
|
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
|
||||||
|
|
||||||
|
connect.addWorker();
|
||||||
|
connect.addWorker();
|
||||||
|
|
||||||
|
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
|
||||||
|
"Connect workers did not start in time.");
|
||||||
|
|
||||||
|
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
|
||||||
|
|
||||||
|
for (int i = 0; i < 4; ++i) {
|
||||||
|
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + i, NUM_TASKS, "Connector tasks did not start in time.");
|
||||||
|
}
|
||||||
|
|
||||||
|
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
|
||||||
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
|
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,7 +329,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
|
||||||
return props;
|
return props;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean assertConnectorAndTasksAreUnique() {
|
private boolean assertConnectorAndTasksAreUniqueAndBalanced() {
|
||||||
try {
|
try {
|
||||||
Map<String, Collection<String>> connectors = new HashMap<>();
|
Map<String, Collection<String>> connectors = new HashMap<>();
|
||||||
Map<String, Collection<String>> tasks = new HashMap<>();
|
Map<String, Collection<String>> tasks = new HashMap<>();
|
||||||
|
@ -296,7 +343,12 @@ public class RebalanceSourceConnectorsIntegrationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
int maxConnectors = connectors.values().stream().mapToInt(Collection::size).max().orElse(0);
|
int maxConnectors = connectors.values().stream().mapToInt(Collection::size).max().orElse(0);
|
||||||
|
int minConnectors = connectors.values().stream().mapToInt(Collection::size).min().orElse(0);
|
||||||
int maxTasks = tasks.values().stream().mapToInt(Collection::size).max().orElse(0);
|
int maxTasks = tasks.values().stream().mapToInt(Collection::size).max().orElse(0);
|
||||||
|
int minTasks = tasks.values().stream().mapToInt(Collection::size).min().orElse(0);
|
||||||
|
|
||||||
|
log.debug("Connector balance: {}", formatAssignment(connectors));
|
||||||
|
log.debug("Task balance: {}", formatAssignment(tasks));
|
||||||
|
|
||||||
assertNotEquals("Found no connectors running!", maxConnectors, 0);
|
assertNotEquals("Found no connectors running!", maxConnectors, 0);
|
||||||
assertNotEquals("Found no tasks running!", maxTasks, 0);
|
assertNotEquals("Found no tasks running!", maxTasks, 0);
|
||||||
|
@ -306,6 +358,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
|
||||||
assertEquals("Task assignments are not unique: " + tasks,
|
assertEquals("Task assignments are not unique: " + tasks,
|
||||||
tasks.values().size(),
|
tasks.values().size(),
|
||||||
tasks.values().stream().distinct().collect(Collectors.toList()).size());
|
tasks.values().stream().distinct().collect(Collectors.toList()).size());
|
||||||
|
assertTrue("Connectors are imbalanced: " + formatAssignment(connectors), maxConnectors - minConnectors < 2);
|
||||||
|
assertTrue("Tasks are imbalanced: " + formatAssignment(tasks), maxTasks - minTasks < 2);
|
||||||
return true;
|
return true;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Could not check connector state info.", e);
|
log.error("Could not check connector state info.", e);
|
||||||
|
@ -313,4 +367,13 @@ public class RebalanceSourceConnectorsIntegrationTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String formatAssignment(Map<String, Collection<String>> assignment) {
|
||||||
|
StringBuilder result = new StringBuilder();
|
||||||
|
for (String worker : assignment.keySet().stream().sorted().collect(Collectors.toList())) {
|
||||||
|
result.append(String.format("\n%s=%s", worker, assignment.getOrDefault(worker,
|
||||||
|
Collections.emptyList())));
|
||||||
|
}
|
||||||
|
return result.toString();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -989,18 +989,24 @@ public class IncrementalCooperativeAssignorTest {
|
||||||
assignor.handleLostAssignments(lostAssignments, newSubmissions,
|
assignor.handleLostAssignments(lostAssignments, newSubmissions,
|
||||||
new ArrayList<>(configuredAssignment.values()), memberConfigs);
|
new ArrayList<>(configuredAssignment.values()), memberConfigs);
|
||||||
|
|
||||||
// newWorker joined first, so should be picked up first as a candidate for reassignment
|
// both the newWorkers would need to be considered for re assignment of connectors and tasks
|
||||||
|
List<String> listOfConnectorsInLast2Workers = new ArrayList<>();
|
||||||
|
listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
|
||||||
|
.connectors());
|
||||||
|
listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
|
||||||
|
.connectors());
|
||||||
|
List<ConnectorTaskId> listOfTasksInLast2Workers = new ArrayList<>();
|
||||||
|
listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
|
||||||
|
.tasks());
|
||||||
|
listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
|
||||||
|
.tasks());
|
||||||
assertTrue("Wrong assignment of lost connectors",
|
assertTrue("Wrong assignment of lost connectors",
|
||||||
configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
|
listOfConnectorsInLast2Workers.containsAll(lostAssignments.connectors()));
|
||||||
.connectors()
|
|
||||||
.containsAll(lostAssignments.connectors()));
|
|
||||||
assertTrue("Wrong assignment of lost tasks",
|
assertTrue("Wrong assignment of lost tasks",
|
||||||
configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
|
listOfTasksInLast2Workers.containsAll(lostAssignments.tasks()));
|
||||||
.tasks()
|
|
||||||
.containsAll(lostAssignments.tasks()));
|
|
||||||
assertThat("Wrong set of workers for reassignments",
|
assertThat("Wrong set of workers for reassignments",
|
||||||
Collections.emptySet(),
|
Collections.emptySet(),
|
||||||
is(assignor.candidateWorkersForReassignment));
|
is(assignor.candidateWorkersForReassignment));
|
||||||
assertEquals(0, assignor.scheduledRebalance);
|
assertEquals(0, assignor.scheduledRebalance);
|
||||||
assertEquals(0, assignor.delay);
|
assertEquals(0, assignor.delay);
|
||||||
}
|
}
|
||||||
|
|
|
@ -302,23 +302,24 @@ public class WorkerCoordinatorIncrementalTest {
|
||||||
|
|
||||||
result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
|
result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
|
||||||
|
|
||||||
|
//Equally distributing tasks across member
|
||||||
leaderAssignment = deserializeAssignment(result, leaderId);
|
leaderAssignment = deserializeAssignment(result, leaderId);
|
||||||
assertAssignment(leaderId, offset,
|
assertAssignment(leaderId, offset,
|
||||||
Collections.emptyList(), 0,
|
Collections.emptyList(), 0,
|
||||||
Collections.emptyList(), 2,
|
Collections.emptyList(), 1,
|
||||||
leaderAssignment);
|
leaderAssignment);
|
||||||
|
|
||||||
memberAssignment = deserializeAssignment(result, memberId);
|
memberAssignment = deserializeAssignment(result, memberId);
|
||||||
assertAssignment(leaderId, offset,
|
assertAssignment(leaderId, offset,
|
||||||
Collections.emptyList(), 0,
|
Collections.emptyList(), 0,
|
||||||
Collections.emptyList(), 0,
|
Collections.emptyList(), 1,
|
||||||
memberAssignment);
|
memberAssignment);
|
||||||
|
|
||||||
ExtendedAssignment anotherMemberAssignment = deserializeAssignment(result, anotherMemberId);
|
ExtendedAssignment anotherMemberAssignment = deserializeAssignment(result, anotherMemberId);
|
||||||
assertAssignment(leaderId, offset,
|
assertAssignment(leaderId, offset,
|
||||||
Collections.emptyList(), 0,
|
Collections.emptyList(), 0,
|
||||||
Collections.emptyList(), 0,
|
Collections.emptyList(), 0,
|
||||||
anotherMemberAssignment);
|
anotherMemberAssignment);
|
||||||
|
|
||||||
verify(configStorage, times(configStorageCalls)).snapshot();
|
verify(configStorage, times(configStorageCalls)).snapshot();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue