KAFKA-14656: Send UMR first during ZK migration (#13159)

When in migration-from-ZK mode and sending RPCs to ZK-based brokers, the KRaft controller must send
full UpdateMetadataRequests prior to sending full LeaderAndIsrRequests. If the controller sends the
requests in the other order, and the ZK-based broker does not already know about some of the nodes
referenced in the LeaderAndIsrRequest, it will reject the request.

This PR includes an integration test and a number of other small fixes for dual-write.
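As a minimal sketch of the ordering constraint described above (not part of this commit): the RequestBatch trait and the addLeaderAndIsrRequestForBrokers call below are assumptions for illustration, while the other batch method names mirror the MigrationPropagator diff further down.

import org.apache.kafka.common.TopicPartition

// Hypothetical sketch only: push a full UMR to every ZK broker and flush it
// before any LeaderAndIsrRequest is queued, so no LISR ever references a node
// the receiving broker has not heard about yet.
object UmrBeforeLisrSketch {
  trait RequestBatch {
    def newBatch(): Unit
    def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int], partitions: Set[TopicPartition]): Unit
    def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], partition: TopicPartition): Unit
    def sendRequestsToBrokers(controllerEpoch: Int): Unit
  }

  def sendFullState(batch: RequestBatch,
                    zkBrokers: Seq[Int],
                    partitions: Set[TopicPartition],
                    zkControllerEpoch: Int): Unit = {
    // 1. Full UpdateMetadataRequest first, flushed as its own batch.
    batch.newBatch()
    batch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions)
    batch.sendRequestsToBrokers(zkControllerEpoch)

    // 2. Only then send the full LeaderAndIsrRequests; in the reverse order a
    //    ZK broker could reject an LISR that names brokers it does not know.
    batch.newBatch()
    partitions.foreach(tp => batch.addLeaderAndIsrRequestForBrokers(zkBrokers, tp))
    batch.sendRequestsToBrokers(zkControllerEpoch)
  }
}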

Co-authored-by: Akhilesh C <akhileshchg@users.noreply.github.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
David Arthur authored on 2023-01-31 01:31:45 -05:00, committed by GitHub
commit 89a4735c35 (parent eb7d5cbf15)
14 changed files with 335 additions and 57 deletions

View File

@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
*/
abstract class InterBrokerSendThread(
name: String,
var networkClient: KafkaClient,
@volatile var networkClient: KafkaClient,
requestTimeoutMs: Int,
time: Time,
isInterruptible: Boolean = true

View File

@ -92,14 +92,20 @@ class MigrationPropagator(
val oldZkBrokers = zkBrokers -- changedZkBrokers
val brokersChanged = !delta.clusterDelta().changedBrokers().isEmpty
// First send metadata about the live/dead brokers to all the zk brokers.
if (changedZkBrokers.nonEmpty) {
// Update new Zk brokers about all the metadata.
requestBatch.addUpdateMetadataRequestForBrokers(changedZkBrokers.toSeq, image.topics().partitions().keySet().asScala)
// Send these requests first to make sure we don't add all the partition metadata to the
// old brokers as well.
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
requestBatch.newBatch()
}
if (brokersChanged) {
requestBatch.addUpdateMetadataRequestForBrokers(oldZkBrokers.toSeq)
}
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
requestBatch.newBatch()
// Now send LISR, UMR and StopReplica requests for both new zk brokers and existing zk
// brokers based on the topic changes.
if (changedZkBrokers.nonEmpty) {
// For the new brokers, check if there are partition assignments and add LISR appropriately.
image.topics().partitions().asScala.foreach { case (tp, partitionRegistration) =>
val replicas = partitionRegistration.replicas.toSet
@ -118,9 +124,8 @@ class MigrationPropagator(
}
}
// If there are new brokers (including KRaft brokers) or if there are changes in topic
// metadata, let's send UMR about the changes to the old Zk brokers.
if (brokersChanged || !delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) {
// If there are changes in topic metadata, let's send UMR about the changes to the old Zk brokers.
if (!delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) {
requestBatch.addUpdateMetadataRequestForBrokers(oldZkBrokers.toSeq)
}
@ -175,14 +180,20 @@ class MigrationPropagator(
override def sendRPCsToBrokersFromMetadataImage(image: MetadataImage, zkControllerEpoch: Int): Unit = {
publishMetadata(image)
requestBatch.newBatch()
val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSeq
val partitions = image.topics().partitions()
// First send all the metadata before sending any other requests to make sure subsequent
// requests are handled correctly.
requestBatch.newBatch()
requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet().asScala)
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
requestBatch.newBatch()
// When we need to send RPCs from the image, we're sending 'full' requests meaning we let
// every broker know about all the metadata and all the LISR requests it needs to handle.
// Note that we cannot send StopReplica requests from the image. We don't have any state
// about brokers that host a replica but are not part of the replica set known by the Controller.
val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSeq
val partitions = image.topics().partitions()
partitions.asScala.foreach{ case (tp, partitionRegistration) =>
val leaderIsrAndControllerEpochOpt = MigrationControllerChannelContext.partitionLeadershipInfo(image, tp)
leaderIsrAndControllerEpochOpt match {
@ -194,7 +205,6 @@ class MigrationPropagator(
case None => None
}
}
requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet().asScala)
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
}

View File

@ -315,12 +315,12 @@ class BrokerToControllerRequestThread(
private def maybeResetNetworkClient(controllerInformation: ControllerInformation): Unit = {
if (isNetworkClientForZkController != controllerInformation.isZkController) {
debug("Controller changed to " + (if (isNetworkClientForZkController) "kraft" else "zk") + " mode. " +
"Resetting network client")
s"Resetting network client with new controller information : ${controllerInformation}")
// Close existing network client.
if (networkClient != null) {
networkClient.initiateClose()
networkClient.close()
}
val oldClient = networkClient
oldClient.initiateClose()
oldClient.close()
isNetworkClientForZkController = controllerInformation.isZkController
updateControllerAddress(controllerInformation.node.orNull)
controllerInformation.node.foreach(n => metadataUpdater.setNodes(Seq(n).asJava))
@ -382,6 +382,7 @@ class BrokerToControllerRequestThread(
}
private[server] def handleResponse(queueItem: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
debug(s"Request ${queueItem.request} received $response")
if (response.authenticationException != null) {
error(s"Request ${queueItem.request} failed due to authentication error with controller",
response.authenticationException)
@ -394,9 +395,16 @@ class BrokerToControllerRequestThread(
updateControllerAddress(null)
requestQueue.putFirst(queueItem)
} else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
debug(s"Request ${queueItem.request} received NOT_CONTROLLER exception. Disconnecting the " +
s"connection to the stale controller ${activeControllerAddress().map(_.idString).getOrElse("null")}")
// just close the controller connection and wait for metadata cache update in doWork
activeControllerAddress().foreach { controllerAddress =>
networkClient.disconnect(controllerAddress.idString)
try {
// We don't care if disconnect has an error, just log it and get a new network client
networkClient.disconnect(controllerAddress.idString)
} catch {
case t: Throwable => error("Had an error while disconnecting from NetworkClient.", t)
}
updateControllerAddress(null)
}

View File

@ -208,7 +208,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
.setBrokerEpoch(-1)
.setBrokerId(producerIdBlock.assignedBrokerId)
.setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
.setNextProducerId(producerIdBlock.firstProducerId()), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
case None => // Nothing to migrate
}
}
@ -364,24 +364,27 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
}
}
def writeClientQuotas(entity: ClientQuotaEntity,
quotas: util.Map[String, Double],
state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val entityMap = entity.entries().asScala
val hasUser = entityMap.contains(ConfigType.User)
val hasClient = entityMap.contains(ConfigType.Client)
val hasIp = entityMap.contains(ConfigType.Ip)
override def writeClientQuotas(
entity: util.Map[String, String],
quotas: util.Map[String, java.lang.Double],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = {
val entityMap = entity.asScala
val hasUser = entityMap.contains(ClientQuotaEntity.USER)
val hasClient = entityMap.contains(ClientQuotaEntity.CLIENT_ID)
val hasIp = entityMap.contains(ClientQuotaEntity.IP)
val props = new Properties()
// We store client quota values as strings in the ZK JSON
quotas.forEach { case (key, value) => props.put(key, value.toString) }
val (configType, path) = if (hasUser && !hasClient) {
(Some(ConfigType.User), Some(entityMap(ConfigType.User)))
(Some(ConfigType.User), Some(entityMap(ClientQuotaEntity.USER)))
} else if (hasUser && hasClient) {
(Some(ConfigType.User), Some(s"${entityMap(ConfigType.User)}/clients/${entityMap(ConfigType.Client)}"))
(Some(ConfigType.User), Some(s"${entityMap(ClientQuotaEntity.USER)}/clients/${entityMap(ClientQuotaEntity.CLIENT_ID)}"))
} else if (hasClient) {
(Some(ConfigType.Client), Some(entityMap(ConfigType.Client)))
(Some(ConfigType.Client), Some(entityMap(ClientQuotaEntity.CLIENT_ID)))
} else if (hasIp) {
(Some(ConfigType.Ip), Some(entityMap(ConfigType.Ip)))
(Some(ConfigType.Ip), Some(entityMap(ClientQuotaEntity.IP)))
} else {
(None, None)
}
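For reference, a self-contained sketch (not from the patch) of the entity-to-znode mapping implemented above. The string keys mirror ClientQuotaEntity.USER/CLIENT_ID/IP and the config types mirror ConfigType.User/Client/Ip; the helper itself and its name are hypothetical.

object QuotaZkPathSketch {
  // Given a client-quota entity map, return the ZK config type and entity path,
  // following the same precedence as writeClientQuotas above.
  def configTypeAndPath(entity: Map[String, String]): Option[(String, String)] = {
    val hasUser = entity.contains("user")
    val hasClient = entity.contains("client-id")
    val hasIp = entity.contains("ip")
    if (hasUser && !hasClient)
      Some(("users", entity("user")))                                      // /config/users/<user>
    else if (hasUser && hasClient)
      Some(("users", s"${entity("user")}/clients/${entity("client-id")}")) // /config/users/<user>/clients/<client>
    else if (hasClient)
      Some(("clients", entity("client-id")))                               // /config/clients/<client>
    else if (hasIp)
      Some(("ips", entity("ip")))                                          // /config/ips/<ip>
    else
      None
  }

  def main(args: Array[String]): Unit = {
    // These expectations line up with what the integration test below verifies in ZK.
    assert(configTypeAndPath(Map("user" -> "user1")).contains(("users", "user1")))
    assert(configTypeAndPath(Map("user" -> "user1", "client-id" -> "clientA")).contains(("users", "user1/clients/clientA")))
    assert(configTypeAndPath(Map("ip" -> "8.8.8.8")).contains(("ips", "8.8.8.8")))
  }
}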
@ -399,7 +402,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
// If we didn't update the migration state, we failed to write the client quota. Try again
// after recursively creating its parent znodes
val createPath = if (hasUser && hasClient) {
s"${ConfigEntityTypeZNode.path(configType.get)}/${entityMap(ConfigType.User)}/clients"
s"${ConfigEntityTypeZNode.path(configType.get)}/${entityMap(ClientQuotaEntity.USER)}/clients"
} else {
ConfigEntityTypeZNode.path(configType.get)
}
@ -414,7 +417,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
}
}
def writeProducerId(nextProducerId: Long, state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
override def writeProducerId(nextProducerId: Long, state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(
new ProducerIdsBlock(-1, nextProducerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE))
@ -423,9 +426,9 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
state.withMigrationZkVersion(migrationZkVersion)
}
def writeConfigs(resource: ConfigResource,
configs: util.Map[String, String],
state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
override def writeConfigs(resource: ConfigResource,
configs: util.Map[String, String],
state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val configType = resource.`type`() match {
case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)

View File

@ -27,6 +27,7 @@ import kafka.test.ClusterInstance;
import kafka.utils.EmptyTestInfo;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
@ -319,6 +320,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
clusterReference.get().killBroker(i);
}
clusterReference.get().restartDeadBrokers(true);
clusterReference.get().adminClientConfig().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
}
@Override

View File

@ -16,26 +16,38 @@
*/
package kafka.zk
import kafka.server.{ConfigType, KafkaConfig}
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.TopicConfig
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasResult, AlterConfigOp, AlterConfigsResult, ConfigEntry, NewTopic}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.extension.ExtendWith
import org.slf4j.LoggerFactory
import java.util
import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ZkMigrationIntegrationTest {
val log = LoggerFactory.getLogger(classOf[ZkMigrationIntegrationTest])
class MetadataDeltaVerifier {
val metadataDelta = new MetadataDelta(MetadataImage.EMPTY)
var offset = 0
@ -102,4 +114,146 @@ class ZkMigrationIntegrationTest {
migrationState = migrationClient.releaseControllerLeadership(migrationState)
}
@ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
))
def testDualWrite(zkCluster: ClusterInstance): Unit = {
// Create a topic in ZK mode
var admin = zkCluster.createAdminClient()
val newTopics = new util.ArrayList[NewTopic]()
newTopics.add(new NewTopic("test", 2, 3.toShort)
.configs(Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "102400", TopicConfig.SEGMENT_MS_CONFIG -> "300000").asJava))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(60, TimeUnit.SECONDS)
admin.close()
// Verify the configs exist in ZK
val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
val propsBefore = zkClient.getEntityConfigs(ConfigType.Topic, "test")
assertEquals("102400", propsBefore.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
assertEquals("300000", propsBefore.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
// Bootstrap the ZK cluster ID into KRaft
val clusterId = zkCluster.clusterId()
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(Uuid.fromString(clusterId)).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
kraftCluster.startup()
val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
// Allocate a transactional producer ID while in ZK mode
allocateProducerId(zkCluster.bootstrapServers())
val producerIdBlock = readProducerIdBlock(zkClient)
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
val clientProps = kraftCluster.controllerClientProperties()
val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
zkCluster.waitForReadyBrokers()
readyFuture.get(30, TimeUnit.SECONDS)
// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
// Alter the metadata
log.info("Updating metadata with AdminClient")
admin = zkCluster.createAdminClient()
alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
// Verify the changes made to KRaft are seen in ZK
log.info("Verifying metadata changes with ZK")
verifyTopicConfigs(zkClient)
verifyClientQuotas(zkClient)
allocateProducerId(zkCluster.bootstrapServers())
verifyProducerId(producerIdBlock, zkClient)
} finally {
zkCluster.stop()
kraftCluster.close()
}
}
def allocateProducerId(bootstrapServers: String): Unit = {
val props = new Properties()
props.put("bootstrap.servers", bootstrapServers)
props.put("transactional.id", "some-transaction-id")
val producer = new KafkaProducer[String, String](props, new StringSerializer(), new StringSerializer())
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord[String, String]("test", "", "one"))
producer.commitTransaction()
producer.flush()
producer.close()
}
def readProducerIdBlock(zkClient: KafkaZkClient): ProducerIdsBlock = {
val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
}
def alterTopicConfig(admin: Admin): AlterConfigsResult = {
val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test")
val alterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "204800"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(TopicConfig.SEGMENT_MS_CONFIG, null), AlterConfigOp.OpType.DELETE)
).asJavaCollection
admin.incrementalAlterConfigs(Map(topicResource -> alterConfigs).asJava)
}
def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
val quotas = new util.ArrayList[ClientQuotaAlteration]()
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("user" -> "user1").asJava),
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
admin.alterClientQuotas(quotas)
}
def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
TestUtils.retry(10000) {
val propsAfter = zkClient.getEntityConfigs(ConfigType.Topic, "test")
assertEquals("204800", propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
}
}
def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
TestUtils.retry(10000) {
assertEquals("1000.0", zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("consumer_byte_rate"))
assertEquals("800.0", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
assertEquals("100.0", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
assertEquals("10.0", zkClient.getEntityConfigs(ConfigType.Ip, "8.8.8.8").getProperty("connection_creation_rate"))
}
}
def verifyProducerId(firstProducerIdBlock: ProducerIdsBlock, zkClient: KafkaZkClient): Unit = {
TestUtils.retry(10000) {
val producerIdBlock = readProducerIdBlock(zkClient)
assertTrue(firstProducerIdBlock.firstProducerId() < producerIdBlock.firstProducerId())
}
}
}

View File

@ -358,7 +358,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
time = brokerTime(config.brokerId),
threadNamePrefix = None,
startup = false,
enableZkApiForwarding = isZkMigrationTest()
enableZkApiForwarding = isZkMigrationTest() || (config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled)
)
}
}

View File

@ -164,11 +164,11 @@ class ZkMigrationClientTest extends QuorumTestHarness {
adminZkClient: AdminZkClient,
migrationState: ZkMigrationLeadershipState,
entity: Map[String, String],
quotas: Map[String, Double],
quotas: Map[String, java.lang.Double],
zkEntityType: String,
zkEntityName: String): ZkMigrationLeadershipState = {
val nextMigrationState = migrationClient.writeClientQuotas(
new ClientQuotaEntity(entity.asJava),
entity.asJava,
quotas.asJava,
migrationState)
val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
@ -187,25 +187,25 @@ class ZkMigrationClientTest extends QuorumTestHarness {
assertEquals(0, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ConfigType.User -> "user1"),
Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
ConfigType.User, "user1")
assertEquals(1, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ConfigType.User -> "user1"),
Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
ConfigType.User, "user1")
assertEquals(2, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ConfigType.User -> "user1"),
Map(ClientQuotaEntity.USER -> "user1"),
Map.empty,
ConfigType.User, "user1")
assertEquals(3, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ConfigType.User -> "user1"),
Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
ConfigType.User, "user1")
assertEquals(4, migrationState.migrationZkVersion())
@ -215,14 +215,14 @@ class ZkMigrationClientTest extends QuorumTestHarness {
def testWriteNewClientQuotas(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ConfigType.User -> "user2"),
Map(ClientQuotaEntity.USER -> "user2"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
ConfigType.User, "user2")
assertEquals(1, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ConfigType.User -> "user2", ConfigType.Client -> "clientA"),
Map(ClientQuotaEntity.USER -> "user2", ClientQuotaEntity.CLIENT_ID -> "clientA"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
ConfigType.User, "user2/clients/clientA")

View File

@ -64,10 +64,12 @@ public class ProducerIdControlManager {
}
void replay(ProducerIdsRecord record) {
long currentNextProducerId = nextProducerBlock.get().firstProducerId();
if (record.nextProducerId() <= currentNextProducerId) {
// During a migration, we may be calling replay() without ever having called generateNextProducerId(),
// so the next producer block could be EMPTY
ProducerIdsBlock nextBlock = nextProducerBlock.get();
if (nextBlock != ProducerIdsBlock.EMPTY && record.nextProducerId() <= nextBlock.firstProducerId()) {
throw new RuntimeException("Next Producer ID from replayed record (" + record.nextProducerId() + ")" +
" is not greater than current next Producer ID (" + currentNextProducerId + ")");
" is not greater than current next Producer ID (" + nextBlock.firstProducerId() + ")");
} else {
nextProducerBlock.set(new ProducerIdsBlock(record.brokerId(), record.nextProducerId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
brokerEpoch.set(record.brokerEpoch());
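A tiny sketch (names illustrative, not from the patch) of the guard added above: during a migration, replay() can run before any producer ID block has been generated, so the EMPTY sentinel must not trip the monotonicity check.

object ProducerIdReplaySketch {
  final case class Block(firstProducerId: Long)
  val Empty: Block = Block(0L) // stand-in for ProducerIdsBlock.EMPTY

  def validateReplay(current: Block, recordNextProducerId: Long): Unit = {
    // Skip the check for the sentinel; otherwise require strictly increasing IDs.
    if (current != Empty && recordNextProducerId <= current.firstProducerId)
      throw new RuntimeException(
        s"Next producer ID from replayed record ($recordNextProducerId) is not greater " +
        s"than the current next producer ID (${current.firstProducerId})")
  }

  def main(args: Array[String]): Unit = {
    validateReplay(Empty, 5000L)        // allowed: migration replay before any allocation
    validateReplay(Block(4000L), 5000L) // allowed: strictly increasing
    // validateReplay(Block(6000L), 5000L) would throw
  }
}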

View File

@ -52,6 +52,10 @@ public final class ClientQuotaImage {
return quotas;
}
public Map<String, Double> quotaMap() {
return Collections.unmodifiableMap(quotas);
}
public void write(
ClientQuotaEntity entity,
ImageWriter writer,

View File

@ -58,6 +58,10 @@ public final class ConfigurationImage {
return properties;
}
public Map<String, String> toMap() {
return Collections.unmodifiableMap(data);
}
public void write(
ConfigResource configResource,
ImageWriter writer,

View File

@ -61,6 +61,19 @@ public final class ConfigurationsImage {
}
}
/**
* Return the underlying config data for a given resource as an immutable map. This does not apply
* configuration overrides or include entity defaults for the resource type.
*/
public Map<String, String> configMapForResource(ConfigResource configResource) {
ConfigurationImage configurationImage = data.get(configResource);
if (configurationImage != null) {
return configurationImage.toMap();
} else {
return Collections.emptyMap();
}
}
public void write(ImageWriter writer, ImageWriterOptions options) {
for (Entry<ConfigResource, ConfigurationImage> entry : data.entrySet()) {
ConfigResource configResource = entry.getKey();

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
@ -28,18 +30,22 @@ import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@ -403,7 +409,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
AtomicInteger count = new AtomicInteger(0);
zkMigrationClient.readAllMetadata(batch -> {
try {
log.info("Migrating {} records from ZK: {}", batch.size(), batch);
if (log.isTraceEnabled()) {
log.trace("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch));
} else {
log.info("Migrating {} records from ZK", batch.size());
}
CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
count.addAndGet(batch.size());
future.get();
@ -446,9 +456,14 @@ public class KRaftMigrationDriver implements MetadataPublisher {
// Ignore sending RPCs to the brokers since we're no longer in the state.
if (migrationState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
log.trace("Sending RPCs to broker before moving to dual-write mode using " +
"at offset and epoch {}", image.highestOffsetAndEpoch());
propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
// Migration leadership state doesn't change since we're not doing any Zk writes.
transitionTo(MigrationState.DUAL_WRITE);
} else {
log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}",
migrationLeadershipState.offsetAndEpoch());
}
}
}
@ -470,10 +485,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
@Override
public void run() throws Exception {
KRaftMigrationDriver.this.image = image;
String metadataType = isSnapshot ? "snapshot" : "delta";
if (migrationState != MigrationState.DUAL_WRITE) {
log.trace("Received metadata change, but the controller is not in dual-write " +
"mode. Ignoring the change to be replicated to Zookeeper");
log.trace("Received metadata {}, but the controller is not in dual-write " +
"mode. Ignoring the change to be replicated to Zookeeper", metadataType);
return;
}
if (delta.featuresDelta() != null) {
@ -499,16 +515,60 @@ public class KRaftMigrationDriver implements MetadataPublisher {
});
}
// For configs and client quotas, we need to send all of the data to the ZK client since we persist
// everything for a given entity in a single ZK node.
if (delta.configsDelta() != null) {
delta.configsDelta().changes().forEach((configResource, configDelta) ->
apply("Updating config resource " + configResource, migrationState ->
zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
}
if (delta.clientQuotasDelta() != null) {
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
apply("Updating client quota " + clientQuotaEntity, migrationState ->
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
});
}
if (delta.producerIdsDelta() != null) {
apply("Updating next producer ID", migrationState ->
zkMigrationClient.writeProducerId(delta.producerIdsDelta().nextProducerId(), migrationState));
}
apply("Write MetadataDelta to Zk", state -> zkMigrationClient.writeMetadataDeltaToZookeeper(delta, image, state));
// TODO: Unhappy path: Probably relinquish leadership and let new controller
// retry the write?
log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
migrationLeadershipState.zkControllerEpoch());
} else {
String metadataType = isSnapshot ? "snapshot" : "delta";
log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
}
}
}
static String recordBatchToString(Collection<ApiMessageAndVersion> batch) {
String batchString = batch.stream().map(apiMessageAndVersion -> {
if (apiMessageAndVersion.message().apiKey() == MetadataRecordType.CONFIG_RECORD.id()) {
StringBuilder sb = new StringBuilder();
sb.append("ApiMessageAndVersion(");
ConfigRecord record = (ConfigRecord) apiMessageAndVersion.message();
sb.append("ConfigRecord(");
sb.append("resourceType=");
sb.append(record.resourceType());
sb.append(", resourceName=");
sb.append(record.resourceName());
sb.append(", name=");
sb.append(record.name());
sb.append(")");
sb.append(" at version ");
sb.append(apiMessageAndVersion.version());
sb.append(")");
return sb.toString();
} else {
return apiMessageAndVersion.toString();
}
}).collect(Collectors.joining(","));
return "[" + batchString + "]";
}
}
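To make the dual-write comment above concrete ("we persist everything for a given entity in a single ZK node"), here is a sketch with made-up data showing why the write uses the full map from the image rather than just the delta; every name and value in it is illustrative.

object FullEntityWriteSketch {
  // What a MetadataDelta might carry for a topic: only the key that changed.
  val deltaForTopic: Map[String, String] = Map("segment.bytes" -> "204800")

  // What the MetadataImage holds after applying that delta: every key for the entity.
  val imageConfigsForTopic: Map[String, String] = Map(
    "segment.bytes" -> "204800",   // changed by the delta
    "retention.ms"  -> "604800000" // untouched, but stored in the same znode
  )

  def main(args: Array[String]): Unit = {
    // The ZK config znode is replaced wholesale, so writing only the delta would
    // silently drop retention.ms; the dual-write therefore reads the full map
    // (configMapForResource / quotaMap in this patch) and writes all of it.
    assert(!deltaForTopic.contains("retention.ms"))
    assert(imageConfigsForTopic.contains("retention.ms"))
    println(s"znode payload = $imageConfigsForTopic")
  }
}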

View File

@ -17,6 +17,7 @@
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.PartitionRegistration;
@ -85,6 +86,23 @@ public interface MigrationClient {
ZkMigrationLeadershipState state
);
ZkMigrationLeadershipState writeConfigs(
ConfigResource configResource,
Map<String, String> configMap,
ZkMigrationLeadershipState state
);
ZkMigrationLeadershipState writeClientQuotas(
Map<String, String> clientQuotaEntity,
Map<String, Double> quotas,
ZkMigrationLeadershipState state
);
ZkMigrationLeadershipState writeProducerId(
long nextProducerId,
ZkMigrationLeadershipState state
);
void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer);
Set<Integer> readBrokerIds();