mirror of https://github.com/apache/kafka.git
				
				
				
			MINOR: fix indentation and add builders in some KRaft tests (#12720)
Add builders for LocalLogManagerTestEnv and QuorumControllerTestEnv, since the constructor overloads were starting to get unwieldy. Make indentation more consistent in QuorumControllerTest. Take advantage of the fact that you can initialize multiple resources in a Java try-with-resources block to avoid excessive indentation in a few cases. Reviewers: José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
		
							parent
							
								
									e8d32563f3
								
							
						
					
					
						commit
						1c07095cbd
					
				|  | @ -145,9 +145,13 @@ public class QuorumControllerTest { | |||
|     public void testCreateAndClose() throws Throwable { | ||||
|         MockControllerMetrics metrics = new MockControllerMetrics(); | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); | ||||
|             QuorumControllerTestEnv controlEnv = | ||||
|                 new QuorumControllerTestEnv(logEnv, builder -> builder.setMetrics(metrics)) | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setMetrics(metrics); | ||||
|                 }). | ||||
|                 build() | ||||
|         ) { | ||||
|         } | ||||
|         assertTrue(metrics.isClosed(), "metrics were not closed"); | ||||
|  | @ -159,10 +163,13 @@ public class QuorumControllerTest { | |||
|     @Test | ||||
|     public void testConfigurationOperations() throws Throwable { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|             }) | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, | ||||
|                 new BrokerRegistrationRequestData(). | ||||
|  | @ -197,10 +204,13 @@ public class QuorumControllerTest { | |||
|     @Test | ||||
|     public void testDelayedConfigurationOperations() throws Throwable { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|             }) | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, | ||||
|                 new BrokerRegistrationRequestData(). | ||||
|  | @ -211,9 +221,10 @@ public class QuorumControllerTest { | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, | ||||
|                                                     QuorumController controller) | ||||
|                                                     throws Throwable { | ||||
|     private void testDelayedConfigurationOperations( | ||||
|         LocalLogManagerTestEnv logEnv, | ||||
|         QuorumController controller | ||||
|     ) throws Throwable { | ||||
|         logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L)); | ||||
|         CompletableFuture<Map<ConfigResource, ApiError>> future1 = | ||||
|             controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap( | ||||
|  | @ -237,14 +248,15 @@ public class QuorumControllerTest { | |||
|         long sessionTimeoutMillis = 1000; | ||||
| 
 | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, | ||||
|                 b -> { | ||||
|                     b.setConfigSchema(SCHEMA); | ||||
|                 }, | ||||
|                 OptionalLong.of(sessionTimeoutMillis), | ||||
|                 OptionalLong.empty(), | ||||
|                 SIMPLE_BOOTSTRAP); | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). | ||||
|                 setBootstrapMetadata(SIMPLE_BOOTSTRAP). | ||||
|                 build(); | ||||
|         ) { | ||||
|             ListenerCollection listeners = new ListenerCollection(); | ||||
|             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); | ||||
|  | @ -334,14 +346,16 @@ public class QuorumControllerTest { | |||
|         long leaderImbalanceCheckIntervalNs = 1_000_000_000; | ||||
| 
 | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, | ||||
|                 b -> { | ||||
|                     b.setConfigSchema(SCHEMA); | ||||
|                 }, | ||||
|                 OptionalLong.of(sessionTimeoutMillis), | ||||
|                 OptionalLong.of(leaderImbalanceCheckIntervalNs), | ||||
|                 SIMPLE_BOOTSTRAP); | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). | ||||
|                 setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)). | ||||
|                 setBootstrapMetadata(SIMPLE_BOOTSTRAP). | ||||
|                 build(); | ||||
|         ) { | ||||
|             ListenerCollection listeners = new ListenerCollection(); | ||||
|             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); | ||||
|  | @ -467,11 +481,14 @@ public class QuorumControllerTest { | |||
|         long maxIdleIntervalNs = 1_000; | ||||
|         long maxReplicationDelayMs = 60_000; | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty()); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|                 b.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs)); | ||||
|             }); | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                     controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs)); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             ListenerCollection listeners = new ListenerCollection(); | ||||
|             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); | ||||
|  | @ -509,10 +526,15 @@ public class QuorumControllerTest { | |||
| 
 | ||||
|     @Test | ||||
|     public void testUnregisterBroker() throws Throwable { | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|             })) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             ListenerCollection listeners = new ListenerCollection(); | ||||
|             listeners.add(new Listener().setName("PLAINTEXT"). | ||||
|                 setHost("localhost").setPort(9092)); | ||||
|  | @ -565,14 +587,15 @@ public class QuorumControllerTest { | |||
|             assertEquals(0, topicPartitionFuture.get().partitionId()); | ||||
|         } | ||||
|     } | ||||
|     } | ||||
| 
 | ||||
|     private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() { | ||||
|         return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest()); | ||||
|     } | ||||
| 
 | ||||
|     private BrokerRegistrationRequestData.FeatureCollection brokerFeatures( | ||||
|             MetadataVersion minVersion, MetadataVersion maxVersion) { | ||||
|         MetadataVersion minVersion, | ||||
|         MetadataVersion maxVersion | ||||
|     ) { | ||||
|         BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection(); | ||||
|         features.add(new BrokerRegistrationRequestData.Feature() | ||||
|             .setName(MetadataVersion.FEATURE_NAME) | ||||
|  | @ -599,13 +622,15 @@ public class QuorumControllerTest { | |||
|         Map<Integer, Long> brokerEpochs = new HashMap<>(); | ||||
|         RawSnapshotReader reader = null; | ||||
|         Uuid fooId; | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv( | ||||
|                 logEnv, | ||||
|                 b -> b.setConfigSchema(SCHEMA), | ||||
|                 OptionalLong.empty(), | ||||
|                 OptionalLong.empty(), | ||||
|                 SIMPLE_BOOTSTRAP) | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 setBootstrapMetadata(SIMPLE_BOOTSTRAP). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController active = controlEnv.activeController(); | ||||
|             for (int i = 0; i < numBrokers; i++) { | ||||
|  | @ -650,12 +675,17 @@ public class QuorumControllerTest { | |||
|             assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset()); | ||||
|             checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot); | ||||
|         } | ||||
|         } | ||||
| 
 | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of(reader))) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|             })) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). | ||||
|                 setSnapshotReader(reader). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController active = controlEnv.activeController(); | ||||
|             long snapshotLogOffset = active.beginWritingSnapshot().get(); | ||||
|             SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader( | ||||
|  | @ -665,7 +695,6 @@ public class QuorumControllerTest { | |||
|             checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot); | ||||
|         } | ||||
|     } | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void testSnapshotConfiguration() throws Throwable { | ||||
|  | @ -673,12 +702,17 @@ public class QuorumControllerTest { | |||
|         final int maxNewRecordBytes = 4; | ||||
|         Map<Integer, Long> brokerEpochs = new HashMap<>(); | ||||
|         Uuid fooId; | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|                 b.setSnapshotMaxNewRecordBytes(maxNewRecordBytes); | ||||
|                 b.setBootstrapMetadata(SIMPLE_BOOTSTRAP); | ||||
|             })) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                     controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes); | ||||
|                     controllerBuilder.setBootstrapMetadata(SIMPLE_BOOTSTRAP); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController active = controlEnv.activeController(); | ||||
|             for (int i = 0; i < numBrokers; i++) { | ||||
|                 BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT, | ||||
|  | @ -724,18 +758,22 @@ public class QuorumControllerTest { | |||
|             ); | ||||
|         } | ||||
|     } | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void testSnapshotOnlyAfterConfiguredMinBytes() throws Throwable { | ||||
|         final int numBrokers = 4; | ||||
|         final int maxNewRecordBytes = 1000; | ||||
|         Map<Integer, Long> brokerEpochs = new HashMap<>(); | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|                 b.setSnapshotMaxNewRecordBytes(maxNewRecordBytes); | ||||
|             })) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                     controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController active = controlEnv.activeController(); | ||||
|             for (int i = 0; i < numBrokers; i++) { | ||||
|                 BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT, | ||||
|  | @ -782,7 +820,6 @@ public class QuorumControllerTest { | |||
|             logEnv.waitForLatestSnapshot(); | ||||
|         } | ||||
|     } | ||||
|     } | ||||
| 
 | ||||
|     private SnapshotReader<ApiMessageAndVersion> createSnapshotReader(RawSnapshotReader reader) { | ||||
|         return RecordsSnapshotReader.of( | ||||
|  | @ -912,10 +949,14 @@ public class QuorumControllerTest { | |||
|      */ | ||||
|     @Test | ||||
|     public void testTimeouts() throws Throwable { | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|             })) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController controller = controlEnv.activeController(); | ||||
|             CountDownLatch countDownLatch = controller.pause(); | ||||
|             long now = controller.time().nanoseconds(); | ||||
|  | @ -959,7 +1000,6 @@ public class QuorumControllerTest { | |||
|             assertYieldsTimeout(listReassignmentsFuture); | ||||
|         } | ||||
|     } | ||||
|     } | ||||
| 
 | ||||
|     private static void assertYieldsTimeout(Future<?> future) { | ||||
|         assertEquals(TimeoutException.class, assertThrows(ExecutionException.class, | ||||
|  | @ -972,10 +1012,15 @@ public class QuorumControllerTest { | |||
|      */ | ||||
|     @Test | ||||
|     public void testEarlyControllerResults() throws Throwable { | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|             })) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController controller = controlEnv.activeController(); | ||||
|             CountDownLatch countDownLatch = controller.pause(); | ||||
|             CompletableFuture<CreateTopicsResponseData> createFuture = | ||||
|  | @ -1004,7 +1049,6 @@ public class QuorumControllerTest { | |||
|             countDownLatch.countDown(); | ||||
|         } | ||||
|     } | ||||
|     } | ||||
| 
 | ||||
|     @Disabled // TODO: need to fix leader election in LocalLog. | ||||
|     @Test | ||||
|  | @ -1013,10 +1057,15 @@ public class QuorumControllerTest { | |||
|         int numPartitions = 3; | ||||
|         String topicName = "topic-name"; | ||||
| 
 | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|             })) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController controller = controlEnv.activeController(); | ||||
| 
 | ||||
|             Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers); | ||||
|  | @ -1165,10 +1214,15 @@ public class QuorumControllerTest { | |||
| 
 | ||||
|     @Test | ||||
|     public void testConfigResourceExistenceChecker() throws Throwable { | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|             })) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController active = controlEnv.activeController(); | ||||
|             registerBrokers(active, 5); | ||||
|             active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData(). | ||||
|  | @ -1198,7 +1252,6 @@ public class QuorumControllerTest { | |||
|                 () -> checker.accept(new ConfigResource(TOPIC, "bar"))); | ||||
|         } | ||||
|     } | ||||
|     } | ||||
| 
 | ||||
|     private static final Uuid FOO_ID = Uuid.fromString("igRktLOnR8ektWHr79F8mw"); | ||||
| 
 | ||||
|  | @ -1214,18 +1267,22 @@ public class QuorumControllerTest { | |||
|             authorizer.configure(Collections.emptyMap()); | ||||
|             authorizers.add(authorizer); | ||||
|         } | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv( | ||||
|             numControllers, | ||||
|             Optional.empty(), | ||||
|             shared -> { | ||||
|                 shared.setInitialMaxReadOffset(2); | ||||
|             } | ||||
|         )) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(numControllers). | ||||
|                 setSharedLogDataInitializer(sharedLogData -> { | ||||
|                     sharedLogData.setInitialMaxReadOffset(2); | ||||
|                 }). | ||||
|                 build() | ||||
|         ) { | ||||
|             logEnv.appendInitialRecords(expectedSnapshotContent(FOO_ID, ALL_ZERO_BROKER_EPOCHS)); | ||||
|             logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2)); | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setAuthorizer(authorizers.get(b.nodeId())); | ||||
|             })) { | ||||
|             try ( | ||||
|                 QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                     setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                         controllerBuilder.setAuthorizer(authorizers.get(controllerBuilder.nodeId())); | ||||
|                     }). | ||||
|                     build() | ||||
|             ) { | ||||
|                 assertInitialLoadFuturesNotComplete(authorizers); | ||||
|                 logEnv.logManagers().get(0).setMaxReadOffset(Long.MAX_VALUE); | ||||
|                 QuorumController active = controlEnv.activeController(); | ||||
|  | @ -1241,9 +1298,12 @@ public class QuorumControllerTest { | |||
| 
 | ||||
|     @Test | ||||
|     public void testFatalMetadataReplayErrorOnActive() throws Throwable { | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|             })) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController active = controlEnv.activeController(); | ||||
|             CompletableFuture<Void> future = active.appendWriteEvent("errorEvent", | ||||
|                     OptionalLong.empty(), () -> { | ||||
|  | @ -1261,7 +1321,6 @@ public class QuorumControllerTest { | |||
|             controlEnv.metadataFaultHandler().setIgnore(true); | ||||
|         } | ||||
|     } | ||||
|     } | ||||
| 
 | ||||
|     private static void assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) { | ||||
|         for (int i = 0; i < authorizers.size(); i++) { | ||||
|  | @ -1323,13 +1382,19 @@ public class QuorumControllerTest { | |||
| 
 | ||||
|     @Test | ||||
|     public void testUpgradeFromPreProductionVersion() throws Exception { | ||||
|         try (InitialSnapshot initialSnapshot = new InitialSnapshot(PRE_PRODUCTION_RECORDS)) { | ||||
|             try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of( | ||||
|                     FileRawSnapshotReader.open(initialSnapshot.tempDir.toPath(), new OffsetAndEpoch(0, 0))) | ||||
|             )) { | ||||
|                 try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                     b.setConfigSchema(SCHEMA); | ||||
|                 }, OptionalLong.empty(), OptionalLong.empty(), COMPLEX_BOOTSTRAP)) { | ||||
|         try ( | ||||
|             InitialSnapshot initialSnapshot = new InitialSnapshot(PRE_PRODUCTION_RECORDS); | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). | ||||
|                 setSnapshotReader(FileRawSnapshotReader.open( | ||||
|                     initialSnapshot.tempDir.toPath(), new OffsetAndEpoch(0, 0))). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 setBootstrapMetadata(COMPLEX_BOOTSTRAP). | ||||
|                 build(); | ||||
|         ) { | ||||
|             QuorumController active = controlEnv.activeController(); | ||||
|             TestUtils.waitForCondition(() -> | ||||
|                 active.featureControl().metadataVersion().equals(MetadataVersion.IBP_3_0_IV1), | ||||
|  | @ -1340,16 +1405,19 @@ public class QuorumControllerTest { | |||
|                     getConfigs(new ConfigResource(BROKER, ""))); | ||||
|         } | ||||
|     } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void testInsertBootstrapRecordsToEmptyLog() throws Exception { | ||||
|         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty()) | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). | ||||
|                 build(); | ||||
|             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). | ||||
|                 setControllerBuilderInitializer(controllerBuilder -> { | ||||
|                     controllerBuilder.setConfigSchema(SCHEMA); | ||||
|                 }). | ||||
|                 setBootstrapMetadata(COMPLEX_BOOTSTRAP). | ||||
|                 build(); | ||||
|         ) { | ||||
|             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { | ||||
|                 b.setConfigSchema(SCHEMA); | ||||
|             }, OptionalLong.empty(), OptionalLong.empty(), COMPLEX_BOOTSTRAP)) { | ||||
|             QuorumController active = controlEnv.activeController(); | ||||
| 
 | ||||
|             ControllerRequestContext ctx = new ControllerRequestContext( | ||||
|  | @ -1378,7 +1446,6 @@ public class QuorumControllerTest { | |||
|             }, "Failed to see expected config change from bootstrap metadata"); | ||||
|         } | ||||
|     } | ||||
|     } | ||||
| 
 | ||||
|     static class TestAppender implements Function<List<ApiMessageAndVersion>, Long>  { | ||||
|         private long offset = 0; | ||||
|  |  | |||
|  | @ -18,7 +18,6 @@ | |||
| package org.apache.kafka.controller; | ||||
| 
 | ||||
| import org.apache.kafka.clients.ApiVersions; | ||||
| import org.apache.kafka.controller.QuorumController.Builder; | ||||
| import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; | ||||
| import org.apache.kafka.metalog.LocalLogManagerTestEnv; | ||||
| import org.apache.kafka.raft.LeaderAndEpoch; | ||||
|  | @ -44,28 +43,51 @@ public class QuorumControllerTestEnv implements AutoCloseable { | |||
|     private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler"); | ||||
|     private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler"); | ||||
| 
 | ||||
|     public QuorumControllerTestEnv( | ||||
|         LocalLogManagerTestEnv logEnv, | ||||
|         Consumer<QuorumController.Builder> builderConsumer | ||||
|     ) throws Exception { | ||||
|         this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), | ||||
|                 BootstrapMetadata.fromVersion(MetadataVersion.latest(), "test-provided version")); | ||||
|     public static class Builder { | ||||
|         private final LocalLogManagerTestEnv logEnv; | ||||
|         private Consumer<QuorumController.Builder> controllerBuilderInitializer = __ -> { }; | ||||
|         private OptionalLong sessionTimeoutMillis = OptionalLong.empty(); | ||||
|         private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty(); | ||||
|         private BootstrapMetadata bootstrapMetadata = BootstrapMetadata. | ||||
|                 fromVersion(MetadataVersion.latest(), "test-provided version"); | ||||
| 
 | ||||
|         public Builder(LocalLogManagerTestEnv logEnv) { | ||||
|             this.logEnv = logEnv; | ||||
|         } | ||||
| 
 | ||||
|     public QuorumControllerTestEnv( | ||||
|             LocalLogManagerTestEnv logEnv, | ||||
|             Consumer<Builder> builderConsumer, | ||||
|             OptionalLong sessionTimeoutMillis, | ||||
|             OptionalLong leaderImbalanceCheckIntervalNs, | ||||
|             MetadataVersion metadataVersion | ||||
|     ) throws Exception { | ||||
|         this(logEnv, builderConsumer, sessionTimeoutMillis, leaderImbalanceCheckIntervalNs, | ||||
|                 BootstrapMetadata.fromVersion(metadataVersion, "test-provided version")); | ||||
|         public Builder setControllerBuilderInitializer(Consumer<QuorumController.Builder> controllerBuilderInitializer) { | ||||
|             this.controllerBuilderInitializer = controllerBuilderInitializer; | ||||
|             return this; | ||||
|         } | ||||
| 
 | ||||
|     public QuorumControllerTestEnv( | ||||
|         public Builder setSessionTimeoutMillis(OptionalLong sessionTimeoutMillis) { | ||||
|             this.sessionTimeoutMillis = sessionTimeoutMillis; | ||||
|             return this; | ||||
|         } | ||||
| 
 | ||||
|         public Builder setLeaderImbalanceCheckIntervalNs(OptionalLong leaderImbalanceCheckIntervalNs) { | ||||
|             this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs; | ||||
|             return this; | ||||
|         } | ||||
| 
 | ||||
|         public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) { | ||||
|             this.bootstrapMetadata = bootstrapMetadata; | ||||
|             return this; | ||||
|         } | ||||
| 
 | ||||
|         public QuorumControllerTestEnv build() throws Exception { | ||||
|             return new QuorumControllerTestEnv( | ||||
|                 logEnv, | ||||
|                 controllerBuilderInitializer, | ||||
|                 sessionTimeoutMillis, | ||||
|                 leaderImbalanceCheckIntervalNs, | ||||
|                 bootstrapMetadata); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private QuorumControllerTestEnv( | ||||
|         LocalLogManagerTestEnv logEnv, | ||||
|         Consumer<Builder> builderConsumer, | ||||
|         Consumer<QuorumController.Builder> controllerBuilderInitializer, | ||||
|         OptionalLong sessionTimeoutMillis, | ||||
|         OptionalLong leaderImbalanceCheckIntervalNs, | ||||
|         BootstrapMetadata bootstrapMetadata | ||||
|  | @ -87,7 +109,7 @@ public class QuorumControllerTestEnv implements AutoCloseable { | |||
|                 }); | ||||
|                 builder.setFatalFaultHandler(fatalFaultHandler); | ||||
|                 builder.setMetadataFaultHandler(metadataFaultHandler); | ||||
|                 builderConsumer.accept(builder); | ||||
|                 controllerBuilderInitializer.accept(builder); | ||||
|                 this.controllers.add(builder.build()); | ||||
|             } | ||||
|         } catch (Exception e) { | ||||
|  |  | |||
|  | @ -26,7 +26,6 @@ import org.junit.jupiter.api.Timeout; | |||
| 
 | ||||
| import java.util.Arrays; | ||||
| import java.util.List; | ||||
| import java.util.Optional; | ||||
| import java.util.OptionalInt; | ||||
| import java.util.stream.Collectors; | ||||
| 
 | ||||
|  | @ -44,8 +43,10 @@ public class LocalLogManagerTest { | |||
|      */ | ||||
|     @Test | ||||
|     public void testCreateAndClose() throws Exception { | ||||
|         try (LocalLogManagerTestEnv env = | ||||
|                  LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1). | ||||
|                 buildWithMockListeners(); | ||||
|         ) { | ||||
|             env.close(); | ||||
|             assertEquals(null, env.firstError.get()); | ||||
|         } | ||||
|  | @ -56,8 +57,10 @@ public class LocalLogManagerTest { | |||
|      */ | ||||
|     @Test | ||||
|     public void testClaimsLeadership() throws Exception { | ||||
|         try (LocalLogManagerTestEnv env = | ||||
|                  LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1). | ||||
|                     buildWithMockListeners(); | ||||
|         ) { | ||||
|             assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), env.waitForLeader()); | ||||
|             env.close(); | ||||
|             assertEquals(null, env.firstError.get()); | ||||
|  | @ -69,8 +72,10 @@ public class LocalLogManagerTest { | |||
|      */ | ||||
|     @Test | ||||
|     public void testPassLeadership() throws Exception { | ||||
|         try (LocalLogManagerTestEnv env = | ||||
|                  LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3). | ||||
|                     buildWithMockListeners(); | ||||
|         ) { | ||||
|             LeaderAndEpoch first = env.waitForLeader(); | ||||
|             LeaderAndEpoch cur = first; | ||||
|             do { | ||||
|  | @ -123,8 +128,10 @@ public class LocalLogManagerTest { | |||
|      */ | ||||
|     @Test | ||||
|     public void testCommits() throws Exception { | ||||
|         try (LocalLogManagerTestEnv env = | ||||
|                  LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) { | ||||
|         try ( | ||||
|             LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3). | ||||
|                     buildWithMockListeners(); | ||||
|         ) { | ||||
|             LeaderAndEpoch leaderInfo = env.waitForLeader(); | ||||
|             int leaderId = leaderInfo.leaderId().orElseThrow(() -> | ||||
|                 new AssertionError("Current leader is undefined") | ||||
|  |  | |||
|  | @ -66,31 +66,59 @@ public class LocalLogManagerTestEnv implements AutoCloseable { | |||
|      */ | ||||
|     private final List<LocalLogManager> logManagers; | ||||
| 
 | ||||
|     public static LocalLogManagerTestEnv createWithMockListeners( | ||||
|         int numManagers, | ||||
|         Optional<RawSnapshotReader> snapshot | ||||
|     ) throws Exception { | ||||
|         LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers, snapshot); | ||||
|     public static class Builder { | ||||
|         private final int numManagers; | ||||
|         private Optional<RawSnapshotReader> snapshotReader = Optional.empty(); | ||||
|         private Consumer<SharedLogData> sharedLogDataInitializer = __ -> { }; | ||||
| 
 | ||||
|         public Builder(int numManagers) { | ||||
|             this.numManagers = numManagers; | ||||
|         } | ||||
| 
 | ||||
|         public Builder setSnapshotReader(RawSnapshotReader snapshotReader) { | ||||
|             this.snapshotReader = Optional.of(snapshotReader); | ||||
|             return this; | ||||
|         } | ||||
| 
 | ||||
|         public Builder setSharedLogDataInitializer(Consumer<SharedLogData> sharedLogDataInitializer) { | ||||
|             this.sharedLogDataInitializer = sharedLogDataInitializer; | ||||
|             return this; | ||||
|         } | ||||
| 
 | ||||
|         public LocalLogManagerTestEnv build() { | ||||
|             return new LocalLogManagerTestEnv( | ||||
|                 numManagers, | ||||
|                 snapshotReader, | ||||
|                 sharedLogDataInitializer); | ||||
|         } | ||||
| 
 | ||||
|         public LocalLogManagerTestEnv buildWithMockListeners() { | ||||
|             LocalLogManagerTestEnv env = build(); | ||||
|             try { | ||||
|             for (LocalLogManager logManager : testEnv.logManagers) { | ||||
|                 for (LocalLogManager logManager : env.logManagers) { | ||||
|                     logManager.register(new MockMetaLogManagerListener(logManager.nodeId().getAsInt())); | ||||
|                 } | ||||
|             } catch (Exception e) { | ||||
|             testEnv.close(); | ||||
|                 try { | ||||
|                     env.close(); | ||||
|                 } catch (Exception t) { | ||||
|                     log.error("Error while closing new log environment", t); | ||||
|                 } | ||||
|                 throw e; | ||||
|             } | ||||
|         return testEnv; | ||||
|             return env; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public LocalLogManagerTestEnv( | ||||
|     private LocalLogManagerTestEnv( | ||||
|         int numManagers, | ||||
|         Optional<RawSnapshotReader> snapshot, | ||||
|         Consumer<SharedLogData> dataSetup | ||||
|     ) throws Exception { | ||||
|         Optional<RawSnapshotReader> snapshotReader, | ||||
|         Consumer<SharedLogData> sharedLogDataInitializer | ||||
|     ) { | ||||
|         clusterId = Uuid.randomUuid().toString(); | ||||
|         dir = TestUtils.tempDirectory(); | ||||
|         shared = new SharedLogData(snapshot); | ||||
|         dataSetup.accept(shared); | ||||
|         shared = new SharedLogData(snapshotReader); | ||||
|         sharedLogDataInitializer.accept(shared); | ||||
|         List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers); | ||||
|         try { | ||||
|             for (int nodeId = 0; nodeId < numManagers; nodeId++) { | ||||
|  | @ -112,13 +140,6 @@ public class LocalLogManagerTestEnv implements AutoCloseable { | |||
|         this.logManagers = newLogManagers; | ||||
|     } | ||||
| 
 | ||||
|     public LocalLogManagerTestEnv( | ||||
|         int numManagers, | ||||
|         Optional<RawSnapshotReader> snapshot | ||||
|     ) throws Exception { | ||||
|         this(numManagers, snapshot, __ -> { }); | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * Append some records to the log. This method is meant to be called before the | ||||
|      * controllers are started, to simulate a pre-existing metadata log. | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue