KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders (#15139)

This was originally #14489, which covered two aspects: reloading on partition epoch changes where the leader epoch did not change, and reloading when the leader epoch changed but we were already the leader.

I've cut out the second part of the change since the first part is much simpler.

This change redefines the TopicDelta fields to better distinguish when a leader is elected (leader epoch bump) from when a leader has ISR/replica changes (partition epoch bump). There are some cases where we bump the partition epoch but not the leader epoch. In those cases, we do not need to perform operations that only care about a leader epoch bump (i.e., the onElect callbacks).

Reviewers: Artem Livshits <alivshits@confluent.io>, José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
Justine Olshan 2024-01-23 14:58:53 -08:00 committed by GitHub
parent 0ef89a7cc0
commit e00d36b9c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 85 additions and 15 deletions

View File

@ -301,7 +301,7 @@ class BrokerMetadataPublisher(
changes.deletes.forEach { topicPartition => changes.deletes.forEach { topicPartition =>
resignation(topicPartition.partition, None) resignation(topicPartition.partition, None)
} }
changes.leaders.forEach { (topicPartition, partitionInfo) => changes.electedLeaders.forEach { (topicPartition, partitionInfo) =>
election(topicPartition.partition, partitionInfo.partition.leaderEpoch) election(topicPartition.partition, partitionInfo.partition.leaderEpoch)
} }
changes.followers.forEach { (topicPartition, partitionInfo) => changes.followers.forEach { (topicPartition, partitionInfo) =>

View File

@ -26,8 +26,13 @@ import java.util.Set;
import java.util.Map; import java.util.Map;
public final class LocalReplicaChanges { public final class LocalReplicaChanges {
// partitions for which the broker is not a replica anymore
private final Set<TopicPartition> deletes; private final Set<TopicPartition> deletes;
// partitions for which the broker is now a leader (leader epoch bump on the leader)
private final Map<TopicPartition, PartitionInfo> electedLeaders;
// partitions for which the isr or replicas change if the broker is a leader (partition epoch bump on the leader)
private final Map<TopicPartition, PartitionInfo> leaders; private final Map<TopicPartition, PartitionInfo> leaders;
// partitions for which the broker is now a follower or follower with isr or replica updates (partition epoch bump on follower)
private final Map<TopicPartition, PartitionInfo> followers; private final Map<TopicPartition, PartitionInfo> followers;
// The topic name -> topic id map in leaders and followers changes // The topic name -> topic id map in leaders and followers changes
private final Map<String, Uuid> topicIds; private final Map<String, Uuid> topicIds;
@ -35,12 +40,14 @@ public final class LocalReplicaChanges {
LocalReplicaChanges( LocalReplicaChanges(
Set<TopicPartition> deletes, Set<TopicPartition> deletes,
Map<TopicPartition, PartitionInfo> electedLeaders,
Map<TopicPartition, PartitionInfo> leaders, Map<TopicPartition, PartitionInfo> leaders,
Map<TopicPartition, PartitionInfo> followers, Map<TopicPartition, PartitionInfo> followers,
Map<String, Uuid> topicIds, Map<String, Uuid> topicIds,
Map<TopicIdPartition, Uuid> directoryIds Map<TopicIdPartition, Uuid> directoryIds
) { ) {
this.deletes = deletes; this.deletes = deletes;
this.electedLeaders = electedLeaders;
this.leaders = leaders; this.leaders = leaders;
this.followers = followers; this.followers = followers;
this.topicIds = topicIds; this.topicIds = topicIds;
@ -51,6 +58,10 @@ public final class LocalReplicaChanges {
return deletes; return deletes;
} }
/**
 * Returns the partitions for which the broker is now a leader
 * (leader epoch bump on the leader).
 *
 * @return the map of newly elected leader partitions to their partition info
 */
public Map<TopicPartition, PartitionInfo> electedLeaders() {
    return this.electedLeaders;
}
public Map<TopicPartition, PartitionInfo> leaders() { public Map<TopicPartition, PartitionInfo> leaders() {
return leaders; return leaders;
} }
@ -66,8 +77,9 @@ public final class LocalReplicaChanges {
@Override @Override
public String toString() { public String toString() {
return String.format( return String.format(
"LocalReplicaChanges(deletes = %s, leaders = %s, followers = %s)", "LocalReplicaChanges(deletes = %s, newly elected leaders = %s, leaders = %s, followers = %s)",
deletes, deletes,
electedLeaders,
leaders, leaders,
followers followers
); );

View File

@ -120,15 +120,20 @@ public final class TopicDelta {
* Find the partitions that have change based on the replica given. * Find the partitions that have change based on the replica given.
* *
* The changes identified are: * The changes identified are:
* 1. partitions for which the broker is not a replica anymore * 1. deletes: partitions for which the broker is not a replica anymore
* 2. partitions for which the broker is now the leader * 2. electedLeaders: partitions for which the broker is now a leader (leader epoch bump on the leader)
* 3. partitions for which the broker is now a follower * 3. leaders: partitions for which the isr or replicas change if the broker is a leader (partition epoch bump on the leader)
* 4. followers: partitions for which the broker is now a follower or follower with isr or replica updates (partition epoch bump on follower)
*
* Leader epoch bumps are a strict subset of all partition epoch bumps, so all partitions in electedLeaders will be in leaders.
* *
* @param brokerId the broker id * @param brokerId the broker id
* @return the list of partitions which the broker should remove, become leader or become follower. * @return the list of partitions which the broker should remove, become leader, become/update leader, or become/update follower.
*/ */
@SuppressWarnings("checkstyle:cyclomaticComplexity")
public LocalReplicaChanges localChanges(int brokerId) { public LocalReplicaChanges localChanges(int brokerId) {
Set<TopicPartition> deletes = new HashSet<>(); Set<TopicPartition> deletes = new HashSet<>();
Map<TopicPartition, LocalReplicaChanges.PartitionInfo> electedLeaders = new HashMap<>();
Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>(); Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>(); Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
Map<String, Uuid> topicIds = new HashMap<>(); Map<String, Uuid> topicIds = new HashMap<>();
@ -143,10 +148,12 @@ public final class TopicDelta {
} else if (entry.getValue().leader == brokerId) { } else if (entry.getValue().leader == brokerId) {
PartitionRegistration prevPartition = image.partitions().get(entry.getKey()); PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
if (prevPartition == null || prevPartition.partitionEpoch != entry.getValue().partitionEpoch) { if (prevPartition == null || prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
leaders.put( TopicPartition tp = new TopicPartition(name(), entry.getKey());
new TopicPartition(name(), entry.getKey()), LocalReplicaChanges.PartitionInfo partitionInfo = new LocalReplicaChanges.PartitionInfo(id(), entry.getValue());
new LocalReplicaChanges.PartitionInfo(id(), entry.getValue()) leaders.put(tp, partitionInfo);
); if (prevPartition == null || prevPartition.leaderEpoch != entry.getValue().leaderEpoch) {
electedLeaders.put(tp, partitionInfo);
}
topicIds.putIfAbsent(name(), id()); topicIds.putIfAbsent(name(), id());
} }
} else if ( } else if (
@ -180,7 +187,7 @@ public final class TopicDelta {
} }
} }
return new LocalReplicaChanges(deletes, leaders, followers, topicIds, directoryIds); return new LocalReplicaChanges(deletes, electedLeaders, leaders, followers, topicIds, directoryIds);
} }
@Override @Override

View File

@ -180,15 +180,19 @@ public final class TopicsDelta {
* Find the topic partitions that have change based on the replica given. * Find the topic partitions that have change based on the replica given.
* *
* The changes identified are: * The changes identified are:
* 1. topic partitions for which the broker is not a replica anymore * 1. deletes: partitions for which the broker is not a replica anymore
* 2. topic partitions for which the broker is now the leader * 2. electedLeaders: partitions for which the broker is now a leader (leader epoch bump on the leader)
* 3. topic partitions for which the broker is now a follower * 3. leaders: partitions for which the isr or replicas change if the broker is a leader (partition epoch bump on the leader)
* 4. followers: partitions for which the broker is now a follower or follower with isr or replica updates (partition epoch bump on follower)
*
* Leader epoch bumps are a strict subset of all partition epoch bumps, so all partitions in electedLeaders will be in leaders.
* *
* @param brokerId the broker id * @param brokerId the broker id
* @return the list of topic partitions which the broker should remove, become leader or become follower. * @return the list of topic partitions which the broker should remove, become leader or become follower.
*/ */
public LocalReplicaChanges localChanges(int brokerId) { public LocalReplicaChanges localChanges(int brokerId) {
Set<TopicPartition> deletes = new HashSet<>(); Set<TopicPartition> deletes = new HashSet<>();
Map<TopicPartition, LocalReplicaChanges.PartitionInfo> electedLeaders = new HashMap<>();
Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>(); Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>(); Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
Map<String, Uuid> topicIds = new HashMap<>(); Map<String, Uuid> topicIds = new HashMap<>();
@ -198,6 +202,7 @@ public final class TopicsDelta {
LocalReplicaChanges changes = delta.localChanges(brokerId); LocalReplicaChanges changes = delta.localChanges(brokerId);
deletes.addAll(changes.deletes()); deletes.addAll(changes.deletes());
electedLeaders.putAll(changes.electedLeaders());
leaders.putAll(changes.leaders()); leaders.putAll(changes.leaders());
followers.putAll(changes.followers()); followers.putAll(changes.followers());
topicIds.putAll(changes.topicIds()); topicIds.putAll(changes.topicIds());
@ -214,7 +219,7 @@ public final class TopicsDelta {
}); });
}); });
return new LocalReplicaChanges(deletes, leaders, followers, topicIds, directoryIds); return new LocalReplicaChanges(deletes, electedLeaders, leaders, followers, topicIds, directoryIds);
} }
@Override @Override

View File

@ -228,6 +228,10 @@ public class TopicsImageTest {
new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 1))), new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 1))),
changes.deletes() changes.deletes()
); );
assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),
changes.electedLeaders().keySet()
);
assertEquals( assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))), new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),
changes.leaders().keySet() changes.leaders().keySet()
@ -281,6 +285,7 @@ public class TopicsImageTest {
LocalReplicaChanges changes = delta.localChanges(localId); LocalReplicaChanges changes = delta.localChanges(localId);
assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))), changes.deletes()); assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))), changes.deletes());
assertEquals(Collections.emptyMap(), changes.electedLeaders());
assertEquals(Collections.emptyMap(), changes.leaders()); assertEquals(Collections.emptyMap(), changes.leaders());
assertEquals(Collections.emptyMap(), changes.followers()); assertEquals(Collections.emptyMap(), changes.followers());
@ -290,6 +295,43 @@ public class TopicsImageTest {
testToImage(finalImage, Optional.of(imageRecords)); testToImage(finalImage, Optional.of(imageRecords));
} }
/**
 * Verifies that an ISR-only change on a partition reports the partition under
 * {@code leaders()} but not under {@code electedLeaders()}, and produces no
 * deletes or followers.
 */
@Test
public void testUpdatedLeaders() {
    int localId = 3;
    Uuid zooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");

    // Starting image: topic "zoo" with a single partition replicated on {localId, 1, 2}.
    // NOTE(review): presumably newPartition makes the first replica (localId) the leader -- confirm against the helper.
    List<TopicImage> initialTopics = new ArrayList<>();
    TopicImage zooImage = newTopicImage(
        "zoo",
        zooId,
        newPartition(new int[] {localId, 1, 2})
    );
    initialTopics.add(zooImage);
    TopicsImage image = new TopicsImage(
        newTopicsByIdMap(initialTopics),
        newTopicsByNameMap(initialTopics));

    // The only change replayed is an ISR update for zoo-0; the record sets no leader.
    PartitionChangeRecord isrChange = new PartitionChangeRecord()
        .setTopicId(zooId)
        .setPartitionId(0)
        .setIsr(Arrays.asList(localId, 1));
    List<ApiMessageAndVersion> isrChangeRecords = new ArrayList<>();
    isrChangeRecords.add(
        new ApiMessageAndVersion(isrChange, PARTITION_CHANGE_RECORD.highestSupportedVersion()));

    TopicsDelta delta = new TopicsDelta(image);
    RecordTestUtils.replayAll(delta, isrChangeRecords);

    LocalReplicaChanges localChanges = delta.localChanges(localId);

    // Nothing is removed and no election is reported ...
    assertEquals(Collections.emptySet(), localChanges.deletes());
    assertEquals(Collections.emptyMap(), localChanges.electedLeaders());
    // ... but zoo-0 is still reported as a leader update.
    assertEquals(
        Collections.singleton(new TopicPartition("zoo", 0)),
        localChanges.leaders().keySet()
    );
    assertEquals(Collections.emptyMap(), localChanges.followers());
}
@Test @Test
public void testLocalReassignmentChanges() { public void testLocalReassignmentChanges() {
int localId = 3; int localId = 3;
@ -380,6 +422,10 @@ public class TopicsImageTest {
new HashSet<>(Arrays.asList(new TopicPartition("zoo", 2), new TopicPartition("zoo", 3))), new HashSet<>(Arrays.asList(new TopicPartition("zoo", 2), new TopicPartition("zoo", 3))),
changes.deletes() changes.deletes()
); );
assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0), new TopicPartition("zoo", 4))),
changes.electedLeaders().keySet()
);
assertEquals( assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0), new TopicPartition("zoo", 4))), new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0), new TopicPartition("zoo", 4))),
changes.leaders().keySet() changes.leaders().keySet()