diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index a1a3741b6a3..abd214f7e5d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -145,9 +145,13 @@ public class QuorumControllerTest { public void testCreateAndClose() throws Throwable { MockControllerMetrics metrics = new MockControllerMetrics(); try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); - QuorumControllerTestEnv controlEnv = - new QuorumControllerTestEnv(logEnv, builder -> builder.setMetrics(metrics)) + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setMetrics(metrics); + }). + build() ) { } assertTrue(metrics.isClosed(), "metrics were not closed"); @@ -159,10 +163,13 @@ public class QuorumControllerTest { @Test public void testConfigurationOperations() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - }) + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + build(); ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). @@ -197,10 +204,13 @@ public class QuorumControllerTest { @Test public void testDelayedConfigurationOperations() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - }) + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + build(); ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). @@ -211,9 +221,10 @@ public class QuorumControllerTest { } } - private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, - QuorumController controller) - throws Throwable { + private void testDelayedConfigurationOperations( + LocalLogManagerTestEnv logEnv, + QuorumController controller + ) throws Throwable { logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L)); CompletableFuture> future1 = controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap( @@ -237,14 +248,15 @@ public class QuorumControllerTest { long sessionTimeoutMillis = 1000; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, - b -> { - b.setConfigSchema(SCHEMA); - }, - OptionalLong.of(sessionTimeoutMillis), - OptionalLong.empty(), - SIMPLE_BOOTSTRAP); + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). + setBootstrapMetadata(SIMPLE_BOOTSTRAP). + build(); ) { ListenerCollection listeners = new ListenerCollection(); listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); @@ -334,14 +346,16 @@ public class QuorumControllerTest { long leaderImbalanceCheckIntervalNs = 1_000_000_000; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, - b -> { - b.setConfigSchema(SCHEMA); - }, - OptionalLong.of(sessionTimeoutMillis), - OptionalLong.of(leaderImbalanceCheckIntervalNs), - SIMPLE_BOOTSTRAP); + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). + setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)). + setBootstrapMetadata(SIMPLE_BOOTSTRAP). + build(); ) { ListenerCollection listeners = new ListenerCollection(); listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); @@ -467,11 +481,14 @@ public class QuorumControllerTest { long maxIdleIntervalNs = 1_000; long maxReplicationDelayMs = 60_000; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty()); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - b.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs)); - }); + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs)); + }). + build(); ) { ListenerCollection listeners = new ListenerCollection(); listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); @@ -509,61 +526,65 @@ public class QuorumControllerTest { @Test public void testUnregisterBroker() throws Throwable { - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - })) { - ListenerCollection listeners = new ListenerCollection(); - listeners.add(new Listener().setName("PLAINTEXT"). - setHost("localhost").setPort(9092)); - QuorumController active = controlEnv.activeController(); - CompletableFuture reply = active.registerBroker( - ANONYMOUS_CONTEXT, - new BrokerRegistrationRequestData(). - setBrokerId(0). - setClusterId(active.clusterId()). - setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)). - setListeners(listeners)); - assertEquals(2L, reply.get().epoch()); - CreateTopicsRequestData createTopicsRequestData = - new CreateTopicsRequestData().setTopics( - new CreatableTopicCollection(Collections.singleton( - new CreatableTopic().setName("foo").setNumPartitions(1). - setReplicationFactor((short) 1)).iterator())); - assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics( - ANONYMOUS_CONTEXT, - createTopicsRequestData, Collections.singleton("foo")).get(). - topics().find("foo").errorCode()); - assertEquals("Unable to replicate the partition 1 time(s): All brokers " + - "are currently fenced.", active.createTopics(ANONYMOUS_CONTEXT, - createTopicsRequestData, Collections.singleton("foo")). - get().topics().find("foo").errorMessage()); - assertEquals(new BrokerHeartbeatReply(true, false, false, false), - active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData(). - setWantFence(false).setBrokerEpoch(2L).setBrokerId(0). - setCurrentMetadataOffset(100000L)).get()); - assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT, + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + build(); + ) { + ListenerCollection listeners = new ListenerCollection(); + listeners.add(new Listener().setName("PLAINTEXT"). + setHost("localhost").setPort(9092)); + QuorumController active = controlEnv.activeController(); + CompletableFuture reply = active.registerBroker( + ANONYMOUS_CONTEXT, + new BrokerRegistrationRequestData(). + setBrokerId(0). + setClusterId(active.clusterId()). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)). + setListeners(listeners)); + assertEquals(2L, reply.get().epoch()); + CreateTopicsRequestData createTopicsRequestData = + new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("foo").setNumPartitions(1). + setReplicationFactor((short) 1)).iterator())); + assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics( + ANONYMOUS_CONTEXT, + createTopicsRequestData, Collections.singleton("foo")).get(). + topics().find("foo").errorCode()); + assertEquals("Unable to replicate the partition 1 time(s): All brokers " + + "are currently fenced.", active.createTopics(ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")). - get().topics().find("foo").errorCode()); - CompletableFuture topicPartitionFuture = active.appendReadEvent( - "debugGetPartition", OptionalLong.empty(), () -> { - Iterator iterator = active. - replicationControl().brokersToIsrs().iterator(0, true); - assertTrue(iterator.hasNext()); - return iterator.next(); - }); - assertEquals(0, topicPartitionFuture.get().partitionId()); - active.unregisterBroker(ANONYMOUS_CONTEXT, 0).get(); - topicPartitionFuture = active.appendReadEvent( - "debugGetPartition", OptionalLong.empty(), () -> { - Iterator iterator = active. - replicationControl().brokersToIsrs().partitionsWithNoLeader(); - assertTrue(iterator.hasNext()); - return iterator.next(); - }); - assertEquals(0, topicPartitionFuture.get().partitionId()); - } + get().topics().find("foo").errorMessage()); + assertEquals(new BrokerHeartbeatReply(true, false, false, false), + active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData(). + setWantFence(false).setBrokerEpoch(2L).setBrokerId(0). + setCurrentMetadataOffset(100000L)).get()); + assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT, + createTopicsRequestData, Collections.singleton("foo")). + get().topics().find("foo").errorCode()); + CompletableFuture topicPartitionFuture = active.appendReadEvent( + "debugGetPartition", OptionalLong.empty(), () -> { + Iterator iterator = active. + replicationControl().brokersToIsrs().iterator(0, true); + assertTrue(iterator.hasNext()); + return iterator.next(); + }); + assertEquals(0, topicPartitionFuture.get().partitionId()); + active.unregisterBroker(ANONYMOUS_CONTEXT, 0).get(); + topicPartitionFuture = active.appendReadEvent( + "debugGetPartition", OptionalLong.empty(), () -> { + Iterator iterator = active. + replicationControl().brokersToIsrs().partitionsWithNoLeader(); + assertTrue(iterator.hasNext()); + return iterator.next(); + }); + assertEquals(0, topicPartitionFuture.get().partitionId()); } } @@ -572,7 +593,9 @@ public class QuorumControllerTest { } private BrokerRegistrationRequestData.FeatureCollection brokerFeatures( - MetadataVersion minVersion, MetadataVersion maxVersion) { + MetadataVersion minVersion, + MetadataVersion maxVersion + ) { BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection(); features.add(new BrokerRegistrationRequestData.Feature() .setName(MetadataVersion.FEATURE_NAME) @@ -599,71 +622,77 @@ public class QuorumControllerTest { Map brokerEpochs = new HashMap<>(); RawSnapshotReader reader = null; Uuid fooId; - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv( - logEnv, - b -> b.setConfigSchema(SCHEMA), - OptionalLong.empty(), - OptionalLong.empty(), - SIMPLE_BOOTSTRAP) - ) { - QuorumController active = controlEnv.activeController(); - for (int i = 0; i < numBrokers; i++) { - BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT, - new BrokerRegistrationRequestData(). - setBrokerId(i). - setRack(null). - setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)). - setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)). - setListeners(new ListenerCollection(Arrays.asList(new Listener(). - setName("PLAINTEXT").setHost("localhost"). - setPort(9092 + i)).iterator()))).get(); - brokerEpochs.put(i, reply.epoch()); - } - for (int i = 0; i < numBrokers - 1; i++) { - assertEquals(new BrokerHeartbeatReply(true, false, false, false), - active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData(). - setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)). - setBrokerId(i).setCurrentMetadataOffset(100000L)).get()); - } - CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT, - new CreateTopicsRequestData().setTopics( - new CreatableTopicCollection(Collections.singleton( - new CreatableTopic().setName("foo").setNumPartitions(-1). - setReplicationFactor((short) -1). - setAssignments(new CreatableReplicaAssignmentCollection( - Arrays.asList(new CreatableReplicaAssignment(). - setPartitionIndex(0). - setBrokerIds(Arrays.asList(0, 1, 2)), - new CreatableReplicaAssignment(). - setPartitionIndex(1). - setBrokerIds(Arrays.asList(1, 2, 0))). - iterator()))).iterator())), - Collections.singleton("foo")).get(); - fooId = fooData.topics().find("foo").topicId(); - active.allocateProducerIds(ANONYMOUS_CONTEXT, - new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); - long snapshotLogOffset = active.beginWritingSnapshot().get(); - reader = logEnv.waitForSnapshot(snapshotLogOffset); - SnapshotReader snapshot = createSnapshotReader(reader); - assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset()); - checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot); + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + setBootstrapMetadata(SIMPLE_BOOTSTRAP). + build(); + ) { + QuorumController active = controlEnv.activeController(); + for (int i = 0; i < numBrokers; i++) { + BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT, + new BrokerRegistrationRequestData(). + setBrokerId(i). + setRack(null). + setClusterId(active.clusterId()). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)). + setListeners(new ListenerCollection(Arrays.asList(new Listener(). + setName("PLAINTEXT").setHost("localhost"). + setPort(9092 + i)).iterator()))).get(); + brokerEpochs.put(i, reply.epoch()); } + for (int i = 0; i < numBrokers - 1; i++) { + assertEquals(new BrokerHeartbeatReply(true, false, false, false), + active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData(). + setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)). + setBrokerId(i).setCurrentMetadataOffset(100000L)).get()); + } + CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT, + new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("foo").setNumPartitions(-1). + setReplicationFactor((short) -1). + setAssignments(new CreatableReplicaAssignmentCollection( + Arrays.asList(new CreatableReplicaAssignment(). + setPartitionIndex(0). + setBrokerIds(Arrays.asList(0, 1, 2)), + new CreatableReplicaAssignment(). + setPartitionIndex(1). + setBrokerIds(Arrays.asList(1, 2, 0))). + iterator()))).iterator())), + Collections.singleton("foo")).get(); + fooId = fooData.topics().find("foo").topicId(); + active.allocateProducerIds(ANONYMOUS_CONTEXT, + new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); + long snapshotLogOffset = active.beginWritingSnapshot().get(); + reader = logEnv.waitForSnapshot(snapshotLogOffset); + SnapshotReader snapshot = createSnapshotReader(reader); + assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset()); + checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot); } - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of(reader))) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - })) { - QuorumController active = controlEnv.activeController(); - long snapshotLogOffset = active.beginWritingSnapshot().get(); - SnapshotReader snapshot = createSnapshotReader( - logEnv.waitForSnapshot(snapshotLogOffset) - ); - assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset()); - checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot); - } + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + setSnapshotReader(reader). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + build(); + ) { + QuorumController active = controlEnv.activeController(); + long snapshotLogOffset = active.beginWritingSnapshot().get(); + SnapshotReader snapshot = createSnapshotReader( + logEnv.waitForSnapshot(snapshotLogOffset) + ); + assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset()); + checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot); } } @@ -673,36 +702,110 @@ public class QuorumControllerTest { final int maxNewRecordBytes = 4; Map brokerEpochs = new HashMap<>(); Uuid fooId; - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - b.setSnapshotMaxNewRecordBytes(maxNewRecordBytes); - b.setBootstrapMetadata(SIMPLE_BOOTSTRAP); - })) { - QuorumController active = controlEnv.activeController(); - for (int i = 0; i < numBrokers; i++) { - BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT, - new BrokerRegistrationRequestData(). - setBrokerId(i). - setRack(null). - setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)). - setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)). - setListeners(new ListenerCollection(Arrays.asList(new Listener(). - setName("PLAINTEXT").setHost("localhost"). - setPort(9092 + i)).iterator()))).get(); - brokerEpochs.put(i, reply.epoch()); - } - for (int i = 0; i < numBrokers - 1; i++) { - assertEquals(new BrokerHeartbeatReply(true, false, false, false), - active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData(). - setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)). - setBrokerId(i).setCurrentMetadataOffset(100000L)).get()); - } - CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT, - new CreateTopicsRequestData().setTopics( + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes); + controllerBuilder.setBootstrapMetadata(SIMPLE_BOOTSTRAP); + }). + build(); + ) { + QuorumController active = controlEnv.activeController(); + for (int i = 0; i < numBrokers; i++) { + BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT, + new BrokerRegistrationRequestData(). + setBrokerId(i). + setRack(null). + setClusterId(active.clusterId()). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)). + setListeners(new ListenerCollection(Arrays.asList(new Listener(). + setName("PLAINTEXT").setHost("localhost"). + setPort(9092 + i)).iterator()))).get(); + brokerEpochs.put(i, reply.epoch()); + } + for (int i = 0; i < numBrokers - 1; i++) { + assertEquals(new BrokerHeartbeatReply(true, false, false, false), + active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData(). + setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)). + setBrokerId(i).setCurrentMetadataOffset(100000L)).get()); + } + CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT, + new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("foo").setNumPartitions(-1). + setReplicationFactor((short) -1). + setAssignments(new CreatableReplicaAssignmentCollection( + Arrays.asList(new CreatableReplicaAssignment(). + setPartitionIndex(0). + setBrokerIds(Arrays.asList(0, 1, 2)), + new CreatableReplicaAssignment(). + setPartitionIndex(1). + setBrokerIds(Arrays.asList(1, 2, 0))). + iterator()))).iterator())), + Collections.singleton("foo")).get(); + fooId = fooData.topics().find("foo").topicId(); + active.allocateProducerIds(ANONYMOUS_CONTEXT, + new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); + + SnapshotReader snapshot = createSnapshotReader(logEnv.waitForLatestSnapshot()); + checkSnapshotSubcontent( + expectedSnapshotContent(fooId, brokerEpochs), + snapshot + ); + } + } + + @Test + public void testSnapshotOnlyAfterConfiguredMinBytes() throws Throwable { + final int numBrokers = 4; + final int maxNewRecordBytes = 1000; + Map brokerEpochs = new HashMap<>(); + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes); + }). + build(); + ) { + QuorumController active = controlEnv.activeController(); + for (int i = 0; i < numBrokers; i++) { + BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT, + new BrokerRegistrationRequestData(). + setBrokerId(i). + setRack(null). + setClusterId(active.clusterId()). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)). + setListeners(new ListenerCollection(Arrays.asList(new Listener(). + setName("PLAINTEXT").setHost("localhost"). + setPort(9092 + i)).iterator()))).get(); + brokerEpochs.put(i, reply.epoch()); + assertEquals(new BrokerHeartbeatReply(true, false, false, false), + active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData(). + setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)). + setBrokerId(i).setCurrentMetadataOffset(100000L)).get()); + } + + assertTrue(logEnv.appendedBytes() < maxNewRecordBytes, + String.format("%s appended bytes is not less than %s max new record bytes", + logEnv.appendedBytes(), + maxNewRecordBytes)); + + // Keep creating topic until we reached the max bytes limit + int counter = 0; + while (logEnv.appendedBytes() < maxNewRecordBytes) { + counter += 1; + String topicName = String.format("foo-%s", counter); + active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics( new CreatableTopicCollection(Collections.singleton( - new CreatableTopic().setName("foo").setNumPartitions(-1). + new CreatableTopic().setName(topicName).setNumPartitions(-1). setReplicationFactor((short) -1). setAssignments(new CreatableReplicaAssignmentCollection( Arrays.asList(new CreatableReplicaAssignment(). @@ -712,75 +815,9 @@ public class QuorumControllerTest { setPartitionIndex(1). setBrokerIds(Arrays.asList(1, 2, 0))). iterator()))).iterator())), - Collections.singleton("foo")).get(); - fooId = fooData.topics().find("foo").topicId(); - active.allocateProducerIds(ANONYMOUS_CONTEXT, - new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); - - SnapshotReader snapshot = createSnapshotReader(logEnv.waitForLatestSnapshot()); - checkSnapshotSubcontent( - expectedSnapshotContent(fooId, brokerEpochs), - snapshot - ); - } - } - } - - @Test - public void testSnapshotOnlyAfterConfiguredMinBytes() throws Throwable { - final int numBrokers = 4; - final int maxNewRecordBytes = 1000; - Map brokerEpochs = new HashMap<>(); - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - b.setSnapshotMaxNewRecordBytes(maxNewRecordBytes); - })) { - QuorumController active = controlEnv.activeController(); - for (int i = 0; i < numBrokers; i++) { - BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT, - new BrokerRegistrationRequestData(). - setBrokerId(i). - setRack(null). - setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)). - setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)). - setListeners(new ListenerCollection(Arrays.asList(new Listener(). - setName("PLAINTEXT").setHost("localhost"). - setPort(9092 + i)).iterator()))).get(); - brokerEpochs.put(i, reply.epoch()); - assertEquals(new BrokerHeartbeatReply(true, false, false, false), - active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData(). - setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)). - setBrokerId(i).setCurrentMetadataOffset(100000L)).get()); - } - - assertTrue(logEnv.appendedBytes() < maxNewRecordBytes, - String.format("%s appended bytes is not less than %s max new record bytes", - logEnv.appendedBytes(), - maxNewRecordBytes)); - - // Keep creating topic until we reached the max bytes limit - int counter = 0; - while (logEnv.appendedBytes() < maxNewRecordBytes) { - counter += 1; - String topicName = String.format("foo-%s", counter); - active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics( - new CreatableTopicCollection(Collections.singleton( - new CreatableTopic().setName(topicName).setNumPartitions(-1). - setReplicationFactor((short) -1). - setAssignments(new CreatableReplicaAssignmentCollection( - Arrays.asList(new CreatableReplicaAssignment(). - setPartitionIndex(0). - setBrokerIds(Arrays.asList(0, 1, 2)), - new CreatableReplicaAssignment(). - setPartitionIndex(1). - setBrokerIds(Arrays.asList(1, 2, 0))). - iterator()))).iterator())), - Collections.singleton(topicName)).get(60, TimeUnit.SECONDS); - } - logEnv.waitForLatestSnapshot(); + Collections.singleton(topicName)).get(60, TimeUnit.SECONDS); } + logEnv.waitForLatestSnapshot(); } } @@ -912,52 +949,55 @@ public class QuorumControllerTest { */ @Test public void testTimeouts() throws Throwable { - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - })) { - QuorumController controller = controlEnv.activeController(); - CountDownLatch countDownLatch = controller.pause(); - long now = controller.time().nanoseconds(); - ControllerRequestContext context0 = new ControllerRequestContext( - new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now)); - CompletableFuture createFuture = - controller.createTopics(context0, new CreateTopicsRequestData().setTimeoutMs(0). - setTopics(new CreatableTopicCollection(Collections.singleton( - new CreatableTopic().setName("foo")).iterator())), - Collections.emptySet()); - CompletableFuture> deleteFuture = - controller.deleteTopics(context0, Collections.singletonList(Uuid.ZERO_UUID)); - CompletableFuture>> findTopicIdsFuture = - controller.findTopicIds(context0, Collections.singletonList("foo")); - CompletableFuture>> findTopicNamesFuture = - controller.findTopicNames(context0, Collections.singletonList(Uuid.ZERO_UUID)); - CompletableFuture> createPartitionsFuture = - controller.createPartitions(context0, Collections.singletonList( - new CreatePartitionsTopic()), false); - CompletableFuture electLeadersFuture = - controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0). - setTopicPartitions(null)); - CompletableFuture alterReassignmentsFuture = - controller.alterPartitionReassignments(context0, - new AlterPartitionReassignmentsRequestData().setTimeoutMs(0). - setTopics(Collections.singletonList(new ReassignableTopic()))); - CompletableFuture listReassignmentsFuture = - controller.listPartitionReassignments(context0, - new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0)); - while (controller.time().nanoseconds() == now) { - Thread.sleep(0, 10); - } - countDownLatch.countDown(); - assertYieldsTimeout(createFuture); - assertYieldsTimeout(deleteFuture); - assertYieldsTimeout(findTopicIdsFuture); - assertYieldsTimeout(findTopicNamesFuture); - assertYieldsTimeout(createPartitionsFuture); - assertYieldsTimeout(electLeadersFuture); - assertYieldsTimeout(alterReassignmentsFuture); - assertYieldsTimeout(listReassignmentsFuture); + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + build(); + ) { + QuorumController controller = controlEnv.activeController(); + CountDownLatch countDownLatch = controller.pause(); + long now = controller.time().nanoseconds(); + ControllerRequestContext context0 = new ControllerRequestContext( + new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now)); + CompletableFuture createFuture = + controller.createTopics(context0, new CreateTopicsRequestData().setTimeoutMs(0). + setTopics(new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("foo")).iterator())), + Collections.emptySet()); + CompletableFuture> deleteFuture = + controller.deleteTopics(context0, Collections.singletonList(Uuid.ZERO_UUID)); + CompletableFuture>> findTopicIdsFuture = + controller.findTopicIds(context0, Collections.singletonList("foo")); + CompletableFuture>> findTopicNamesFuture = + controller.findTopicNames(context0, Collections.singletonList(Uuid.ZERO_UUID)); + CompletableFuture> createPartitionsFuture = + controller.createPartitions(context0, Collections.singletonList( + new CreatePartitionsTopic()), false); + CompletableFuture electLeadersFuture = + controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0). + setTopicPartitions(null)); + CompletableFuture alterReassignmentsFuture = + controller.alterPartitionReassignments(context0, + new AlterPartitionReassignmentsRequestData().setTimeoutMs(0). + setTopics(Collections.singletonList(new ReassignableTopic()))); + CompletableFuture listReassignmentsFuture = + controller.listPartitionReassignments(context0, + new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0)); + while (controller.time().nanoseconds() == now) { + Thread.sleep(0, 10); } + countDownLatch.countDown(); + assertYieldsTimeout(createFuture); + assertYieldsTimeout(deleteFuture); + assertYieldsTimeout(findTopicIdsFuture); + assertYieldsTimeout(findTopicNamesFuture); + assertYieldsTimeout(createPartitionsFuture); + assertYieldsTimeout(electLeadersFuture); + assertYieldsTimeout(alterReassignmentsFuture); + assertYieldsTimeout(listReassignmentsFuture); } } @@ -972,37 +1012,41 @@ public class QuorumControllerTest { */ @Test public void testEarlyControllerResults() throws Throwable { - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - })) { - QuorumController controller = controlEnv.activeController(); - CountDownLatch countDownLatch = controller.pause(); - CompletableFuture createFuture = - controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData(). - setTimeoutMs(120000), Collections.emptySet()); - CompletableFuture> deleteFuture = - controller.deleteTopics(ANONYMOUS_CONTEXT, Collections.emptyList()); - CompletableFuture>> findTopicIdsFuture = - controller.findTopicIds(ANONYMOUS_CONTEXT, Collections.emptyList()); - CompletableFuture>> findTopicNamesFuture = - controller.findTopicNames(ANONYMOUS_CONTEXT, Collections.emptyList()); - CompletableFuture> createPartitionsFuture = - controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList(), false); - CompletableFuture electLeadersFuture = - controller.electLeaders(ANONYMOUS_CONTEXT, new ElectLeadersRequestData()); - CompletableFuture alterReassignmentsFuture = - controller.alterPartitionReassignments(ANONYMOUS_CONTEXT, - new AlterPartitionReassignmentsRequestData()); - createFuture.get(); - deleteFuture.get(); - findTopicIdsFuture.get(); - findTopicNamesFuture.get(); - createPartitionsFuture.get(); - electLeadersFuture.get(); - alterReassignmentsFuture.get(); - countDownLatch.countDown(); - } + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + build(); + ) { + QuorumController controller = controlEnv.activeController(); + CountDownLatch countDownLatch = controller.pause(); + CompletableFuture createFuture = + controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData(). + setTimeoutMs(120000), Collections.emptySet()); + CompletableFuture> deleteFuture = + controller.deleteTopics(ANONYMOUS_CONTEXT, Collections.emptyList()); + CompletableFuture>> findTopicIdsFuture = + controller.findTopicIds(ANONYMOUS_CONTEXT, Collections.emptyList()); + CompletableFuture>> findTopicNamesFuture = + controller.findTopicNames(ANONYMOUS_CONTEXT, Collections.emptyList()); + CompletableFuture> createPartitionsFuture = + controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList(), false); + CompletableFuture electLeadersFuture = + controller.electLeaders(ANONYMOUS_CONTEXT, new ElectLeadersRequestData()); + CompletableFuture alterReassignmentsFuture = + controller.alterPartitionReassignments(ANONYMOUS_CONTEXT, + new AlterPartitionReassignmentsRequestData()); + createFuture.get(); + deleteFuture.get(); + findTopicIdsFuture.get(); + findTopicNamesFuture.get(); + createPartitionsFuture.get(); + electLeadersFuture.get(); + alterReassignmentsFuture.get(); + countDownLatch.countDown(); } } @@ -1013,10 +1057,15 @@ public class QuorumControllerTest { int numPartitions = 3; String topicName = "topic-name"; - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - })) { + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + build(); + ) { QuorumController controller = controlEnv.activeController(); Map brokerEpochs = registerBrokers(controller, numBrokers); @@ -1165,38 +1214,42 @@ public class QuorumControllerTest { @Test public void testConfigResourceExistenceChecker() throws Throwable { - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - })) { - QuorumController active = controlEnv.activeController(); - registerBrokers(active, 5); - active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData(). - setTopics(new CreatableTopicCollection(Collections.singleton( - new CreatableTopic().setName("foo"). - setReplicationFactor((short) 3). - setNumPartitions(1)).iterator())), - Collections.singleton("foo")).get(); - ConfigResourceExistenceChecker checker = - active.new ConfigResourceExistenceChecker(); - // A ConfigResource with type=BROKER and name=(empty string) represents - // the default broker resource. It is used to set cluster configs. - checker.accept(new ConfigResource(BROKER, "")); + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + build(); + ) { + QuorumController active = controlEnv.activeController(); + registerBrokers(active, 5); + active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData(). + setTopics(new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("foo"). + setReplicationFactor((short) 3). + setNumPartitions(1)).iterator())), + Collections.singleton("foo")).get(); + ConfigResourceExistenceChecker checker = + active.new ConfigResourceExistenceChecker(); + // A ConfigResource with type=BROKER and name=(empty string) represents + // the default broker resource. It is used to set cluster configs. + checker.accept(new ConfigResource(BROKER, "")); - // Broker 3 exists, so we can set a configuration for it. - checker.accept(new ConfigResource(BROKER, "3")); + // Broker 3 exists, so we can set a configuration for it. + checker.accept(new ConfigResource(BROKER, "3")); - // Broker 10 does not exist, so this should throw an exception. - assertThrows(BrokerIdNotRegisteredException.class, - () -> checker.accept(new ConfigResource(BROKER, "10"))); + // Broker 10 does not exist, so this should throw an exception. + assertThrows(BrokerIdNotRegisteredException.class, + () -> checker.accept(new ConfigResource(BROKER, "10"))); - // Topic foo exists, so we can set a configuration for it. - checker.accept(new ConfigResource(TOPIC, "foo")); + // Topic foo exists, so we can set a configuration for it. + checker.accept(new ConfigResource(TOPIC, "foo")); - // Topic bar does not exist, so this should throw an exception. - assertThrows(UnknownTopicOrPartitionException.class, - () -> checker.accept(new ConfigResource(TOPIC, "bar"))); - } + // Topic bar does not exist, so this should throw an exception. + assertThrows(UnknownTopicOrPartitionException.class, + () -> checker.accept(new ConfigResource(TOPIC, "bar"))); } } @@ -1214,18 +1267,22 @@ public class QuorumControllerTest { authorizer.configure(Collections.emptyMap()); authorizers.add(authorizer); } - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv( - numControllers, - Optional.empty(), - shared -> { - shared.setInitialMaxReadOffset(2); - } - )) { + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(numControllers). + setSharedLogDataInitializer(sharedLogData -> { + sharedLogData.setInitialMaxReadOffset(2); + }). + build() + ) { logEnv.appendInitialRecords(expectedSnapshotContent(FOO_ID, ALL_ZERO_BROKER_EPOCHS)); logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2)); - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setAuthorizer(authorizers.get(b.nodeId())); - })) { + try ( + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setAuthorizer(authorizers.get(controllerBuilder.nodeId())); + }). + build() + ) { assertInitialLoadFuturesNotComplete(authorizers); logEnv.logManagers().get(0).setMaxReadOffset(Long.MAX_VALUE); QuorumController active = controlEnv.activeController(); @@ -1241,25 +1298,27 @@ public class QuorumControllerTest { @Test public void testFatalMetadataReplayErrorOnActive() throws Throwable { - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - })) { - QuorumController active = controlEnv.activeController(); - CompletableFuture future = active.appendWriteEvent("errorEvent", - OptionalLong.empty(), () -> { - return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion( - new ConfigRecord(). - setName(null). - setResourceName(null). - setResourceType((byte) 255). - setValue(null), (short) 0)), null); - }); - assertThrows(ExecutionException.class, () -> future.get()); - assertEquals(NullPointerException.class, - controlEnv.fatalFaultHandler().firstException().getCause().getClass()); - controlEnv.fatalFaultHandler().setIgnore(true); - controlEnv.metadataFaultHandler().setIgnore(true); - } + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + build(); + ) { + QuorumController active = controlEnv.activeController(); + CompletableFuture future = active.appendWriteEvent("errorEvent", + OptionalLong.empty(), () -> { + return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion( + new ConfigRecord(). + setName(null). + setResourceName(null). + setResourceType((byte) 255). + setValue(null), (short) 0)), null); + }); + assertThrows(ExecutionException.class, () -> future.get()); + assertEquals(NullPointerException.class, + controlEnv.fatalFaultHandler().firstException().getCause().getClass()); + controlEnv.fatalFaultHandler().setIgnore(true); + controlEnv.metadataFaultHandler().setIgnore(true); } } @@ -1323,60 +1382,68 @@ public class QuorumControllerTest { @Test public void testUpgradeFromPreProductionVersion() throws Exception { - try (InitialSnapshot initialSnapshot = new InitialSnapshot(PRE_PRODUCTION_RECORDS)) { - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of( - FileRawSnapshotReader.open(initialSnapshot.tempDir.toPath(), new OffsetAndEpoch(0, 0))) - )) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - }, OptionalLong.empty(), OptionalLong.empty(), COMPLEX_BOOTSTRAP)) { - QuorumController active = controlEnv.activeController(); - TestUtils.waitForCondition(() -> - active.featureControl().metadataVersion().equals(MetadataVersion.IBP_3_0_IV1), - "Failed to get a metadata version of " + MetadataVersion.IBP_3_0_IV1); - // The ConfigRecord in our bootstrap should not have been applied, since there - // were already records present. - assertEquals(Collections.emptyMap(), active.configurationControl(). - getConfigs(new ConfigResource(BROKER, ""))); - } - } + try ( + InitialSnapshot initialSnapshot = new InitialSnapshot(PRE_PRODUCTION_RECORDS); + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + setSnapshotReader(FileRawSnapshotReader.open( + initialSnapshot.tempDir.toPath(), new OffsetAndEpoch(0, 0))). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + setBootstrapMetadata(COMPLEX_BOOTSTRAP). + build(); + ) { + QuorumController active = controlEnv.activeController(); + TestUtils.waitForCondition(() -> + active.featureControl().metadataVersion().equals(MetadataVersion.IBP_3_0_IV1), + "Failed to get a metadata version of " + MetadataVersion.IBP_3_0_IV1); + // The ConfigRecord in our bootstrap should not have been applied, since there + // were already records present. + assertEquals(Collections.emptyMap(), active.configurationControl(). + getConfigs(new ConfigResource(BROKER, ""))); } } @Test public void testInsertBootstrapRecordsToEmptyLog() throws Exception { - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty()) + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + setControllerBuilderInitializer(controllerBuilder -> { + controllerBuilder.setConfigSchema(SCHEMA); + }). + setBootstrapMetadata(COMPLEX_BOOTSTRAP). + build(); ) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { - b.setConfigSchema(SCHEMA); - }, OptionalLong.empty(), OptionalLong.empty(), COMPLEX_BOOTSTRAP)) { - QuorumController active = controlEnv.activeController(); + QuorumController active = controlEnv.activeController(); - ControllerRequestContext ctx = new ControllerRequestContext( - new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(Long.MAX_VALUE)); + ControllerRequestContext ctx = new ControllerRequestContext( + new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(Long.MAX_VALUE)); - TestUtils.waitForCondition(() -> { - FinalizedControllerFeatures features = active.finalizedFeatures(ctx).get(); - Optional metadataVersionOpt = features.get(MetadataVersion.FEATURE_NAME); - return Optional.of(MetadataVersion.IBP_3_3_IV1.featureLevel()).equals(metadataVersionOpt); - }, "Failed to see expected metadata version from bootstrap metadata"); + TestUtils.waitForCondition(() -> { + FinalizedControllerFeatures features = active.finalizedFeatures(ctx).get(); + Optional metadataVersionOpt = features.get(MetadataVersion.FEATURE_NAME); + return Optional.of(MetadataVersion.IBP_3_3_IV1.featureLevel()).equals(metadataVersionOpt); + }, "Failed to see expected metadata version from bootstrap metadata"); - TestUtils.waitForCondition(() -> { - ConfigResource defaultBrokerResource = new ConfigResource(BROKER, ""); + TestUtils.waitForCondition(() -> { + ConfigResource defaultBrokerResource = new ConfigResource(BROKER, ""); - Map> configs = Collections.singletonMap( - defaultBrokerResource, - Collections.emptyList() - ); + Map> configs = Collections.singletonMap( + defaultBrokerResource, + Collections.emptyList() + ); - Map>> results = - active.describeConfigs(ctx, configs).get(); + Map>> results = + active.describeConfigs(ctx, configs).get(); - ResultOrError> resultOrError = results.get(defaultBrokerResource); - return resultOrError.isResult() && - Collections.singletonMap("foo", "bar").equals(resultOrError.result()); - }, "Failed to see expected config change from bootstrap metadata"); - } + ResultOrError> resultOrError = results.get(defaultBrokerResource); + return resultOrError.isResult() && + Collections.singletonMap("foo", "bar").equals(resultOrError.result()); + }, "Failed to see expected config change from bootstrap metadata"); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index fd56ef6a644..993ba848aad 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -18,7 +18,6 @@ package org.apache.kafka.controller; import org.apache.kafka.clients.ApiVersions; -import org.apache.kafka.controller.QuorumController.Builder; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metalog.LocalLogManagerTestEnv; import org.apache.kafka.raft.LeaderAndEpoch; @@ -44,28 +43,51 @@ public class QuorumControllerTestEnv implements AutoCloseable { private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler"); private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler"); - public QuorumControllerTestEnv( - LocalLogManagerTestEnv logEnv, - Consumer builderConsumer - ) throws Exception { - this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), - BootstrapMetadata.fromVersion(MetadataVersion.latest(), "test-provided version")); + public static class Builder { + private final LocalLogManagerTestEnv logEnv; + private Consumer controllerBuilderInitializer = __ -> { }; + private OptionalLong sessionTimeoutMillis = OptionalLong.empty(); + private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty(); + private BootstrapMetadata bootstrapMetadata = BootstrapMetadata. + fromVersion(MetadataVersion.latest(), "test-provided version"); + + public Builder(LocalLogManagerTestEnv logEnv) { + this.logEnv = logEnv; + } + + public Builder setControllerBuilderInitializer(Consumer controllerBuilderInitializer) { + this.controllerBuilderInitializer = controllerBuilderInitializer; + return this; + } + + public Builder setSessionTimeoutMillis(OptionalLong sessionTimeoutMillis) { + this.sessionTimeoutMillis = sessionTimeoutMillis; + return this; + } + + public Builder setLeaderImbalanceCheckIntervalNs(OptionalLong leaderImbalanceCheckIntervalNs) { + this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs; + return this; + } + + public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) { + this.bootstrapMetadata = bootstrapMetadata; + return this; + } + + public QuorumControllerTestEnv build() throws Exception { + return new QuorumControllerTestEnv( + logEnv, + controllerBuilderInitializer, + sessionTimeoutMillis, + leaderImbalanceCheckIntervalNs, + bootstrapMetadata); + } } - public QuorumControllerTestEnv( - LocalLogManagerTestEnv logEnv, - Consumer builderConsumer, - OptionalLong sessionTimeoutMillis, - OptionalLong leaderImbalanceCheckIntervalNs, - MetadataVersion metadataVersion - ) throws Exception { - this(logEnv, builderConsumer, sessionTimeoutMillis, leaderImbalanceCheckIntervalNs, - BootstrapMetadata.fromVersion(metadataVersion, "test-provided version")); - } - - public QuorumControllerTestEnv( + private QuorumControllerTestEnv( LocalLogManagerTestEnv logEnv, - Consumer builderConsumer, + Consumer controllerBuilderInitializer, OptionalLong sessionTimeoutMillis, OptionalLong leaderImbalanceCheckIntervalNs, BootstrapMetadata bootstrapMetadata @@ -87,7 +109,7 @@ public class QuorumControllerTestEnv implements AutoCloseable { }); builder.setFatalFaultHandler(fatalFaultHandler); builder.setMetadataFaultHandler(metadataFaultHandler); - builderConsumer.accept(builder); + controllerBuilderInitializer.accept(builder); this.controllers.add(builder.build()); } } catch (Exception e) { diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java index 7b5e26d79f6..066ca4e000d 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java @@ -26,7 +26,6 @@ import org.junit.jupiter.api.Timeout; import java.util.Arrays; import java.util.List; -import java.util.Optional; import java.util.OptionalInt; import java.util.stream.Collectors; @@ -44,8 +43,10 @@ public class LocalLogManagerTest { */ @Test public void testCreateAndClose() throws Exception { - try (LocalLogManagerTestEnv env = - LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) { + try ( + LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1). + buildWithMockListeners(); + ) { env.close(); assertEquals(null, env.firstError.get()); } @@ -56,8 +57,10 @@ public class LocalLogManagerTest { */ @Test public void testClaimsLeadership() throws Exception { - try (LocalLogManagerTestEnv env = - LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) { + try ( + LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1). + buildWithMockListeners(); + ) { assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), env.waitForLeader()); env.close(); assertEquals(null, env.firstError.get()); @@ -69,8 +72,10 @@ public class LocalLogManagerTest { */ @Test public void testPassLeadership() throws Exception { - try (LocalLogManagerTestEnv env = - LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) { + try ( + LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3). + buildWithMockListeners(); + ) { LeaderAndEpoch first = env.waitForLeader(); LeaderAndEpoch cur = first; do { @@ -123,8 +128,10 @@ public class LocalLogManagerTest { */ @Test public void testCommits() throws Exception { - try (LocalLogManagerTestEnv env = - LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) { + try ( + LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3). + buildWithMockListeners(); + ) { LeaderAndEpoch leaderInfo = env.waitForLeader(); int leaderId = leaderInfo.leaderId().orElseThrow(() -> new AssertionError("Current leader is undefined") diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java index 17c9c467124..1693b62be1a 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java @@ -66,31 +66,59 @@ public class LocalLogManagerTestEnv implements AutoCloseable { */ private final List logManagers; - public static LocalLogManagerTestEnv createWithMockListeners( - int numManagers, - Optional snapshot - ) throws Exception { - LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers, snapshot); - try { - for (LocalLogManager logManager : testEnv.logManagers) { - logManager.register(new MockMetaLogManagerListener(logManager.nodeId().getAsInt())); - } - } catch (Exception e) { - testEnv.close(); - throw e; + public static class Builder { + private final int numManagers; + private Optional snapshotReader = Optional.empty(); + private Consumer sharedLogDataInitializer = __ -> { }; + + public Builder(int numManagers) { + this.numManagers = numManagers; + } + + public Builder setSnapshotReader(RawSnapshotReader snapshotReader) { + this.snapshotReader = Optional.of(snapshotReader); + return this; + } + + public Builder setSharedLogDataInitializer(Consumer sharedLogDataInitializer) { + this.sharedLogDataInitializer = sharedLogDataInitializer; + return this; + } + + public LocalLogManagerTestEnv build() { + return new LocalLogManagerTestEnv( + numManagers, + snapshotReader, + sharedLogDataInitializer); + } + + public LocalLogManagerTestEnv buildWithMockListeners() { + LocalLogManagerTestEnv env = build(); + try { + for (LocalLogManager logManager : env.logManagers) { + logManager.register(new MockMetaLogManagerListener(logManager.nodeId().getAsInt())); + } + } catch (Exception e) { + try { + env.close(); + } catch (Exception t) { + log.error("Error while closing new log environment", t); + } + throw e; + } + return env; } - return testEnv; } - public LocalLogManagerTestEnv( + private LocalLogManagerTestEnv( int numManagers, - Optional snapshot, - Consumer dataSetup - ) throws Exception { + Optional snapshotReader, + Consumer sharedLogDataInitializer + ) { clusterId = Uuid.randomUuid().toString(); dir = TestUtils.tempDirectory(); - shared = new SharedLogData(snapshot); - dataSetup.accept(shared); + shared = new SharedLogData(snapshotReader); + sharedLogDataInitializer.accept(shared); List newLogManagers = new ArrayList<>(numManagers); try { for (int nodeId = 0; nodeId < numManagers; nodeId++) { @@ -112,13 +140,6 @@ public class LocalLogManagerTestEnv implements AutoCloseable { this.logManagers = newLogManagers; } - public LocalLogManagerTestEnv( - int numManagers, - Optional snapshot - ) throws Exception { - this(numManagers, snapshot, __ -> { }); - } - /** * Append some records to the log. This method is meant to be called before the * controllers are started, to simulate a pre-existing metadata log.