KAFKA-15003: Fix ZK sync logic for partition assignments (#13735)

Fixed the metadata change events in the migration component to correctly diff changes to
existing topics and replicate the metadata to ZooKeeper. Also made the diff check exhaustive
enough to handle partial writes in ZooKeeper when replicating changes from a snapshot in the
event of a controller failover.

Added migration client and integration tests to verify the change.

Co-authored-by: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>
David Arthur, 2023-06-01 15:32:41 -07:00, committed by Colin P. McCabe
commit f499662923 (parent 47551ea369)
9 changed files with 479 additions and 35 deletions
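
The two checks described above come down to an array-level comparison of partition assignments and a set difference between the partitions KRaft knows about and the partitions found in ZooKeeper. The following is a minimal, self-contained Java sketch of those checks; the Assignment record and the topicNeedsUpdate and difference helpers are illustrative stand-ins rather than Kafka APIs (the commit itself adds PartitionRegistration#hasSameAssignment, TopicDelta#hasPartitionsWithAssignmentChanges, and the snapshot reconciliation in KRaftMigrationZkWriter#handleTopicsSnapshot).

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for Kafka's PartitionRegistration; only the fields that
// matter for the assignment diff are modeled here.
record Assignment(int[] replicas, int[] addingReplicas, int[] removingReplicas) {

    // Same idea as PartitionRegistration#hasSameAssignment in this commit: two
    // registrations carry the same assignment iff all three replica arrays match.
    boolean hasSameAssignment(Assignment other) {
        return Arrays.equals(replicas, other.replicas)
            && Arrays.equals(addingReplicas, other.addingReplicas)
            && Arrays.equals(removingReplicas, other.removingReplicas);
    }
}

class AssignmentDiffSketch {

    // A topic needs its ZK topic znode rewritten if any changed partition is new or
    // carries a different assignment (cf. TopicDelta#hasPartitionsWithAssignmentChanges).
    static boolean topicNeedsUpdate(Map<Integer, Assignment> image, Map<Integer, Assignment> changes) {
        for (Map.Entry<Integer, Assignment> entry : changes.entrySet()) {
            Assignment previous = image.get(entry.getKey());
            if (previous == null || !previous.hasSameAssignment(entry.getValue())) {
                return true;
            }
        }
        return false;
    }

    // Snapshot reconciliation after a partial write: partitions in the KRaft image but
    // not in ZK must be created; partitions only in ZK are extraneous and must be deleted.
    static Set<Integer> difference(Set<Integer> left, Set<Integer> right) {
        Set<Integer> result = new HashSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Assignment> image = Map.of(
            0, new Assignment(new int[]{0, 1, 2}, new int[]{}, new int[]{}));
        Map<Integer, Assignment> changes = Map.of(
            0, new Assignment(new int[]{1, 2, 3}, new int[]{}, new int[]{}));
        System.out.println(topicNeedsUpdate(image, changes)); // true: replicas changed

        Set<Integer> kraftPartitions = Set.of(0, 1, 2);
        Set<Integer> zkPartitions = Set.of(0, 1, 3); // partial/stale state left in ZK
        System.out.println(difference(kraftPartitions, zkPartitions)); // [2] -> create in ZK
        System.out.println(difference(zkPartitions, kraftPartitions)); // [3] -> delete from ZK
    }
}

Gating the topic znode rewrite on an actual assignment change keeps routine partition-state updates (leader, ISR, epoch) flowing through the partition-level writes without triggering redundant topic-level writes during dual-write.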

View File

@ -113,24 +113,9 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
zkClient.defaultAcls(path),
CreateMode.PERSISTENT)
}
val createPartitionsZNode = {
val path = TopicPartitionsZNode.path(topicName)
CreateRequest(
path,
null,
zkClient.defaultAcls(path),
CreateMode.PERSISTENT)
}
val createPartitionZNodeReqs = createTopicPartitionZNodesRequests(topicName, partitions, state)
val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
val topicPartition = new TopicPartition(topicName, partitionId)
Seq(
createTopicPartition(topicPartition),
createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
)
}
val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
val requests = Seq(createTopicZNode) ++ createPartitionZNodeReqs
val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
if (resultCodes(TopicZNode.path(topicName)).equals(Code.NODEEXISTS)) {
@ -145,6 +130,31 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
}
}
private def createTopicPartitionZNodesRequests(
topicName: String,
partitions: util.Map[Integer, PartitionRegistration],
state: ZkMigrationLeadershipState
): Seq[CreateRequest] = {
val createPartitionsZNode = {
val path = TopicPartitionsZNode.path(topicName)
CreateRequest(
path,
null,
zkClient.defaultAcls(path),
CreateMode.PERSISTENT)
}
val createPartitionZNodeReqs = partitions.asScala.toSeq.flatMap { case (partitionId, partition) =>
val topicPartition = new TopicPartition(topicName, partitionId)
Seq(
createTopicPartition(topicPartition),
createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
)
}
Seq(createPartitionsZNode) ++ createPartitionZNodeReqs
}
private def recursiveChildren(path: String, acc: ArrayBuffer[String]): Unit = {
val topicChildZNodes = zkClient.retryRequestUntilConnected(GetChildrenRequest(path, registerWatch = false))
topicChildZNodes.children.foreach { child =>
@ -159,6 +169,30 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
buffer.toSeq
}
override def updateTopic(
topicName: String,
topicId: Uuid,
partitions: util.Map[Integer, PartitionRegistration],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
val assignments = partitions.asScala.map { case (partitionId, partition) =>
new TopicPartition(topicName, partitionId) ->
ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
}
val request = SetDataRequest(
TopicZNode.path(topicName),
TopicZNode.encode(Some(topicId), assignments),
ZkVersion.MatchAnyVersion
)
val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
if (resultCodes.forall { case (_, code) => code.equals(Code.OK) } ) {
state.withMigrationZkVersion(migrationZkVersion)
} else {
throw new MigrationClientException(s"Failed to update topic metadata: $topicName. ZK transaction had results $resultCodes")
}
}
override def deleteTopic(
topicName: String,
state: ZkMigrationLeadershipState
@ -181,6 +215,21 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
}
}
override def createTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]], state: ZkMigrationLeadershipState)
:ZkMigrationLeadershipState = wrapZkException {
val requests = topicPartitions.asScala.toSeq.flatMap { case (topicName, partitions) =>
createTopicPartitionZNodesRequests(topicName, partitions, state)
}
val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
if (resultCodes.forall { case (_, code) => code.equals(Code.OK) || code.equals(Code.NODEEXISTS) }) {
state.withMigrationZkVersion(migrationZkVersion)
} else {
throw new MigrationClientException(s"Failed to create partition states: $topicPartitions. ZK transaction had results $resultCodes")
}
}
override def updateTopicPartitions(
topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
state: ZkMigrationLeadershipState
@ -204,6 +253,30 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
}
}
override def deleteTopicPartitions(
topicPartitions: util.Map[String, util.Set[Integer]],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = {
val requests = topicPartitions.asScala.flatMap { case (topicName, partitionIds) =>
partitionIds.asScala.map { partitionId =>
val topicPartition = new TopicPartition(topicName, partitionId)
val path = TopicPartitionZNode.path(topicPartition)
DeleteRequest(path, ZkVersion.MatchAnyVersion)
}
}
if (requests.isEmpty) {
state
} else {
val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state)
val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
if (resultCodes.forall { case (_, code) => code.equals(Code.OK) }) {
state.withMigrationZkVersion(migrationZkVersion)
} else {
throw new MigrationClientException(s"Failed to delete partition states: $topicPartitions. ZK transaction had results $resultCodes")
}
}
}
private def createTopicPartition(
topicPartition: TopicPartition
): CreateRequest = wrapZkException {

View File

@ -26,7 +26,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.{PasswordEncoder, TestUtils}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.acl.AclOperation.{DESCRIBE, READ, WRITE}
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
@ -385,6 +385,89 @@ class ZkMigrationIntegrationTest {
}
}
@ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
))
def testNewAndChangedTopicsInDualWrite(zkCluster: ClusterInstance): Unit = {
// Create a topic in ZK mode
val topicName = "test"
var admin = zkCluster.createAdminClient()
val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
// Bootstrap the ZK cluster ID into KRaft
val clusterId = zkCluster.clusterId()
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(Uuid.fromString(clusterId)).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
kraftCluster.startup()
val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
val clientProps = kraftCluster.controllerClientProperties()
val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
zkCluster.waitForReadyBrokers()
readyFuture.get(30, TimeUnit.SECONDS)
// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
// Alter the metadata
log.info("Create new topic with AdminClient")
admin = zkCluster.createAdminClient()
val newTopics = new util.ArrayList[NewTopic]()
newTopics.add(new NewTopic(topicName, 2, 3.toShort))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(60, TimeUnit.SECONDS)
val existingPartitions = Seq(new TopicPartition(topicName, 0), new TopicPartition(topicName, 1))
// Verify the changes made to KRaft are seen in ZK
verifyTopicPartitionMetadata(topicName, existingPartitions, zkClient)
log.info("Create new partitions with AdminClient")
admin.createPartitions(Map(topicName -> NewPartitions.increaseTo(3)).asJava).all().get(60, TimeUnit.SECONDS)
// Verify the changes are seen in ZK.
verifyTopicPartitionMetadata(topicName, existingPartitions ++ Seq(new TopicPartition(topicName, 2)), zkClient)
} finally {
zkCluster.stop()
kraftCluster.close()
}
}
def verifyTopicPartitionMetadata(topicName: String, partitions: Seq[TopicPartition], zkClient: KafkaZkClient): Unit = {
val (topicIdReplicaAssignment, success) = TestUtils.computeUntilTrue(
zkClient.getReplicaAssignmentAndTopicIdForTopics(Set(topicName)).headOption) {
x => x.exists(_.assignment.size == partitions.size)
}
assertTrue(success, "Unable to find topic metadata in Zk")
TestUtils.waitUntilTrue(() =>{
val lisrMap = zkClient.getTopicPartitionStates(partitions.toSeq)
lisrMap.size == partitions.size &&
lisrMap.forall { case (tp, lisr) =>
lisr.leaderAndIsr.leader >= 0 &&
topicIdReplicaAssignment.exists(_.assignment(tp).replicas == lisr.leaderAndIsr.isr)
}
}, "Unable to find topic partition metadata")
}
def allocateProducerId(bootstrapServers: String): Unit = {
val props = new Properties()
props.put("bootstrap.servers", bootstrapServers)

View File

@ -101,7 +101,7 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
}
@Test
def testCreateNewPartitions(): Unit = {
def testCreateNewTopic(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
val partitions = Map(
@ -364,4 +364,60 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
}
}
}
@Test
def testUpdateExistingTopicWithNewAndChangedPartitions(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
val topicId = Uuid.randomUuid()
val partitions = Map(
0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
migrationState = migrationClient.topicClient().createTopic("test", topicId, partitions, migrationState)
assertEquals(1, migrationState.migrationZkVersion())
// Change the assignment of the partitions and update the topic assignment. Verify that
// the change is reflected.
val changedPartitions = Map(
0 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
1 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
migrationState = migrationClient.topicClient().updateTopic("test", topicId, changedPartitions, migrationState)
assertEquals(2, migrationState.migrationZkVersion())
// Read the changed partition with zkClient.
val topicReplicaAssignmentFromZk = zkClient.getReplicaAssignmentAndTopicIdForTopics(Set("test"))
assertEquals(1, topicReplicaAssignmentFromZk.size)
assertEquals(Some(topicId), topicReplicaAssignmentFromZk.head.topicId);
topicReplicaAssignmentFromZk.head.assignment.foreach { case (tp, assignment) =>
tp.partition() match {
case p if p <=1 =>
assertEquals(changedPartitions.get(p).replicas.toSeq, assignment.replicas)
assertEquals(changedPartitions.get(p).addingReplicas.toSeq, assignment.addingReplicas)
assertEquals(changedPartitions.get(p).removingReplicas.toSeq, assignment.removingReplicas)
case p => fail(s"Found unknown partition $p")
}
}
// Add a new Partition.
val newPartition = Map(
2 -> new PartitionRegistration(Array(2, 3, 4), Array(2, 3, 4), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
).map { case (k, v) => int2Integer(k) -> v }.asJava
migrationState = migrationClient.topicClient().createTopicPartitions(Map("test" -> newPartition).asJava, migrationState)
assertEquals(3, migrationState.migrationZkVersion())
// Read new partition from Zk.
val newPartitionFromZk = zkClient.getTopicPartitionState(new TopicPartition("test", 2))
assertTrue(newPartitionFromZk.isDefined)
newPartitionFromZk.foreach { part =>
val expectedPartition = newPartition.get(2)
assertEquals(expectedPartition.leader, part.leaderAndIsr.leader)
// KRaft increments the partition epoch on change, hence the +1.
assertEquals(expectedPartition.partitionEpoch + 1, part.leaderAndIsr.partitionEpoch)
assertEquals(expectedPartition.leaderEpoch, part.leaderAndIsr.leaderEpoch)
assertEquals(expectedPartition.leaderRecoveryState, part.leaderAndIsr.leaderRecoveryState)
assertEquals(expectedPartition.isr.toList, part.leaderAndIsr.isr)
}
}
}

View File

@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Represents changes to a topic in the metadata image.
@ -49,6 +50,14 @@ public final class TopicDelta {
return partitionChanges;
}
public Map<Integer, PartitionRegistration> newPartitions() {
return partitionChanges
.entrySet()
.stream()
.filter(entry -> !image.partitions().containsKey(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public String name() {
return image.name();
}
@ -92,6 +101,20 @@ public final class TopicDelta {
return new TopicImage(image.name(), image.id(), newPartitions);
}
public boolean hasPartitionsWithAssignmentChanges() {
for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) {
int partitionId = entry.getKey();
// New Partition.
if (!image.partitions().containsKey(partitionId))
return true;
PartitionRegistration previousPartition = image.partitions().get(partitionId);
PartitionRegistration currentPartition = entry.getValue();
if (!previousPartition.hasSameAssignment(currentPartition))
return true;
}
return false;
}
/**
* Find the partitions that have change based on the replica given.
*

View File

@ -235,4 +235,10 @@ public class PartitionRegistration {
builder.append(")");
return builder.toString();
}
public boolean hasSameAssignment(PartitionRegistration registration) {
return Arrays.equals(this.replicas, registration.replicas) &&
Arrays.equals(this.addingReplicas, registration.addingReplicas) &&
Arrays.equals(this.removingReplicas, registration.removingReplicas);
}
}

View File

@ -64,8 +64,10 @@ public class KRaftMigrationZkWriter {
private static final String UPDATE_PRODUCER_ID = "UpdateProducerId";
private static final String CREATE_TOPIC = "CreateTopic";
private static final String UPDATE_TOPIC = "UpdateTopic";
private static final String DELETE_TOPIC = "DeleteTopic";
private static final String UPDATE_PARTITON = "UpdatePartition";
private static final String DELETE_PARTITION = "DeletePartition";
private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig";
private static final String DELETE_BROKER_CONFIG = "DeleteBrokerConfig";
private static final String UPDATE_TOPIC_CONFIG = "UpdateTopicConfig";
@ -98,7 +100,7 @@ public class KRaftMigrationZkWriter {
KRaftMigrationOperationConsumer operationConsumer
) {
if (delta.topicsDelta() != null) {
handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, delta.topicsDelta(), operationConsumer);
handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, image.topics(), delta.topicsDelta(), operationConsumer);
}
if (delta.configsDelta() != null) {
handleConfigsDelta(image.configs(), delta.configsDelta(), operationConsumer);
@ -121,8 +123,13 @@ public class KRaftMigrationZkWriter {
*/
void handleTopicsSnapshot(TopicsImage topicsImage, KRaftMigrationOperationConsumer operationConsumer) {
Map<Uuid, String> deletedTopics = new HashMap<>();
Set<Uuid> createdTopics = new HashSet<>(topicsImage.topicsById().keySet());
Set<Uuid> topicsInZk = new HashSet<>();
Set<Uuid> newTopics = new HashSet<>(topicsImage.topicsById().keySet());
Set<Uuid> changedTopics = new HashSet<>();
Map<Uuid, Set<Integer>> partitionsInZk = new HashMap<>();
Map<String, Set<Integer>> extraneousPartitionsInZk = new HashMap<>();
Map<Uuid, Map<Integer, PartitionRegistration>> changedPartitions = new HashMap<>();
Map<Uuid, Map<Integer, PartitionRegistration>> newPartitions = new HashMap<>();
migrationClient.topicClient().iterateTopics(
EnumSet.of(
@ -136,7 +143,8 @@ public class KRaftMigrationZkWriter {
// If KRaft does not have this topic, it was deleted
deletedTopics.put(topicId, topicName);
} else {
createdTopics.remove(topicId);
if (!newTopics.remove(topicId)) return;
topicsInZk.add(topicId);
}
}
@ -144,19 +152,52 @@ public class KRaftMigrationZkWriter {
public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistration partitionRegistration) {
TopicImage topic = topicsImage.getTopic(topicIdPartition.topicId());
if (topic == null) {
return; // topic deleted in KRaft
return; // The topic was deleted in KRaft. Handled by deletedTopics
}
// If a previous ZK write failed, ZooKeeper could be left with partial or missing
// partitions for existing topics, so accumulate the partition IDs seen in ZK to
// check later for any partitions missing from ZK.
partitionsInZk
.computeIfAbsent(topic.id(), __ -> new HashSet<>())
.add(topicIdPartition.partition());
// Check if the KRaft partition state changed
PartitionRegistration kraftPartition = topic.partitions().get(topicIdPartition.partition());
if (!kraftPartition.equals(partitionRegistration)) {
changedPartitions.computeIfAbsent(topicIdPartition.topicId(), __ -> new HashMap<>())
.put(topicIdPartition.partition(), kraftPartition);
if (kraftPartition != null) {
if (!kraftPartition.equals(partitionRegistration)) {
changedPartitions.computeIfAbsent(topicIdPartition.topicId(), __ -> new HashMap<>())
.put(topicIdPartition.partition(), kraftPartition);
}
// Check if the partition assignment has changed. If so, the topic needs an update.
if (!kraftPartition.hasSameAssignment(partitionRegistration)) {
changedTopics.add(topic.id());
}
}
}
});
createdTopics.forEach(topicId -> {
// Check for any partition changes in existing topics.
topicsInZk.forEach(topicId -> {
TopicImage topic = topicsImage.getTopic(topicId);
Set<Integer> topicPartitionsInZk = partitionsInZk.computeIfAbsent(topicId, __ -> new HashSet<>());
if (!topicPartitionsInZk.equals(topic.partitions().keySet())) {
Map<Integer, PartitionRegistration> newTopicPartitions = new HashMap<>(topic.partitions());
// Compute KRaft partitions that are not in ZK
topicPartitionsInZk.forEach(newTopicPartitions::remove);
newPartitions.put(topicId, newTopicPartitions);
// Compute ZK partitions that are not in KRaft
topicPartitionsInZk.removeAll(topic.partitions().keySet());
if (!topicPartitionsInZk.isEmpty()) {
extraneousPartitionsInZk.put(topic.name(), topicPartitionsInZk);
}
changedTopics.add(topicId);
}
});
newTopics.forEach(topicId -> {
TopicImage topic = topicsImage.getTopic(topicId);
operationConsumer.accept(
CREATE_TOPIC,
@ -165,6 +206,15 @@ public class KRaftMigrationZkWriter {
);
});
changedTopics.forEach(topicId -> {
TopicImage topic = topicsImage.getTopic(topicId);
operationConsumer.accept(
UPDATE_TOPIC,
"Changed Topic " + topic.name() + ", ID " + topicId,
migrationState -> migrationClient.topicClient().updateTopic(topic.name(), topicId, topic.partitions(), migrationState)
);
});
deletedTopics.forEach((topicId, topicName) -> {
operationConsumer.accept(
DELETE_TOPIC,
@ -179,19 +229,39 @@ public class KRaftMigrationZkWriter {
);
});
changedPartitions.forEach((topicId, paritionMap) -> {
newPartitions.forEach((topicId, partitionMap) -> {
TopicImage topic = topicsImage.getTopic(topicId);
operationConsumer.accept(
UPDATE_PARTITON,
"Creating additional partitions for Topic " + topic.name() + ", ID " + topicId,
migrationState -> migrationClient.topicClient().updateTopicPartitions(
Collections.singletonMap(topic.name(), partitionMap),
migrationState));
});
changedPartitions.forEach((topicId, partitionMap) -> {
TopicImage topic = topicsImage.getTopic(topicId);
operationConsumer.accept(
UPDATE_PARTITON,
"Updating Partitions for Topic " + topic.name() + ", ID " + topicId,
migrationState -> migrationClient.topicClient().updateTopicPartitions(
Collections.singletonMap(topic.name(), paritionMap),
Collections.singletonMap(topic.name(), partitionMap),
migrationState));
});
extraneousPartitionsInZk.forEach((topicName, partitions) -> {
operationConsumer.accept(
DELETE_PARTITION,
"Deleting extraneous Partitions " + partitions + " for Topic " + topicName,
migrationState -> migrationClient.topicClient().deleteTopicPartitions(
Collections.singletonMap(topicName, partitions),
migrationState));
});
}
void handleTopicsDelta(
Function<Uuid, String> deletedTopicNameResolver,
TopicsImage topicsImage,
TopicsDelta topicsDelta,
KRaftMigrationOperationConsumer operationConsumer
) {
@ -212,12 +282,36 @@ public class KRaftMigrationZkWriter {
topicDelta.partitionChanges(),
migrationState));
} else {
operationConsumer.accept(
UPDATE_PARTITON,
"Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId,
migrationState -> migrationClient.topicClient().updateTopicPartitions(
Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
migrationState));
if (topicDelta.hasPartitionsWithAssignmentChanges())
operationConsumer.accept(
UPDATE_TOPIC,
"Updating Topic " + topicDelta.name() + ", ID " + topicId,
migrationState -> migrationClient.topicClient().updateTopic(
topicDelta.name(),
topicId,
topicsImage.getTopic(topicId).partitions(),
migrationState));
Map<Integer, PartitionRegistration> newPartitions = topicDelta.newPartitions();
Map<Integer, PartitionRegistration> changedPartitions = topicDelta.partitionChanges();
if (!newPartitions.isEmpty()) {
operationConsumer.accept(
UPDATE_PARTITON,
"Create new partitions for Topic " + topicDelta.name() + ", ID " + topicId,
migrationState -> migrationClient.topicClient().createTopicPartitions(
Collections.singletonMap(topicDelta.name(), newPartitions),
migrationState));
newPartitions.keySet().forEach(changedPartitions::remove);
}
if (!changedPartitions.isEmpty()) {
// Need a final for the lambda
final Map<Integer, PartitionRegistration> finalChangedPartitions = changedPartitions;
operationConsumer.accept(
UPDATE_PARTITON,
"Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId,
migrationState -> migrationClient.topicClient().updateTopicPartitions(
Collections.singletonMap(topicDelta.name(), finalChangedPartitions),
migrationState));
}
}
});
}

View File

@ -24,6 +24,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface TopicMigrationClient {
@ -53,8 +54,25 @@ public interface TopicMigrationClient {
ZkMigrationLeadershipState state
);
ZkMigrationLeadershipState updateTopic(
String topicName,
Uuid topicId,
Map<Integer, PartitionRegistration> topicPartitions,
ZkMigrationLeadershipState state
);
ZkMigrationLeadershipState createTopicPartitions(
Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
ZkMigrationLeadershipState state
);
ZkMigrationLeadershipState updateTopicPartitions(
Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
ZkMigrationLeadershipState state
);
ZkMigrationLeadershipState deleteTopicPartitions(
Map<String, Set<Integer>> topicPartitions,
ZkMigrationLeadershipState state
);
}

View File

@ -30,12 +30,18 @@ import java.util.Set;
public class CapturingTopicMigrationClient implements TopicMigrationClient {
public List<String> deletedTopics = new ArrayList<>();
public List<String> createdTopics = new ArrayList<>();
public LinkedHashMap<String, Map<Integer, PartitionRegistration>> updatedTopics = new LinkedHashMap<>();
public LinkedHashMap<String, Set<Integer>> newTopicPartitions = new LinkedHashMap<>();
public LinkedHashMap<String, Set<Integer>> updatedTopicPartitions = new LinkedHashMap<>();
public LinkedHashMap<String, Set<Integer>> deletedTopicPartitions = new LinkedHashMap<>();
public void reset() {
createdTopics.clear();
updatedTopicPartitions.clear();
deletedTopics.clear();
updatedTopics.clear();
deletedTopicPartitions.clear();
}
@ -56,6 +62,25 @@ public class CapturingTopicMigrationClient implements TopicMigrationClient {
return state;
}
@Override
public ZkMigrationLeadershipState updateTopic(
String topicName,
Uuid topicId,
Map<Integer, PartitionRegistration> topicPartitions,
ZkMigrationLeadershipState state
) {
updatedTopics.put(topicName, topicPartitions);
return state;
}
@Override
public ZkMigrationLeadershipState createTopicPartitions(Map<String, Map<Integer, PartitionRegistration>> topicPartitions, ZkMigrationLeadershipState state) {
topicPartitions.forEach((topicName, partitionMap) ->
newTopicPartitions.put(topicName, partitionMap.keySet())
);
return state;
}
@Override
public ZkMigrationLeadershipState updateTopicPartitions(Map<String, Map<Integer, PartitionRegistration>> topicPartitions, ZkMigrationLeadershipState state) {
topicPartitions.forEach((topicName, partitionMap) ->
@ -63,4 +88,10 @@ public class CapturingTopicMigrationClient implements TopicMigrationClient {
);
return state;
}
@Override
public ZkMigrationLeadershipState deleteTopicPartitions(Map<String, Set<Integer>> topicPartitions, ZkMigrationLeadershipState state) {
deletedTopicPartitions.putAll(topicPartitions);
return state;
}
}

View File

@ -17,6 +17,8 @@
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.AclsImageTest;
import org.apache.kafka.image.ClientQuotasImage;
@ -33,16 +35,72 @@ import org.apache.kafka.image.TopicsImageTest;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KRaftMigrationZkWriterTest {
@Test
public void testExtraneousZkPartitions() {
CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
@Override
public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
Map<Integer, List<Integer>> assignments = new HashMap<>();
assignments.put(0, Arrays.asList(2, 3, 4));
assignments.put(1, Arrays.asList(3, 4, 5));
assignments.put(2, Arrays.asList(2, 4, 5));
assignments.put(3, Arrays.asList(1, 2, 3)); // This one is not in KRaft
visitor.visitTopic("foo", TopicsImageTest.FOO_UUID, assignments);
// Skip partition 1, visit 3 (the extra one)
IntStream.of(0, 2, 3).forEach(partitionId -> {
visitor.visitPartition(
new TopicIdPartition(TopicsImageTest.FOO_UUID, new TopicPartition("foo", partitionId)),
TopicsImageTest.IMAGE1.getPartition(TopicsImageTest.FOO_UUID, partitionId)
);
});
}
};
CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
.setBrokersInZk(0)
.setTopicMigrationClient(topicClient)
.setConfigMigrationClient(configClient)
.build();
KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
MetadataImage image = new MetadataImage(
MetadataProvenance.EMPTY,
FeaturesImage.EMPTY,
ClusterImage.EMPTY,
TopicsImageTest.IMAGE1, // This includes "foo" with 3 partitions
ConfigurationsImage.EMPTY,
ClientQuotasImage.EMPTY,
ProducerIdsImage.EMPTY,
AclsImage.EMPTY,
ScramImage.EMPTY
);
writer.handleSnapshot(image, (opType, opLog, operation) -> {
operation.apply(ZkMigrationLeadershipState.EMPTY);
});
assertEquals(topicClient.updatedTopics.get("foo").size(), 3);
assertEquals(topicClient.deletedTopicPartitions.get("foo"), Collections.singleton(3));
assertEquals(topicClient.updatedTopicPartitions.get("foo"), Collections.singleton(1));
}
/**
* If ZK is empty, ensure that the writer will sync all metadata from the MetadataImage to ZK
*/
@ -137,6 +195,8 @@ public class KRaftMigrationZkWriterTest {
(logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
writer.handleSnapshot(image, consumer);
assertEquals(1, opCounts.remove("CreateTopic"));
assertEquals(1, opCounts.remove("UpdatePartition"));
assertEquals(1, opCounts.remove("UpdateTopic"));
assertEquals(0, opCounts.size());
assertEquals("bar", topicClient.createdTopics.get(0));
}