MINOR: clean up some replication code (#10564)

Centralize leader and ISR changes in generateLeaderAndIsrUpdates.
Consolidate handleNodeDeactivated and handleNodeActivated into this
function.

Rename BrokersToIsrs#noLeaderIterator to BrokersToIsrs#partitionsWithNoLeader.
Create BrokersToIsrs#partitionsLedByBroker, BrokersToIsrs#partitionsWithBrokerInIsr

In ReplicationControlManagerTest, createTestTopic should be a member
function of ReplicationControlTestContext.  It should invoke
ReplicationControlTestContext#replay so that records are applied to all
parts of the test context.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2021-04-29 11:20:30 -07:00 committed by GitHub
parent a855f6ac37
commit 8d38189edd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 241 additions and 176 deletions

View File

@ -311,10 +311,18 @@ public class BrokersToIsrs {
return new PartitionsOnReplicaIterator(topicMap, leadersOnly);
}
PartitionsOnReplicaIterator noLeaderIterator() {
PartitionsOnReplicaIterator partitionsWithNoLeader() {
return iterator(NO_LEADER, true);
}
PartitionsOnReplicaIterator partitionsLedByBroker(int brokerId) {
return iterator(brokerId, true);
}
PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int brokerId) {
return iterator(brokerId, false);
}
boolean hasLeaderships(int brokerId) {
return iterator(brokerId, true).hasNext();
}

View File

@ -374,6 +374,10 @@ public class ConfigurationControlManager {
configData.remove(new ConfigResource(Type.TOPIC, name));
}
boolean uncleanLeaderElectionEnabledForTopic(String name) {
return false; // TODO: support configuring unclean leader election.
}
class ConfigurationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final long epoch;
private final Iterator<Entry<ConfigResource, TimelineHashMap<String, String>>> iterator;

View File

@ -72,9 +72,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
@ -172,47 +174,54 @@ public class ReplicationControlManager {
StringBuilder builder = new StringBuilder();
String prefix = "";
if (!Arrays.equals(replicas, prev.replicas)) {
builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas));
builder.append(prefix).append("replicas: ").
append(Arrays.toString(prev.replicas)).
append(" -> ").append(Arrays.toString(replicas));
prefix = ", ";
builder.append(prefix).append("newReplicas=").append(Arrays.toString(replicas));
}
if (!Arrays.equals(isr, prev.isr)) {
builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr));
builder.append(prefix).append("isr: ").
append(Arrays.toString(prev.isr)).
append(" -> ").append(Arrays.toString(isr));
prefix = ", ";
builder.append(prefix).append("newIsr=").append(Arrays.toString(isr));
}
if (!Arrays.equals(removingReplicas, prev.removingReplicas)) {
builder.append(prefix).append("oldRemovingReplicas=").
append(Arrays.toString(prev.removingReplicas));
builder.append(prefix).append("removingReplicas: ").
append(Arrays.toString(prev.removingReplicas)).
append(" -> ").append(Arrays.toString(removingReplicas));
prefix = ", ";
builder.append(prefix).append("newRemovingReplicas=").
append(Arrays.toString(removingReplicas));
}
if (!Arrays.equals(addingReplicas, prev.addingReplicas)) {
builder.append(prefix).append("oldAddingReplicas=").
append(Arrays.toString(prev.addingReplicas));
builder.append(prefix).append("addingReplicas: ").
append(Arrays.toString(prev.addingReplicas)).
append(" -> ").append(Arrays.toString(addingReplicas));
prefix = ", ";
builder.append(prefix).append("newAddingReplicas=").
append(Arrays.toString(addingReplicas));
}
if (leader != prev.leader) {
builder.append(prefix).append("oldLeader=").append(prev.leader);
builder.append(prefix).append("leader: ").
append(prev.leader).append(" -> ").append(leader);
prefix = ", ";
builder.append(prefix).append("newLeader=").append(leader);
}
if (leaderEpoch != prev.leaderEpoch) {
builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch);
builder.append(prefix).append("leaderEpoch: ").
append(prev.leaderEpoch).append(" -> ").append(leaderEpoch);
prefix = ", ";
builder.append(prefix).append("newLeaderEpoch=").append(leaderEpoch);
}
if (partitionEpoch != prev.partitionEpoch) {
builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch);
prefix = ", ";
builder.append(prefix).append("newPartitionEpoch=").append(partitionEpoch);
builder.append(prefix).append("partitionEpoch: ").
append(prev.partitionEpoch).append(" -> ").append(partitionEpoch);
}
return builder.toString();
}
void maybeLogPartitionChange(Logger log, String description, PartitionControlInfo prev) {
if (!electionWasClean(leader, prev.isr)) {
log.info("UNCLEAN partition change for {}: {}", description, diff(prev));
} else if (log.isDebugEnabled()) {
log.debug("partition change for {}: {}", description, diff(prev));
}
}
boolean hasLeader() {
return leader != NO_LEADER;
}
@ -231,7 +240,13 @@ public class ReplicationControlManager {
public boolean equals(Object o) {
if (!(o instanceof PartitionControlInfo)) return false;
PartitionControlInfo other = (PartitionControlInfo) o;
return diff(other).isEmpty();
return Arrays.equals(replicas, other.replicas) &&
Arrays.equals(isr, other.isr) &&
Arrays.equals(removingReplicas, other.removingReplicas) &&
Arrays.equals(addingReplicas, other.addingReplicas) &&
leader == other.leader &&
leaderEpoch == other.leaderEpoch &&
partitionEpoch == other.partitionEpoch;
}
@Override
@ -310,7 +325,7 @@ public class ReplicationControlManager {
topicsByName.put(record.name(), record.topicId());
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
log.info("Created topic {} with ID {}.", record.name(), record.topicId());
log.info("Created topic {} with topic ID {}.", record.name(), record.topicId());
}
public void replay(PartitionRecord record) {
@ -321,22 +336,18 @@ public class ReplicationControlManager {
}
PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
PartitionControlInfo prevPartInfo = topicInfo.parts.get(record.partitionId());
String description = topicInfo.name + "-" + record.partitionId() +
" with topic ID " + record.topicId();
if (prevPartInfo == null) {
log.info("Created partition {}:{} with {}.", record.topicId(),
record.partitionId(), newPartInfo.toString());
log.info("Created partition {} and {}.", description, newPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), null,
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
} else {
String diff = newPartInfo.diff(prevPartInfo);
if (!diff.isEmpty()) {
log.info("Modified partition {}:{}: {}.", record.topicId(),
record.partitionId(), diff);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(),
prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader,
newPartInfo.leader);
}
} else if (!newPartInfo.equals(prevPartInfo)) {
newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
}
}
@ -356,7 +367,9 @@ public class ReplicationControlManager {
brokersToIsrs.update(record.topicId(), record.partitionId(),
prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader,
newPartitionInfo.leader);
log.debug("Applied ISR change record: {}", record.toString());
String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " +
record.topicId();
newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);
}
public void replay(RemoveTopicRecord record) {
@ -723,7 +736,8 @@ public class ReplicationControlManager {
if (brokerRegistration == null) {
throw new RuntimeException("Can't find broker registration for broker " + brokerId);
}
handleNodeDeactivated(brokerId, records);
generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
setId(brokerId).setEpoch(brokerRegistration.epoch()), (short) 0));
}
@ -740,60 +754,12 @@ public class ReplicationControlManager {
*/
void handleBrokerUnregistered(int brokerId, long brokerEpoch,
List<ApiMessageAndVersion> records) {
handleNodeDeactivated(brokerId, records);
generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), (short) 0));
}
/**
* Handle a broker being deactivated. This means we remove it from any ISR that has
* more than one element. We do not remove the broker from ISRs where it is the only
* member since this would preclude clean leader election in the future.
* It is removed as the leader for all partitions it leads.
*
* @param brokerId The broker id.
* @param records The record list to append to.
*/
void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> records) {
Iterator<TopicIdPartition> iterator = brokersToIsrs.iterator(brokerId, false);
while (iterator.hasNext()) {
TopicIdPartition topicIdPartition = iterator.next();
TopicControlInfo topic = topics.get(topicIdPartition.topicId());
if (topic == null) {
throw new RuntimeException("Topic ID " + topicIdPartition.topicId() + " existed in " +
"isrMembers, but not in the topics map.");
}
PartitionControlInfo partition = topic.parts.get(topicIdPartition.partitionId());
if (partition == null) {
throw new RuntimeException("Partition " + topicIdPartition +
" existed in isrMembers, but not in the partitions map.");
}
PartitionChangeRecord record = new PartitionChangeRecord().
setPartitionId(topicIdPartition.partitionId()).
setTopicId(topic.id);
int[] newIsr = Replicas.copyWithout(partition.isr, brokerId);
if (newIsr.length == 0) {
// We don't want to shrink the ISR to size 0. So, leave the node in the ISR.
if (record.leader() != NO_LEADER) {
// The partition is now leaderless, so set its leader to -1.
record.setLeader(-1);
records.add(new ApiMessageAndVersion(record, (short) 0));
}
} else {
record.setIsr(Replicas.toList(newIsr));
if (partition.leader == brokerId) {
// The fenced node will no longer be the leader.
int newLeader = bestLeader(partition.replicas, newIsr, false);
record.setLeader(newLeader);
} else {
// Bump the partition leader epoch.
record.setLeader(partition.leader);
}
records.add(new ApiMessageAndVersion(record, (short) 0));
}
}
}
/**
* Generate the appropriate records to handle a broker becoming unfenced.
*
@ -808,43 +774,12 @@ public class ReplicationControlManager {
void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
setId(brokerId).setEpoch(brokerEpoch), (short) 0));
handleNodeActivated(brokerId, records);
}
/**
* Handle a broker being activated. This means we check if it can become the leader
* for any partition that currently has no leader (aka offline partition).
*
* @param brokerId The broker id.
* @param records The record list to append to.
*/
void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
Iterator<TopicIdPartition> iterator = brokersToIsrs.noLeaderIterator();
while (iterator.hasNext()) {
TopicIdPartition topicIdPartition = iterator.next();
TopicControlInfo topic = topics.get(topicIdPartition.topicId());
if (topic == null) {
throw new RuntimeException("Topic ID " + topicIdPartition.topicId() + " existed in " +
"isrMembers, but not in the topics map.");
}
PartitionControlInfo partition = topic.parts.get(topicIdPartition.partitionId());
if (partition == null) {
throw new RuntimeException("Partition " + topicIdPartition +
" existed in isrMembers, but not in the partitions map.");
}
// TODO: if this partition is configured for unclean leader election,
// check the replica set rather than the ISR.
if (Replicas.contains(partition.isr, brokerId)) {
records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
setPartitionId(topicIdPartition.partitionId()).
setTopicId(topic.id).
setLeader(brokerId), (short) 0));
}
}
generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER, brokerId, records,
brokersToIsrs.partitionsWithNoLeader());
}
ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
boolean unclean = electionIsUnclean(request.electionType());
boolean uncleanOk = electionTypeIsUnclean(request.electionType());
List<ApiMessageAndVersion> records = new ArrayList<>();
ElectLeadersResponseData response = new ElectLeadersResponseData();
for (TopicPartitions topic : request.topicPartitions()) {
@ -852,7 +787,7 @@ public class ReplicationControlManager {
new ReplicaElectionResult().setTopic(topic.topic());
response.replicaElectionResults().add(topicResults);
for (int partitionId : topic.partitions()) {
ApiError error = electLeader(topic.topic(), partitionId, unclean, records);
ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records);
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
@ -862,7 +797,7 @@ public class ReplicationControlManager {
return ControllerResult.of(records, response);
}
static boolean electionIsUnclean(byte electionType) {
static boolean electionTypeIsUnclean(byte electionType) {
ElectionType type;
try {
type = ElectionType.valueOf(electionType);
@ -872,7 +807,7 @@ public class ReplicationControlManager {
return type == ElectionType.UNCLEAN;
}
ApiError electLeader(String topic, int partitionId, boolean unclean,
ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topic);
if (topicId == null) {
@ -889,7 +824,8 @@ public class ReplicationControlManager {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such partition as " + topic + "-" + partitionId);
}
int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr, unclean);
int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr, uncleanOk,
r -> clusterControl.unfenced(r));
if (newLeader == NO_LEADER) {
// If we can't find any leader for the partition, return an error.
return new ApiError(Errors.LEADER_NOT_AVAILABLE,
@ -907,13 +843,13 @@ public class ReplicationControlManager {
}
PartitionChangeRecord record = new PartitionChangeRecord().
setPartitionId(partitionId).
setTopicId(topicId);
if (unclean && !Replicas.contains(partitionInfo.isr, newLeader)) {
// If the election was unclean, we may have to forcibly add the replica to
// the ISR. This can result in data loss!
setTopicId(topicId).
setLeader(newLeader);
if (!electionWasClean(newLeader, partitionInfo.isr)) {
// If the election was unclean, we have to forcibly set the ISR to just the
// new leader. This can result in data loss!
record.setIsr(Collections.singletonList(newLeader));
}
record.setLeader(newLeader);
records.add(new ApiMessageAndVersion(record, (short) 0));
return ApiError.NONE;
}
@ -936,10 +872,8 @@ public class ReplicationControlManager {
handleBrokerUnfenced(brokerId, brokerEpoch, records);
break;
case CONTROLLED_SHUTDOWN:
// Note: we always bump the leader epoch of each partition that the
// shutting down broker is in here. This prevents the broker from
// getting re-added to the ISR later.
handleNodeDeactivated(brokerId, records);
generateLeaderAndIsrUpdates("enterControlledShutdown[" + brokerId + "]",
brokerId, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
break;
case SHUTDOWN_NOW:
handleBrokerFenced(brokerId, records);
@ -957,22 +891,27 @@ public class ReplicationControlManager {
return ControllerResult.of(records, reply);
}
int bestLeader(int[] replicas, int[] isr, boolean unclean) {
static boolean isGoodLeader(int[] isr, int leader) {
return Replicas.contains(isr, leader);
}
static int bestLeader(int[] replicas, int[] isr, boolean uncleanOk,
Function<Integer, Boolean> isAcceptableLeader) {
int bestUnclean = NO_LEADER;
for (int i = 0; i < replicas.length; i++) {
int replica = replicas[i];
if (Replicas.contains(isr, replica)) {
return replica;
}
}
if (unclean) {
for (int i = 0; i < replicas.length; i++) {
int replica = replicas[i];
if (clusterControl.unfenced(replica)) {
if (isAcceptableLeader.apply(replica)) {
if (bestUnclean == NO_LEADER) bestUnclean = replica;
if (Replicas.contains(isr, replica)) {
return replica;
}
}
}
return NO_LEADER;
return uncleanOk ? bestUnclean : NO_LEADER;
}
static boolean electionWasClean(int newLeader, int[] isr) {
return newLeader == NO_LEADER || Replicas.contains(isr, newLeader);
}
public ControllerResult<Void> unregisterBroker(int brokerId) {
@ -1119,6 +1058,83 @@ public class ReplicationControlManager {
}
}
/**
* Iterate over a sequence of partitions and generate ISR changes and/or leader
* changes if necessary.
*
* @param context A human-readable context string used in log4j logging.
* @param brokerToRemove NO_LEADER if no broker is being removed; the ID of the
* broker to remove from the ISR and leadership, otherwise.
* @param brokerToAdd NO_LEADER if no broker is being added; the ID of the
* broker which is now eligible to be a leader, otherwise.
* @param records A list of records which we will append to.
* @param iterator The iterator containing the partitions to examine.
*/
void generateLeaderAndIsrUpdates(String context,
int brokerToRemove,
int brokerToAdd,
List<ApiMessageAndVersion> records,
Iterator<TopicIdPartition> iterator) {
int oldSize = records.size();
Function<Integer, Boolean> isAcceptableLeader =
r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.unfenced(r));
while (iterator.hasNext()) {
TopicIdPartition topicIdPart = iterator.next();
TopicControlInfo topic = topics.get(topicIdPart.topicId());
if (topic == null) {
throw new RuntimeException("Topic ID " + topicIdPart.topicId() +
" existed in isrMembers, but not in the topics map.");
}
PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId());
if (partition == null) {
throw new RuntimeException("Partition " + topicIdPart +
" existed in isrMembers, but not in the partitions map.");
}
int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
int newLeader;
if (isGoodLeader(newIsr, partition.leader)) {
// If the current leader is good, don't change.
newLeader = partition.leader;
} else {
// Choose a new leader.
boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, isAcceptableLeader);
}
if (!electionWasClean(newLeader, newIsr)) {
// After an unclean leader election, the ISR is reset to just the new leader.
newIsr = new int[] {newLeader};
} else if (newIsr.length == 0) {
// We never want to shrink the ISR to size 0.
newIsr = partition.isr;
}
PartitionChangeRecord record = new PartitionChangeRecord().
setPartitionId(topicIdPart.partitionId()).
setTopicId(topic.id);
if (newLeader != partition.leader) record.setLeader(newLeader);
if (!Arrays.equals(newIsr, partition.isr)) record.setIsr(Replicas.toList(newIsr));
if (record.leader() != NO_LEADER_CHANGE || record.isr() != null) {
records.add(new ApiMessageAndVersion(record, (short) 0));
}
}
if (records.size() != oldSize) {
if (log.isDebugEnabled()) {
StringBuilder bld = new StringBuilder();
String prefix = "";
for (ListIterator<ApiMessageAndVersion> iter = records.listIterator(oldSize);
iter.hasNext(); ) {
ApiMessageAndVersion apiMessageAndVersion = iter.next();
PartitionChangeRecord record = (PartitionChangeRecord) apiMessageAndVersion.message();
bld.append(prefix).append(topics.get(record.topicId()).name).append("-").
append(record.partitionId());
prefix = ", ";
}
log.debug("{}: changing partition(s): {}", context, bld.toString());
} else if (log.isInfoEnabled()) {
log.info("{}: changing {} partition(s)", context, records.size() - oldSize);
}
}
}
class ReplicationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final long epoch;
private final Iterator<TopicControlInfo> iterator;

View File

@ -101,9 +101,9 @@ public class BrokersToIsrsTest {
assertEquals(toSet(new TopicIdPartition(UUIDS[0], 2)),
toSet(brokersToIsrs.iterator(3, true)));
assertEquals(toSet(), toSet(brokersToIsrs.iterator(2, true)));
assertEquals(toSet(), toSet(brokersToIsrs.noLeaderIterator()));
assertEquals(toSet(), toSet(brokersToIsrs.partitionsWithNoLeader()));
brokersToIsrs.update(UUIDS[0], 2, new int[]{1, 2, 3}, new int[]{1, 2, 3}, 3, -1);
assertEquals(toSet(new TopicIdPartition(UUIDS[0], 2)),
toSet(brokersToIsrs.noLeaderIterator()));
toSet(brokersToIsrs.partitionsWithNoLeader()));
}
}

View File

@ -185,7 +185,7 @@ public class QuorumControllerTest {
topicPartitionFuture = active.appendReadEvent(
"debugGetPartition", () -> {
Iterator<TopicIdPartition> iterator = active.
replicationControl().brokersToIsrs().noLeaderIterator();
replicationControl().brokersToIsrs().partitionsWithNoLeader();
assertTrue(iterator.hasNext());
return iterator.next();
});

View File

@ -34,6 +34,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
@ -105,6 +106,27 @@ public class ReplicationControlManagerTest {
ReplicationControlTestContext() {
clusterControl.activate();
}
CreatableTopicResult createTestTopic(String name, int[][] replicas) throws Exception {
assertFalse(replicas.length == 0);
CreateTopicsRequestData request = new CreateTopicsRequestData();
CreatableTopic topic = new CreatableTopic().setName(name);
topic.setNumPartitions(-1).setReplicationFactor((short) -1);
for (int i = 0; i < replicas.length; i++) {
topic.assignments().add(new CreatableReplicaAssignment().
setPartitionIndex(i).setBrokerIds(Replicas.toList(replicas[i])));
}
request.topics().add(topic);
ControllerResult<CreateTopicsResponseData> result =
replicationControl.createTopics(request);
CreatableTopicResult topicResult = result.response().topics().find(name);
assertNotNull(topicResult);
assertEquals((short) 0, topicResult.errorCode());
assertEquals(replicas.length, topicResult.numPartitions());
assertEquals(replicas[0].length, topicResult.replicationFactor());
replay(result.records());
return topicResult;
}
}
private static void registerBroker(int brokerId, ReplicationControlTestContext ctx) {
@ -125,7 +147,7 @@ public class ReplicationControlManagerTest {
setBrokerId(brokerId).setBrokerEpoch(brokerId + 100).setCurrentMetadataOffset(1).
setWantFence(false).setWantShutDown(false), 0);
assertEquals(new BrokerHeartbeatReply(true, false, false, false), result.response());
ControllerTestUtils.replayAll(ctx.clusterControl, result.records());
ctx.replay(result.records());
}
@Test
@ -157,7 +179,7 @@ public class ReplicationControlManagerTest {
setErrorMessage(null).setErrorCode((short) 0).
setTopicId(result2.response().topics().find("foo").topicId()));
assertEquals(expectedResponse2, result2.response());
ControllerTestUtils.replayAll(replicationControl, result2.records());
ctx.replay(result2.records());
assertEquals(new PartitionControlInfo(new int[] {2, 0, 1},
new int[] {2, 0, 1}, null, null, 2, 0, 0),
replicationControl.getPartition(
@ -197,29 +219,6 @@ public class ReplicationControlManagerTest {
assertEquals(expectedTopicErrors, topicErrors);
}
private static CreatableTopicResult createTestTopic(
ReplicationControlManager replicationControl, String name,
int[][] replicas) throws Exception {
assertFalse(replicas.length == 0);
CreateTopicsRequestData request = new CreateTopicsRequestData();
CreatableTopic topic = new CreatableTopic().setName(name);
topic.setNumPartitions(-1).setReplicationFactor((short) -1);
for (int i = 0; i < replicas.length; i++) {
topic.assignments().add(new CreatableReplicaAssignment().
setPartitionIndex(i).setBrokerIds(Replicas.toList(replicas[i])));
}
request.topics().add(topic);
ControllerResult<CreateTopicsResponseData> result =
replicationControl.createTopics(request);
CreatableTopicResult topicResult = result.response().topics().find(name);
assertNotNull(topicResult);
assertEquals((short) 0, topicResult.errorCode());
assertEquals(replicas.length, topicResult.numPartitions());
assertEquals(replicas[0].length, topicResult.replicationFactor());
ControllerTestUtils.replayAll(replicationControl, result.records());
return topicResult;
}
@Test
public void testRemoveLeaderships() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@ -228,7 +227,7 @@ public class ReplicationControlManagerTest {
registerBroker(i, ctx);
unfenceBroker(i, ctx);
}
CreatableTopicResult result = createTestTopic(replicationControl, "foo",
CreatableTopicResult result = ctx.createTestTopic("foo",
new int[][] {
new int[] {0, 1, 2},
new int[] {1, 2, 3},
@ -241,8 +240,8 @@ public class ReplicationControlManagerTest {
assertEquals(expectedPartitions, ControllerTestUtils.
iteratorToSet(replicationControl.brokersToIsrs().iterator(0, true)));
List<ApiMessageAndVersion> records = new ArrayList<>();
replicationControl.handleNodeDeactivated(0, records);
ControllerTestUtils.replayAll(replicationControl, records);
replicationControl.handleBrokerFenced(0, records);
ctx.replay(records);
assertEquals(Collections.emptySet(), ControllerTestUtils.
iteratorToSet(replicationControl.brokersToIsrs().iterator(0, true)));
}
@ -255,7 +254,7 @@ public class ReplicationControlManagerTest {
registerBroker(i, ctx);
unfenceBroker(i, ctx);
}
CreatableTopicResult createTopicResult = createTestTopic(replicationControl, "foo",
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2}});
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
@ -287,7 +286,7 @@ public class ReplicationControlManagerTest {
registerBroker(i, ctx);
unfenceBroker(i, ctx);
}
CreatableTopicResult createTopicResult = createTestTopic(replicationControl, "foo",
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2}});
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
@ -652,4 +651,42 @@ public class ReplicationControlManagerTest {
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2),
OptionalInt.of(3))).getMessage());
}
@Test
public void testElectionWasClean() {
assertTrue(ReplicationControlManager.electionWasClean(1, new int[] {1, 2}));
assertFalse(ReplicationControlManager.electionWasClean(1, new int[] {0, 2}));
assertFalse(ReplicationControlManager.electionWasClean(1, new int[] {}));
assertTrue(ReplicationControlManager.electionWasClean(3, new int[] {1, 2, 3, 4, 5, 6}));
}
@Test
public void testPartitionControlInfoMergeAndDiff() {
PartitionControlInfo a = new PartitionControlInfo(
new int[]{1, 2, 3}, new int[]{1, 2}, null, null, 1, 0, 0);
PartitionControlInfo b = new PartitionControlInfo(
new int[]{1, 2, 3}, new int[]{3}, null, null, 3, 1, 1);
PartitionControlInfo c = new PartitionControlInfo(
new int[]{1, 2, 3}, new int[]{1}, null, null, 1, 0, 1);
assertEquals(b, a.merge(new PartitionChangeRecord().
setLeader(3).setIsr(Arrays.asList(3))));
assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1, partitionEpoch: 0 -> 1",
b.diff(a));
assertEquals("isr: [1, 2] -> [1], partitionEpoch: 0 -> 1",
c.diff(a));
}
@Test
public void testBestLeader() {
assertEquals(2, ReplicationControlManager.bestLeader(
new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true));
assertEquals(3, ReplicationControlManager.bestLeader(
new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true));
assertEquals(4, ReplicationControlManager.bestLeader(
new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4));
assertEquals(-1, ReplicationControlManager.bestLeader(
new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4));
assertEquals(4, ReplicationControlManager.bestLeader(
new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
}
}