KAFKA-16216: Reduce batch size for initial metadata load during ZK migration

During migration from ZK mode to KRaft mode, there is a step where the KRaft controllers load all of the
data from ZK into the metadata log. Previously, we were using a batch size of 1000 for this, but
200 seems better. This PR also adds an internal configuration to control this batch size, for
testing purposes.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
David Arthur 2024-02-01 15:29:07 -08:00 committed by Colin P. McCabe
parent 4f0a405908
commit 12ce9c7f98
5 changed files with 36 additions and 13 deletions

View File

@ -304,6 +304,7 @@ class ControllerServer(
.setQuorumFeatures(quorumFeatures) .setQuorumFeatures(quorumFeatures)
.setConfigSchema(configSchema) .setConfigSchema(configSchema)
.setControllerMetrics(quorumControllerMetrics) .setControllerMetrics(quorumControllerMetrics)
.setMinMigrationBatchSize(config.migrationMetadataMinBatchSize)
.setTime(time) .setTime(time)
.build() .build()
migrationDriver.start() migrationDriver.start()

View File

@ -166,6 +166,7 @@ object KafkaConfig {
/** ZK to KRaft Migration configs */ /** ZK to KRaft Migration configs */
val MigrationEnabledProp = "zookeeper.metadata.migration.enable" val MigrationEnabledProp = "zookeeper.metadata.migration.enable"
val MigrationMetadataMinBatchSizeProp = "zookeeper.metadata.migration.min.batch.size"
/** Enable eligible leader replicas configs */ /** Enable eligible leader replicas configs */
val ElrEnabledProp = "eligible.leader.replicas.enable" val ElrEnabledProp = "eligible.leader.replicas.enable"
@ -1029,6 +1030,8 @@ object KafkaConfig {
.defineInternal(ServerMaxStartupTimeMsProp, LONG, Defaults.SERVER_MAX_STARTUP_TIME_MS, atLeast(0), MEDIUM, ServerMaxStartupTimeMsDoc) .defineInternal(ServerMaxStartupTimeMsProp, LONG, Defaults.SERVER_MAX_STARTUP_TIME_MS, atLeast(0), MEDIUM, ServerMaxStartupTimeMsDoc)
.define(MigrationEnabledProp, BOOLEAN, false, HIGH, "Enable ZK to KRaft migration") .define(MigrationEnabledProp, BOOLEAN, false, HIGH, "Enable ZK to KRaft migration")
.define(ElrEnabledProp, BOOLEAN, false, HIGH, "Enable the Eligible leader replicas") .define(ElrEnabledProp, BOOLEAN, false, HIGH, "Enable the Eligible leader replicas")
.defineInternal(MigrationMetadataMinBatchSizeProp, INT, Defaults.MIGRATION_METADATA_MIN_BATCH_SIZE, atLeast(1),
MEDIUM, "Soft minimum batch size to use when migrating metadata from ZooKeeper to KRaft")
/************* Authorizer Configuration ***********/ /************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AUTHORIZER_CLASS_NAME, new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc) .define(AuthorizerClassNameProp, STRING, Defaults.AUTHORIZER_CLASS_NAME, new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc)
@ -1538,6 +1541,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp) val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp)
val migrationMetadataMinBatchSize: Int = getInt(KafkaConfig.MigrationMetadataMinBatchSizeProp)
val elrEnabled: Boolean = getBoolean(KafkaConfig.ElrEnabledProp) val elrEnabled: Boolean = getBoolean(KafkaConfig.ElrEnabledProp)

View File

@ -90,8 +90,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
*/ */
final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000; final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
final static int MIGRATION_MIN_BATCH_SIZE = 1_000;
private final Time time; private final Time time;
private final Logger log; private final Logger log;
private final int nodeId; private final int nodeId;
@ -110,6 +108,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
* MetadataPublisher with MetadataLoader. * MetadataPublisher with MetadataLoader.
*/ */
private final Consumer<MetadataPublisher> initialZkLoadHandler; private final Consumer<MetadataPublisher> initialZkLoadHandler;
private final int minBatchSize;
private volatile MigrationDriverState migrationState; private volatile MigrationDriverState migrationState;
private volatile ZkMigrationLeadershipState migrationLeadershipState; private volatile ZkMigrationLeadershipState migrationLeadershipState;
private volatile MetadataImage image; private volatile MetadataImage image;
@ -125,6 +124,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
QuorumFeatures quorumFeatures, QuorumFeatures quorumFeatures,
KafkaConfigSchema configSchema, KafkaConfigSchema configSchema,
QuorumControllerMetrics controllerMetrics, QuorumControllerMetrics controllerMetrics,
int minBatchSize,
Time time Time time
) { ) {
this.nodeId = nodeId; this.nodeId = nodeId;
@ -146,6 +146,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
this.quorumFeatures = quorumFeatures; this.quorumFeatures = quorumFeatures;
this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient); this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient);
this.recordRedactor = new RecordRedactor(configSchema); this.recordRedactor = new RecordRedactor(configSchema);
this.minBatchSize = minBatchSize;
} }
public static Builder newBuilder() { public static Builder newBuilder() {
@ -680,7 +681,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
// This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}, MIGRATION_MIN_BATCH_SIZE); }, minBatchSize);
} }
class MigrateMetadataEvent extends MigrationEvent { class MigrateMetadataEvent extends MigrationEvent {
@ -854,6 +855,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private QuorumFeatures quorumFeatures; private QuorumFeatures quorumFeatures;
private KafkaConfigSchema configSchema; private KafkaConfigSchema configSchema;
private QuorumControllerMetrics controllerMetrics; private QuorumControllerMetrics controllerMetrics;
private Integer minBatchSize;
private Time time; private Time time;
public Builder setNodeId(int nodeId) { public Builder setNodeId(int nodeId) {
@ -906,6 +908,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
return this; return this;
} }
public Builder setMinMigrationBatchSize(int minBatchSize) {
this.minBatchSize = minBatchSize;
return this;
}
public KRaftMigrationDriver build() { public KRaftMigrationDriver build() {
if (nodeId == null) { if (nodeId == null) {
throw new IllegalStateException("You must specify the node ID of this controller."); throw new IllegalStateException("You must specify the node ID of this controller.");
@ -934,6 +941,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
if (time == null) { if (time == null) {
throw new IllegalStateException("You must specify the Time."); throw new IllegalStateException("You must specify the Time.");
} }
if (minBatchSize == null) {
minBatchSize = 200;
}
return new KRaftMigrationDriver( return new KRaftMigrationDriver(
nodeId, nodeId,
zkRecordConsumer, zkRecordConsumer,
@ -944,6 +954,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
quorumFeatures, quorumFeatures,
configSchema, configSchema,
controllerMetrics, controllerMetrics,
minBatchSize,
time time
); );
} }

View File

@ -807,21 +807,26 @@ public class KRaftMigrationDriverTest {
} }
static Stream<Arguments> batchSizes() { static Stream<Arguments> batchSizes() {
int defaultBatchSize = 200;
return Stream.of( return Stream.of(
Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0), Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(0, 0, 0, 0), 0, 0),
Arguments.of(Arrays.asList(0, 0, 1, 0), 1, 1), Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(0, 0, 1, 0), 1, 1),
Arguments.of(Arrays.asList(1, 1, 1, 1), 1, 4), Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(1, 1, 1, 1), 1, 4),
Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE - 1), 1, 999), Arguments.of(Optional.of(1000), Collections.singletonList(999), 1, 999),
Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 1, 1000), Arguments.of(Optional.of(1000), Collections.singletonList(1000), 1, 1000),
Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE + 1), 1, 1001), Arguments.of(Optional.of(1000), Collections.singletonList(1001), 1, 1001),
Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, 1), 2, 1001), Arguments.of(Optional.of(1000), Arrays.asList(1000, 1), 2, 1001),
Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0), Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(0, 0, 0, 0), 0, 0),
Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 3, 3000) Arguments.of(Optional.of(1000), Arrays.asList(1000, 1000, 1000), 3, 3000),
Arguments.of(Optional.of(defaultBatchSize), Collections.singletonList(defaultBatchSize + 1), 1, 201),
Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(defaultBatchSize, 1), 2, 201),
Arguments.of(Optional.empty(), Collections.singletonList(defaultBatchSize + 1), 1, 201),
Arguments.of(Optional.empty(), Arrays.asList(defaultBatchSize, 1), 2, 201)
); );
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("batchSizes") @MethodSource("batchSizes")
public void testCoalesceMigrationRecords(List<Integer> batchSizes, int expectedBatchCount, int expectedRecordCount) throws Exception { public void testCoalesceMigrationRecords(Optional<Integer> configBatchSize, List<Integer> batchSizes, int expectedBatchCount, int expectedRecordCount) throws Exception {
List<List<ApiMessageAndVersion>> batchesPassedToController = new ArrayList<>(); List<List<ApiMessageAndVersion>> batchesPassedToController = new ArrayList<>();
NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() { NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() {
@Override @Override
@ -851,6 +856,7 @@ public class KRaftMigrationDriverTest {
.setZkRecordConsumer(recordConsumer) .setZkRecordConsumer(recordConsumer)
.setPropagator(metadataPropagator) .setPropagator(metadataPropagator)
.setFaultHandler(faultHandler); .setFaultHandler(faultHandler);
configBatchSize.ifPresent(builder::setMinMigrationBatchSize);
try (KRaftMigrationDriver driver = builder.build()) { try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY; MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image); MetadataDelta delta = new MetadataDelta(image);

View File

@ -74,6 +74,7 @@ public class Defaults {
/** ********* KRaft mode configs *********/ /** ********* KRaft mode configs *********/
public static final int EMPTY_NODE_ID = -1; public static final int EMPTY_NODE_ID = -1;
public static final long SERVER_MAX_STARTUP_TIME_MS = Long.MAX_VALUE; public static final long SERVER_MAX_STARTUP_TIME_MS = Long.MAX_VALUE;
public static final int MIGRATION_METADATA_MIN_BATCH_SIZE = 200;
/** ********* Authorizer Configuration *********/ /** ********* Authorizer Configuration *********/
public static final String AUTHORIZER_CLASS_NAME = ""; public static final String AUTHORIZER_CLASS_NAME = "";