MINOR: Add a Builder for KRaftMigrationDriver (#14062)

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Arthur 2023-07-25 16:05:04 -04:00 committed by GitHub
parent 8b027b6fef
commit e794bc719a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 161 additions and 88 deletions

View File

@ -256,21 +256,22 @@ class ControllerServer(
}
val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder)
val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config)
val migrationDriver = new KRaftMigrationDriver(
config.nodeId,
controller.asInstanceOf[QuorumController].zkRecordConsumer(),
migrationClient,
propagator,
publisher => sharedServer.loader.installPublishers(java.util.Collections.singletonList(publisher)),
sharedServer.faultHandlerFactory.build(
val migrationDriver = KRaftMigrationDriver.newBuilder()
.setNodeId(config.nodeId)
.setZkRecordConsumer(controller.asInstanceOf[QuorumController].zkRecordConsumer())
.setZkMigrationClient(migrationClient)
.setPropagator(propagator)
.setInitialZkLoadHandler(publisher => sharedServer.loader.installPublishers(java.util.Collections.singletonList(publisher)))
.setFaultHandler(sharedServer.faultHandlerFactory.build(
"zk migration",
fatal = false,
() => {}
),
quorumFeatures,
configSchema,
quorumControllerMetrics
)
))
.setQuorumFeatures(quorumFeatures)
.setConfigSchema(configSchema)
.setControllerMetrics(quorumControllerMetrics)
.setTime(time)
.build()
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
}

View File

@ -112,7 +112,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private volatile MetadataImage image;
private volatile boolean firstPublish;
public KRaftMigrationDriver(
KRaftMigrationDriver(
int nodeId,
ZkRecordConsumer zkRecordConsumer,
MigrationClient zkMigrationClient,
@ -145,21 +145,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
this.recordRedactor = new RecordRedactor(configSchema);
}
public KRaftMigrationDriver(
int nodeId,
ZkRecordConsumer zkRecordConsumer,
MigrationClient zkMigrationClient,
LegacyPropagator propagator,
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
KafkaConfigSchema configSchema,
QuorumControllerMetrics controllerMetrics
) {
this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, controllerMetrics, Time.SYSTEM);
public static Builder newBuilder() {
return new Builder();
}
public void start() {
eventQueue.prepend(new PollEvent());
}
@ -756,4 +745,109 @@ public class KRaftMigrationDriver implements MetadataPublisher {
operationConsumer.accept(logMsg, operation);
};
}
public static class Builder {
private Integer nodeId;
private ZkRecordConsumer zkRecordConsumer;
private MigrationClient zkMigrationClient;
private LegacyPropagator propagator;
private Consumer<MetadataPublisher> initialZkLoadHandler;
private FaultHandler faultHandler;
private QuorumFeatures quorumFeatures;
private KafkaConfigSchema configSchema;
private QuorumControllerMetrics controllerMetrics;
private Time time;
public Builder setNodeId(int nodeId) {
this.nodeId = nodeId;
return this;
}
public Builder setZkRecordConsumer(ZkRecordConsumer zkRecordConsumer) {
this.zkRecordConsumer = zkRecordConsumer;
return this;
}
public Builder setZkMigrationClient(MigrationClient zkMigrationClient) {
this.zkMigrationClient = zkMigrationClient;
return this;
}
public Builder setPropagator(LegacyPropagator propagator) {
this.propagator = propagator;
return this;
}
public Builder setInitialZkLoadHandler(Consumer<MetadataPublisher> initialZkLoadHandler) {
this.initialZkLoadHandler = initialZkLoadHandler;
return this;
}
public Builder setFaultHandler(FaultHandler faultHandler) {
this.faultHandler = faultHandler;
return this;
}
public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
this.quorumFeatures = quorumFeatures;
return this;
}
public Builder setConfigSchema(KafkaConfigSchema configSchema) {
this.configSchema = configSchema;
return this;
}
public Builder setControllerMetrics(QuorumControllerMetrics controllerMetrics) {
this.controllerMetrics = controllerMetrics;
return this;
}
public Builder setTime(Time time) {
this.time = time;
return this;
}
public KRaftMigrationDriver build() {
if (nodeId == null) {
throw new IllegalStateException("You must specify the node ID of this controller.");
}
if (zkRecordConsumer == null) {
throw new IllegalStateException("You must specify the ZkRecordConsumer.");
}
if (zkMigrationClient == null) {
throw new IllegalStateException("You must specify the MigrationClient.");
}
if (propagator == null) {
throw new IllegalStateException("You must specify the MetadataPropagator.");
}
if (initialZkLoadHandler == null) {
throw new IllegalStateException("You must specify the initial ZK load callback.");
}
if (faultHandler == null) {
throw new IllegalStateException("You must specify the FaultHandler.");
}
if (configSchema == null) {
throw new IllegalStateException("You must specify the KafkaConfigSchema.");
}
if (controllerMetrics == null) {
throw new IllegalStateException("You must specify the QuorumControllerMetrics.");
}
if (time == null) {
throw new IllegalStateException("You must specify the Time.");
}
return new KRaftMigrationDriver(
nodeId,
zkRecordConsumer,
zkMigrationClient,
propagator,
initialZkLoadHandler,
faultHandler,
quorumFeatures,
configSchema,
controllerMetrics,
time
);
}
}
}

View File

@ -114,6 +114,22 @@ public class KRaftMigrationDriverTest {
}
};
/**
* Return a {@link org.apache.kafka.metadata.migration.KRaftMigrationDriver.Builder} that uses the mocks
* defined in this class.
*/
KRaftMigrationDriver.Builder defaultTestBuilder() {
return KRaftMigrationDriver.newBuilder()
.setNodeId(3000)
.setZkRecordConsumer(new NoOpRecordConsumer())
.setInitialZkLoadHandler(metadataPublisher -> { })
.setFaultHandler(new MockFaultHandler("test"))
.setQuorumFeatures(quorumFeatures)
.setConfigSchema(KafkaConfigSchema.EMPTY)
.setControllerMetrics(metrics)
.setTime(mockTime);
}
@BeforeEach
public void setup() {
apiVersions.update("4", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
@ -226,18 +242,12 @@ public class KRaftMigrationDriverTest {
.setBrokersInZk(1, 2, 3)
.setConfigMigrationClient(configClient)
.build();
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setPropagator(metadataPropagator)
.setInitialZkLoadHandler(metadataPublisher -> { });
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@ -312,18 +322,11 @@ public class KRaftMigrationDriverTest {
}
};
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
faultHandler,
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setFaultHandler(faultHandler)
.setPropagator(metadataPropagator);
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@ -358,19 +361,10 @@ public class KRaftMigrationDriverTest {
CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1).build();
apiVersions.remove("6");
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> {
},
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setPropagator(metadataPropagator);
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@ -406,18 +400,11 @@ public class KRaftMigrationDriverTest {
CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(),
new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient());
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
faultHandler,
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setPropagator(metadataPropagator)
.setFaultHandler(faultHandler);
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@ -478,19 +465,10 @@ public class KRaftMigrationDriverTest {
.setTopicMigrationClient(topicClient)
.setConfigMigrationClient(configClient)
.build();
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setPropagator(metadataPropagator);
try (KRaftMigrationDriver driver = builder.build()) {
verifier.verify(driver, migrationClient, topicClient, configClient);
}
}