diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index db66b117412..3a98fbaf587 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -256,21 +256,22 @@ class ControllerServer( } val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder) val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config) - val migrationDriver = new KRaftMigrationDriver( - config.nodeId, - controller.asInstanceOf[QuorumController].zkRecordConsumer(), - migrationClient, - propagator, - publisher => sharedServer.loader.installPublishers(java.util.Collections.singletonList(publisher)), - sharedServer.faultHandlerFactory.build( + val migrationDriver = KRaftMigrationDriver.newBuilder() + .setNodeId(config.nodeId) + .setZkRecordConsumer(controller.asInstanceOf[QuorumController].zkRecordConsumer()) + .setZkMigrationClient(migrationClient) + .setPropagator(propagator) + .setInitialZkLoadHandler(publisher => sharedServer.loader.installPublishers(java.util.Collections.singletonList(publisher))) + .setFaultHandler(sharedServer.faultHandlerFactory.build( "zk migration", fatal = false, () => {} - ), - quorumFeatures, - configSchema, - quorumControllerMetrics - ) + )) + .setQuorumFeatures(quorumFeatures) + .setConfigSchema(configSchema) + .setControllerMetrics(quorumControllerMetrics) + .setTime(time) + .build() migrationDriver.start() migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator)) } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 740c3e4b1dd..aed201f9005 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -112,7 +112,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { private volatile MetadataImage image; private volatile boolean firstPublish; - public KRaftMigrationDriver( + KRaftMigrationDriver( int nodeId, ZkRecordConsumer zkRecordConsumer, MigrationClient zkMigrationClient, @@ -145,21 +145,10 @@ public class KRaftMigrationDriver implements MetadataPublisher { this.recordRedactor = new RecordRedactor(configSchema); } - public KRaftMigrationDriver( - int nodeId, - ZkRecordConsumer zkRecordConsumer, - MigrationClient zkMigrationClient, - LegacyPropagator propagator, - Consumer initialZkLoadHandler, - FaultHandler faultHandler, - QuorumFeatures quorumFeatures, - KafkaConfigSchema configSchema, - QuorumControllerMetrics controllerMetrics - ) { - this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, controllerMetrics, Time.SYSTEM); + public static Builder newBuilder() { + return new Builder(); } - public void start() { eventQueue.prepend(new PollEvent()); } @@ -756,4 +745,109 @@ public class KRaftMigrationDriver implements MetadataPublisher { operationConsumer.accept(logMsg, operation); }; } + + public static class Builder { + private Integer nodeId; + private ZkRecordConsumer zkRecordConsumer; + private MigrationClient zkMigrationClient; + private LegacyPropagator propagator; + private Consumer initialZkLoadHandler; + private FaultHandler faultHandler; + private QuorumFeatures quorumFeatures; + private KafkaConfigSchema configSchema; + private QuorumControllerMetrics controllerMetrics; + private Time time; + + public Builder setNodeId(int nodeId) { + this.nodeId = nodeId; + return this; + } + + public Builder setZkRecordConsumer(ZkRecordConsumer zkRecordConsumer) { + this.zkRecordConsumer = zkRecordConsumer; + return this; + } + + public Builder setZkMigrationClient(MigrationClient zkMigrationClient) { + this.zkMigrationClient = zkMigrationClient; + return this; + } + + public Builder setPropagator(LegacyPropagator propagator) { + this.propagator = propagator; + return this; + } + + public Builder setInitialZkLoadHandler(Consumer initialZkLoadHandler) { + this.initialZkLoadHandler = initialZkLoadHandler; + return this; + } + + public Builder setFaultHandler(FaultHandler faultHandler) { + this.faultHandler = faultHandler; + return this; + } + + public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) { + this.quorumFeatures = quorumFeatures; + return this; + } + + public Builder setConfigSchema(KafkaConfigSchema configSchema) { + this.configSchema = configSchema; + return this; + } + + public Builder setControllerMetrics(QuorumControllerMetrics controllerMetrics) { + this.controllerMetrics = controllerMetrics; + return this; + } + + public Builder setTime(Time time) { + this.time = time; + return this; + } + + public KRaftMigrationDriver build() { + if (nodeId == null) { + throw new IllegalStateException("You must specify the node ID of this controller."); + } + if (zkRecordConsumer == null) { + throw new IllegalStateException("You must specify the ZkRecordConsumer."); + } + if (zkMigrationClient == null) { + throw new IllegalStateException("You must specify the MigrationClient."); + } + if (propagator == null) { + throw new IllegalStateException("You must specify the MetadataPropagator."); + } + if (initialZkLoadHandler == null) { + throw new IllegalStateException("You must specify the initial ZK load callback."); + } + if (faultHandler == null) { + throw new IllegalStateException("You must specify the FaultHandler."); + } + if (configSchema == null) { + throw new IllegalStateException("You must specify the KafkaConfigSchema."); + } + if (controllerMetrics == null) { + throw new IllegalStateException("You must specify the QuorumControllerMetrics."); + } + if (time == null) { + throw new IllegalStateException("You must specify the Time."); + } + return new KRaftMigrationDriver( + nodeId, + zkRecordConsumer, + zkMigrationClient, + propagator, + initialZkLoadHandler, + faultHandler, + quorumFeatures, + configSchema, + controllerMetrics, + time + ); + } + } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 8b42447a022..fd11be71370 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -114,6 +114,22 @@ public class KRaftMigrationDriverTest { } }; + /** + * Return a {@link org.apache.kafka.metadata.migration.KRaftMigrationDriver.Builder} that uses the mocks + * defined in this class. + */ + KRaftMigrationDriver.Builder defaultTestBuilder() { + return KRaftMigrationDriver.newBuilder() + .setNodeId(3000) + .setZkRecordConsumer(new NoOpRecordConsumer()) + .setInitialZkLoadHandler(metadataPublisher -> { }) + .setFaultHandler(new MockFaultHandler("test")) + .setQuorumFeatures(quorumFeatures) + .setConfigSchema(KafkaConfigSchema.EMPTY) + .setControllerMetrics(metrics) + .setTime(mockTime); + } + @BeforeEach public void setup() { apiVersions.update("4", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); @@ -226,18 +242,12 @@ public class KRaftMigrationDriverTest { .setBrokersInZk(1, 2, 3) .setConfigMigrationClient(configClient) .build(); - try (KRaftMigrationDriver driver = new KRaftMigrationDriver( - 3000, - new NoOpRecordConsumer(), - migrationClient, - metadataPropagator, - metadataPublisher -> { }, - new MockFaultHandler("test"), - quorumFeatures, - KafkaConfigSchema.EMPTY, - metrics, - mockTime - )) { + KRaftMigrationDriver.Builder builder = defaultTestBuilder() + .setZkMigrationClient(migrationClient) + .setPropagator(metadataPropagator) + .setInitialZkLoadHandler(metadataPublisher -> { }); + + try (KRaftMigrationDriver driver = builder.build()) { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); @@ -312,18 +322,11 @@ public class KRaftMigrationDriverTest { } }; MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration"); - try (KRaftMigrationDriver driver = new KRaftMigrationDriver( - 3000, - new NoOpRecordConsumer(), - migrationClient, - metadataPropagator, - metadataPublisher -> { }, - faultHandler, - quorumFeatures, - KafkaConfigSchema.EMPTY, - metrics, - mockTime - )) { + KRaftMigrationDriver.Builder builder = defaultTestBuilder() + .setZkMigrationClient(migrationClient) + .setFaultHandler(faultHandler) + .setPropagator(metadataPropagator); + try (KRaftMigrationDriver driver = builder.build()) { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); @@ -358,19 +361,10 @@ public class KRaftMigrationDriverTest { CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1).build(); apiVersions.remove("6"); - try (KRaftMigrationDriver driver = new KRaftMigrationDriver( - 3000, - new NoOpRecordConsumer(), - migrationClient, - metadataPropagator, - metadataPublisher -> { - }, - new MockFaultHandler("test"), - quorumFeatures, - KafkaConfigSchema.EMPTY, - metrics, - mockTime - )) { + KRaftMigrationDriver.Builder builder = defaultTestBuilder() + .setZkMigrationClient(migrationClient) + .setPropagator(metadataPropagator); + try (KRaftMigrationDriver driver = builder.build()) { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); @@ -406,18 +400,11 @@ public class KRaftMigrationDriverTest { CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(), new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient()); MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration"); - try (KRaftMigrationDriver driver = new KRaftMigrationDriver( - 3000, - new NoOpRecordConsumer(), - migrationClient, - metadataPropagator, - metadataPublisher -> { }, - faultHandler, - quorumFeatures, - KafkaConfigSchema.EMPTY, - metrics, - mockTime - )) { + KRaftMigrationDriver.Builder builder = defaultTestBuilder() + .setZkMigrationClient(migrationClient) + .setPropagator(metadataPropagator) + .setFaultHandler(faultHandler); + try (KRaftMigrationDriver driver = builder.build()) { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); @@ -478,19 +465,10 @@ public class KRaftMigrationDriverTest { .setTopicMigrationClient(topicClient) .setConfigMigrationClient(configClient) .build(); - - try (KRaftMigrationDriver driver = new KRaftMigrationDriver( - 3000, - new NoOpRecordConsumer(), - migrationClient, - metadataPropagator, - metadataPublisher -> { }, - new MockFaultHandler("test"), - quorumFeatures, - KafkaConfigSchema.EMPTY, - metrics, - mockTime - )) { + KRaftMigrationDriver.Builder builder = defaultTestBuilder() + .setZkMigrationClient(migrationClient) + .setPropagator(metadataPropagator); + try (KRaftMigrationDriver driver = builder.build()) { verifier.verify(driver, migrationClient, topicClient, configClient); } }