From 12ce9c7f98c1617824d7bd86f9cc1f4560646e26 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 1 Feb 2024 15:29:07 -0800 Subject: [PATCH] KAFKA-16216: Reduce batch size for initial metadata load during ZK migration During migration from ZK mode to KRaft mode, there is a step where the kcontrollers load all of the data from ZK into the metadata log. Previously, we were using a batch size of 1000 for this, but 200 seems better. This PR also adds an internal configuration to control this batch size, for testing purposes. Reviewers: Colin P. McCabe --- .../scala/kafka/server/ControllerServer.scala | 1 + .../main/scala/kafka/server/KafkaConfig.scala | 4 +++ .../migration/KRaftMigrationDriver.java | 17 +++++++++--- .../migration/KRaftMigrationDriverTest.java | 26 ++++++++++++------- .../apache/kafka/server/config/Defaults.java | 1 + 5 files changed, 36 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 8f14d814b63..a882e54e52d 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -304,6 +304,7 @@ class ControllerServer( .setQuorumFeatures(quorumFeatures) .setConfigSchema(configSchema) .setControllerMetrics(quorumControllerMetrics) + .setMinMigrationBatchSize(config.migrationMetadataMinBatchSize) .setTime(time) .build() migrationDriver.start() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f7967ed8457..c6f51a000e2 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -166,6 +166,7 @@ object KafkaConfig { /** ZK to KRaft Migration configs */ val MigrationEnabledProp = "zookeeper.metadata.migration.enable" + val MigrationMetadataMinBatchSizeProp = "zookeeper.metadata.migration.min.batch.size" /** Enable eligible leader replicas configs */ val ElrEnabledProp = "eligible.leader.replicas.enable" @@ -1029,6 +1030,8 @@ object KafkaConfig { .defineInternal(ServerMaxStartupTimeMsProp, LONG, Defaults.SERVER_MAX_STARTUP_TIME_MS, atLeast(0), MEDIUM, ServerMaxStartupTimeMsDoc) .define(MigrationEnabledProp, BOOLEAN, false, HIGH, "Enable ZK to KRaft migration") .define(ElrEnabledProp, BOOLEAN, false, HIGH, "Enable the Eligible leader replicas") + .defineInternal(MigrationMetadataMinBatchSizeProp, INT, Defaults.MIGRATION_METADATA_MIN_BATCH_SIZE, atLeast(1), + MEDIUM, "Soft minimum batch size to use when migrating metadata from ZooKeeper to KRaft") /************* Authorizer Configuration ***********/ .define(AuthorizerClassNameProp, STRING, Defaults.AUTHORIZER_CLASS_NAME, new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc) @@ -1538,6 +1541,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp) + val migrationMetadataMinBatchSize: Int = getInt(KafkaConfig.MigrationMetadataMinBatchSizeProp) val elrEnabled: Boolean = getBoolean(KafkaConfig.ElrEnabledProp) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 534d0940195..f33f0790e0a 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -90,8 +90,6 @@ public class KRaftMigrationDriver implements MetadataPublisher { */ final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000; - final static int MIGRATION_MIN_BATCH_SIZE = 1_000; - private final Time time; private final Logger log; private final int nodeId; @@ -110,6 +108,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { * MetadataPublisher with MetadataLoader. */ private final Consumer initialZkLoadHandler; + private final int minBatchSize; private volatile MigrationDriverState migrationState; private volatile ZkMigrationLeadershipState migrationLeadershipState; private volatile MetadataImage image; @@ -125,6 +124,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { QuorumFeatures quorumFeatures, KafkaConfigSchema configSchema, QuorumControllerMetrics controllerMetrics, + int minBatchSize, Time time ) { this.nodeId = nodeId; @@ -146,6 +146,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { this.quorumFeatures = quorumFeatures; this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient); this.recordRedactor = new RecordRedactor(configSchema); + this.minBatchSize = minBatchSize; } public static Builder newBuilder() { @@ -680,7 +681,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata throw new RuntimeException(e); } - }, MIGRATION_MIN_BATCH_SIZE); + }, minBatchSize); } class MigrateMetadataEvent extends MigrationEvent { @@ -854,6 +855,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { private QuorumFeatures quorumFeatures; private KafkaConfigSchema configSchema; private QuorumControllerMetrics controllerMetrics; + private Integer minBatchSize; private Time time; public Builder setNodeId(int nodeId) { @@ -906,6 +908,11 @@ public class KRaftMigrationDriver implements MetadataPublisher { return this; } + public Builder setMinMigrationBatchSize(int minBatchSize) { + this.minBatchSize = minBatchSize; + return this; + } + public KRaftMigrationDriver build() { if (nodeId == null) { throw new IllegalStateException("You must specify the node ID of this controller."); @@ -934,6 +941,9 @@ public class KRaftMigrationDriver implements MetadataPublisher { if (time == null) { throw new IllegalStateException("You must specify the Time."); } + if (minBatchSize == null) { + minBatchSize = 200; + } return new KRaftMigrationDriver( nodeId, zkRecordConsumer, @@ -944,6 +954,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { quorumFeatures, configSchema, controllerMetrics, + minBatchSize, time ); } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 2b29574ab86..67df5612096 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -807,21 +807,26 @@ public class KRaftMigrationDriverTest { } static Stream batchSizes() { + int defaultBatchSize = 200; return Stream.of( - Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0), - Arguments.of(Arrays.asList(0, 0, 1, 0), 1, 1), - Arguments.of(Arrays.asList(1, 1, 1, 1), 1, 4), - Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE - 1), 1, 999), - Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 1, 1000), - Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE + 1), 1, 1001), - Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, 1), 2, 1001), - Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0), - Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 3, 3000) + Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(0, 0, 0, 0), 0, 0), + Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(0, 0, 1, 0), 1, 1), + Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(1, 1, 1, 1), 1, 4), + Arguments.of(Optional.of(1000), Collections.singletonList(999), 1, 999), + Arguments.of(Optional.of(1000), Collections.singletonList(1000), 1, 1000), + Arguments.of(Optional.of(1000), Collections.singletonList(1001), 1, 1001), + Arguments.of(Optional.of(1000), Arrays.asList(1000, 1), 2, 1001), + Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(0, 0, 0, 0), 0, 0), + Arguments.of(Optional.of(1000), Arrays.asList(1000, 1000, 1000), 3, 3000), + Arguments.of(Optional.of(defaultBatchSize), Collections.singletonList(defaultBatchSize + 1), 1, 201), + Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(defaultBatchSize, 1), 2, 201), + Arguments.of(Optional.empty(), Collections.singletonList(defaultBatchSize + 1), 1, 201), + Arguments.of(Optional.empty(), Arrays.asList(defaultBatchSize, 1), 2, 201) ); } @ParameterizedTest @MethodSource("batchSizes") - public void testCoalesceMigrationRecords(List batchSizes, int expectedBatchCount, int expectedRecordCount) throws Exception { + public void testCoalesceMigrationRecords(Optional configBatchSize, List batchSizes, int expectedBatchCount, int expectedRecordCount) throws Exception { List> batchesPassedToController = new ArrayList<>(); NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() { @Override @@ -851,6 +856,7 @@ public class KRaftMigrationDriverTest { .setZkRecordConsumer(recordConsumer) .setPropagator(metadataPropagator) .setFaultHandler(faultHandler); + configBatchSize.ifPresent(builder::setMinMigrationBatchSize); try (KRaftMigrationDriver driver = builder.build()) { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java index 1c502e1564b..e55f641790f 100644 --- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java +++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java @@ -74,6 +74,7 @@ public class Defaults { /** ********* KRaft mode configs *********/ public static final int EMPTY_NODE_ID = -1; public static final long SERVER_MAX_STARTUP_TIME_MS = Long.MAX_VALUE; + public static final int MIGRATION_METADATA_MIN_BATCH_SIZE = 200; /** ********* Authorizer Configuration *********/ public static final String AUTHORIZER_CLASS_NAME = "";