mirror of https://github.com/apache/kafka.git
MINOR: ensure LocalLogManagerTestEnv is closed in QuorumControllerTest
This commit is contained in:
parent
2e8d69b78c
commit
ad3876dfdf
|
@ -1540,15 +1540,14 @@ public class QuorumControllerTest {
|
||||||
@Test
|
@Test
|
||||||
public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
|
public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
|
||||||
try (
|
try (
|
||||||
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build()
|
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
|
||||||
) {
|
) {
|
||||||
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
|
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
|
||||||
setControllerBuilderInitializer(controllerBuilder -> {
|
setControllerBuilderInitializer(controllerBuilder -> {
|
||||||
controllerBuilder.setZkMigrationEnabled(true);
|
controllerBuilder.setZkMigrationEnabled(true);
|
||||||
}).
|
}).
|
||||||
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV0, "test"));
|
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV0, "test")).
|
||||||
|
build();
|
||||||
QuorumControllerTestEnv controlEnv = controlEnvBuilder.build();
|
|
||||||
QuorumController active = controlEnv.activeController();
|
QuorumController active = controlEnv.activeController();
|
||||||
assertEquals(ZkMigrationState.NONE, active.appendReadEvent("read migration state", OptionalLong.empty(),
|
assertEquals(ZkMigrationState.NONE, active.appendReadEvent("read migration state", OptionalLong.empty(),
|
||||||
() -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS));
|
() -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS));
|
||||||
|
@ -1658,12 +1657,12 @@ public class QuorumControllerTest {
|
||||||
@Test
|
@Test
|
||||||
public void testFailoverDuringMigrationTransaction() throws Exception {
|
public void testFailoverDuringMigrationTransaction() throws Exception {
|
||||||
try (
|
try (
|
||||||
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()
|
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
|
||||||
) {
|
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
|
||||||
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
|
|
||||||
setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setZkMigrationEnabled(true)).
|
setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setZkMigrationEnabled(true)).
|
||||||
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"));
|
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test")).
|
||||||
QuorumControllerTestEnv controlEnv = controlEnvBuilder.build();
|
build();
|
||||||
|
) {
|
||||||
QuorumController active = controlEnv.activeController(true);
|
QuorumController active = controlEnv.activeController(true);
|
||||||
ZkRecordConsumer migrationConsumer = active.zkRecordConsumer();
|
ZkRecordConsumer migrationConsumer = active.zkRecordConsumer();
|
||||||
migrationConsumer.beginMigration().get(30, TimeUnit.SECONDS);
|
migrationConsumer.beginMigration().get(30, TimeUnit.SECONDS);
|
||||||
|
@ -1703,18 +1702,17 @@ public class QuorumControllerTest {
|
||||||
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_4_IV0", "IBP_3_5_IV0", "IBP_3_6_IV0", "IBP_3_6_IV1"})
|
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_4_IV0", "IBP_3_5_IV0", "IBP_3_6_IV0", "IBP_3_6_IV1"})
|
||||||
public void testBrokerHeartbeatDuringMigration(MetadataVersion metadataVersion) throws Exception {
|
public void testBrokerHeartbeatDuringMigration(MetadataVersion metadataVersion) throws Exception {
|
||||||
try (
|
try (
|
||||||
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build()
|
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
|
||||||
) {
|
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
|
||||||
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
|
|
||||||
setControllerBuilderInitializer(controllerBuilder ->
|
setControllerBuilderInitializer(controllerBuilder ->
|
||||||
controllerBuilder
|
controllerBuilder
|
||||||
.setZkMigrationEnabled(true)
|
.setZkMigrationEnabled(true)
|
||||||
.setMaxIdleIntervalNs(OptionalLong.of(TimeUnit.MILLISECONDS.toNanos(100)))
|
.setMaxIdleIntervalNs(OptionalLong.of(TimeUnit.MILLISECONDS.toNanos(100)))
|
||||||
).
|
).
|
||||||
setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test"));
|
setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test")).
|
||||||
QuorumControllerTestEnv controlEnv = controlEnvBuilder.build();
|
build();
|
||||||
|
) {
|
||||||
QuorumController active = controlEnv.activeController(true);
|
QuorumController active = controlEnv.activeController(true);
|
||||||
|
|
||||||
// Register a ZK broker
|
// Register a ZK broker
|
||||||
BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
|
BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
|
||||||
new BrokerRegistrationRequestData().
|
new BrokerRegistrationRequestData().
|
||||||
|
@ -1753,7 +1751,6 @@ public class QuorumControllerTest {
|
||||||
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
|
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
|
||||||
setWantFence(false).setBrokerEpoch(reply.epoch()).setBrokerId(0).
|
setWantFence(false).setBrokerEpoch(reply.epoch()).setBrokerId(0).
|
||||||
setCurrentMetadataOffset(100100L)).get());
|
setCurrentMetadataOffset(100100L)).get());
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue