From 486f65e8c60d1126b1b62180b6b66352e2783907 Mon Sep 17 00:00:00 2001 From: TaiJuWu Date: Thu, 28 Nov 2024 03:25:19 +0800 Subject: [PATCH] KAFKA-18100 `Using` block suppresses all errors (#17954) https://github.com/apache/kafka/pull/15881 changed our tests to utilize `using` blocks. But these blocks don't throw any errors, so if there is a failed assertion within the block, the test will still pass. We should either check the failure using a corresponding `match` block with Success(_) and Failure(e), use `using.resource`, or use try/finally blocks to clean up resources. See https://www.scala-lang.org/api/3.0.2/scala/util/Using$.html Co-authored-by: frankvicky Reviewers: Chia-Ping Tsai --- ...enEndToEndAuthorizationWithOwnerTest.scala | 6 +- .../api/PlaintextAdminIntegrationTest.scala | 2 +- ...aslClientsWithInvalidCredentialsTest.scala | 5 +- .../integration/kafka/api/SaslSetup.scala | 2 +- .../server/RaftClusterSnapshotTest.scala | 4 +- .../kafka/raft/KafkaMetadataLogTest.scala | 146 ++++++------------ .../group/CoordinatorLoaderImplTest.scala | 27 ++-- .../integration/KafkaServerTestHarness.scala | 6 +- .../kafka/server/ReplicationQuotasTest.scala | 4 +- .../epoch/LeaderEpochIntegrationTest.scala | 2 +- .../kafka/tools/DumpLogSegmentsTest.scala | 2 +- 11 files changed, 79 insertions(+), 127 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala index d886d52ee9a..833b06654d3 100644 --- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala +++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala @@ -66,7 +66,7 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE override def configureSecurityAfterServersStart(): Unit = { // Create the Acls before calling super which will create the additiona tokens - Using(createPrivilegedAdminClient()) { superuserAdminClient => + Using.resource(createPrivilegedAdminClient()) { superuserAdminClient => superuserAdminClient.createAcls(List(AclTokenOtherDescribe, AclTokenCreate, AclTokenDescribe).asJava).values brokers.foreach { s => @@ -106,8 +106,8 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE @ParameterizedTest @ValueSource(strings = Array("kraft")) def testDescribeTokenForOtherUserFails(quorum: String): Unit = { - Using(createScramAdminClient(kafkaClientSaslMechanism, describeTokenFailPrincipal.getName, describeTokenFailPassword)) { describeTokenFailAdminClient => - Using(createScramAdminClient(kafkaClientSaslMechanism, otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient => + Using.resource(createScramAdminClient(kafkaClientSaslMechanism, describeTokenFailPrincipal.getName, describeTokenFailPassword)) { describeTokenFailAdminClient => + Using.resource(createScramAdminClient(kafkaClientSaslMechanism, otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient => otherClientAdminClient.createDelegationToken().delegationToken().get() val tokens = describeTokenFailAdminClient.describeDelegationToken( new DescribeDelegationTokenOptions().owners(Collections.singletonList(otherClientPrincipal)) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 0d0173b6121..df3352e0076 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -2388,7 +2388,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { newConsumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE.toString) newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString) - Using(createConsumer(configOverrides = newConsumerConfig)) { consumer => + Using.resource(createConsumer(configOverrides = newConsumerConfig)) { consumer => consumer.subscribe(Collections.singletonList(testTopicName)) val records = consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS)) assertNotEquals(0, records.count) diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index af3f030648f..20435a130e4 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.SaslAuthenticationException import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.api.Assertions._ import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig @@ -74,8 +75,10 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { override def setUp(testInfo: TestInfo): Unit = { startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) + val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism) + superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext) super.setUp(testInfo) - Using(createPrivilegedAdminClient()) { superuserAdminClient => + Using.resource(createPrivilegedAdminClient()) { superuserAdminClient => TestUtils.createTopicWithAdmin( superuserAdminClient, topic, brokers, controllerServers, numPartitions ) diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index a9eb15a24b2..a38a9189ce6 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -204,7 +204,7 @@ trait SaslSetup { def createScramCredentials(zkConnect: String, userName: String, password: String): Unit = { val zkClientConfig = new ZKClientConfig() - Using(KafkaZkClient( + Using.resource(KafkaZkClient( zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000, Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig = zkClientConfig, enableEntityConfigControllerCheck = false)) { zkClient => val adminZkClient = new AdminZkClient(zkClient) diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala index a8919605f2b..ad47da549ff 100644 --- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala +++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala @@ -41,7 +41,7 @@ class RaftClusterSnapshotTest { val numberOfBrokers = 3 val numberOfControllers = 3 - Using( + Using.resource( new KafkaClusterTestKit .Builder( new TestKitNodes.Builder() @@ -74,7 +74,7 @@ class RaftClusterSnapshotTest { // For every controller and broker perform some sanity checks against the latest snapshot for ((_, raftManager) <- cluster.raftManagers().asScala) { - Using( + Using.resource( RecordsSnapshotReader.of( raftManager.replicatedLog.latestSnapshot.get(), new MetadataRecordSerde(), diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala index cca56f7aa96..1c65fd5073c 100644 --- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala +++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala @@ -124,10 +124,7 @@ final class KafkaMetadataLogTest { append(log, numberOfRecords, epoch) log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) - - Using(log.createNewSnapshot(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshot(log, snapshotId) assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes()) } @@ -211,9 +208,7 @@ final class KafkaMetadataLogTest { append(log, numberOfRecords, epoch) log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) - Using(log.createNewSnapshot(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshot(log, snapshotId) // Simulate log cleanup that advances the LSO log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, LogStartOffsetIncrementReason.SegmentDeletion) @@ -246,10 +241,7 @@ final class KafkaMetadataLogTest { append(log, numberOfRecords, epoch) log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) - - Using(log.createNewSnapshot(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshot(log, snapshotId) assertThrows( classOf[IllegalArgumentException], @@ -295,10 +287,7 @@ final class KafkaMetadataLogTest { append(log, numberOfRecords, epoch) log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) - - Using(log.createNewSnapshot(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshot(log, snapshotId) assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId), "Creating an existing snapshot should not do anything") @@ -342,10 +331,7 @@ final class KafkaMetadataLogTest { val sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch) append(log, numberOfRecords, epoch) - - Using(log.createNewSnapshotUnchecked(sameEpochSnapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, sameEpochSnapshotId) assertTrue(log.truncateToLatestSnapshot()) assertEquals(sameEpochSnapshotId.offset, log.startOffset) @@ -356,10 +342,7 @@ final class KafkaMetadataLogTest { val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch + 1) append(log, numberOfRecords, epoch) - - Using(log.createNewSnapshotUnchecked(greaterEpochSnapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, greaterEpochSnapshotId) assertTrue(log.truncateToLatestSnapshot()) assertEquals(greaterEpochSnapshotId.offset, log.startOffset) @@ -376,27 +359,18 @@ final class KafkaMetadataLogTest { append(log, 1, epoch - 1) val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1) - Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, oldSnapshotId1) append(log, 1, epoch) val oldSnapshotId2 = new OffsetAndEpoch(2, epoch) - Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, oldSnapshotId2) append(log, numberOfRecords - 2, epoch) val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch) - Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, oldSnapshotId3) val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch) - append(log, numberOfRecords, epoch) - Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, greaterSnapshotId) assertNotEquals(log.earliestSnapshotId(), log.latestSnapshotId()) assertTrue(log.truncateToLatestSnapshot()) @@ -487,7 +461,7 @@ final class KafkaMetadataLogTest { metadataDir: File, snapshotId: OffsetAndEpoch ): Unit = { - Using(FileRawSnapshotWriter.create(metadataDir.toPath, snapshotId))(_.freeze()) + Using.resource(FileRawSnapshotWriter.create(metadataDir.toPath, snapshotId))(_.freeze()) } @Test @@ -499,18 +473,14 @@ final class KafkaMetadataLogTest { append(log, numberOfRecords, epoch) val olderEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch - 1) - Using(log.createNewSnapshotUnchecked(olderEpochSnapshotId).get()) { snapshot => - snapshot.freeze() - } - + createNewSnapshotUnckecked(log, olderEpochSnapshotId) assertFalse(log.truncateToLatestSnapshot()) append(log, numberOfRecords, epoch) + val olderOffsetSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch) - Using(log.createNewSnapshotUnchecked(olderOffsetSnapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, olderOffsetSnapshotId) assertFalse(log.truncateToLatestSnapshot()) } @@ -523,10 +493,7 @@ final class KafkaMetadataLogTest { val snapshotId = new OffsetAndEpoch(1, epoch) append(log, numberOfRecords, epoch) - Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot => - snapshot.freeze() - } - + createNewSnapshotUnckecked(log, snapshotId) log.close() // Create a few partial snapshots @@ -560,27 +527,19 @@ final class KafkaMetadataLogTest { append(log, 1, epoch - 1) val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1) - Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, oldSnapshotId1) append(log, 1, epoch) val oldSnapshotId2 = new OffsetAndEpoch(2, epoch) - Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, oldSnapshotId2) append(log, numberOfRecords - 2, epoch) val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch) - Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, oldSnapshotId3) val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch) append(log, numberOfRecords, epoch) - Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, greaterSnapshotId) log.close() @@ -609,9 +568,7 @@ final class KafkaMetadataLogTest { val snapshotId = new OffsetAndEpoch(numberOfRecords + 1, epoch + 1) append(log, numberOfRecords, epoch) - Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, snapshotId) log.close() @@ -707,9 +664,7 @@ final class KafkaMetadataLogTest { log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) - Using(log.createNewSnapshot(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshot(log, snapshotId) val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1) assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind) @@ -727,9 +682,8 @@ final class KafkaMetadataLogTest { log.updateHighWatermark(new LogOffsetMetadata(offset)) val snapshotId = new OffsetAndEpoch(offset, epoch) - Using(log.createNewSnapshot(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshot(log, snapshotId) + // Simulate log cleaning advancing the LSO log.log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion) @@ -749,9 +703,7 @@ final class KafkaMetadataLogTest { log.updateHighWatermark(new LogOffsetMetadata(offset)) val snapshotId = new OffsetAndEpoch(offset, epoch) - Using(log.createNewSnapshot(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshot(log, snapshotId) val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch) assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind) @@ -766,9 +718,7 @@ final class KafkaMetadataLogTest { val log = buildMetadataLog(tempDir, mockTime) log.updateHighWatermark(new LogOffsetMetadata(offset)) val snapshotId = new OffsetAndEpoch(offset, 1) - Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, snapshotId) log.truncateToLatestSnapshot() @@ -790,9 +740,7 @@ final class KafkaMetadataLogTest { val log = buildMetadataLog(tempDir, mockTime) log.updateHighWatermark(new LogOffsetMetadata(offset)) val snapshotId = new OffsetAndEpoch(offset, 1) - Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, snapshotId) log.truncateToLatestSnapshot() append(log, numOfRecords, epoch = 3) @@ -872,16 +820,10 @@ final class KafkaMetadataLogTest { assertFalse(log.maybeClean(), "Should not clean since no snapshots exist") val snapshotId1 = new OffsetAndEpoch(1000, 1) - Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot => - append(snapshot, 100) - snapshot.freeze() - } + createNewSnapshotUnckecked(log, snapshotId1) val snapshotId2 = new OffsetAndEpoch(2000, 1) - Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot => - append(snapshot, 100) - snapshot.freeze() - } + createNewSnapshotUnckecked(log, snapshotId2) val lsoBefore = log.startOffset() assertTrue(log.maybeClean(), "Expected to clean since there was at least one snapshot") @@ -910,10 +852,7 @@ final class KafkaMetadataLogTest { for (offset <- Seq(100, 200, 300, 400, 500, 600)) { val snapshotId = new OffsetAndEpoch(offset, 1) - Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot => - append(snapshot, 10) - snapshot.freeze() - } + createNewSnapshotUnckecked(log, snapshotId) } assertEquals(6, log.snapshotCount()) @@ -945,14 +884,14 @@ final class KafkaMetadataLogTest { // Then generate two snapshots val snapshotId1 = new OffsetAndEpoch(1000, 1) - Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot => + Using.resource(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot => append(snapshot, 500) snapshot.freeze() } // Then generate a snapshot val snapshotId2 = new OffsetAndEpoch(2000, 1) - Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot => + Using.resource(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot => append(snapshot, 500) snapshot.freeze() } @@ -992,17 +931,14 @@ final class KafkaMetadataLogTest { log.log.logSegments.asScala.drop(1).head.baseOffset, 1 ) - Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, snapshotId1) + // Generate second snapshots that includes the second segment by using the base offset of the third segment val snapshotId2 = new OffsetAndEpoch( log.log.logSegments.asScala.drop(2).head.baseOffset, 1 ) - Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot => - snapshot.freeze() - } + createNewSnapshotUnckecked(log, snapshotId2) // Sleep long enough to trigger a possible segment delete because of the default retention val defaultLogRetentionMs = LogConfig.DEFAULT_RETENTION_MS * 2 @@ -1074,6 +1010,18 @@ object KafkaMetadataLogTest { log } + def createNewSnapshot(log: KafkaMetadataLog, snapshotId: OffsetAndEpoch): Unit = { + Using.resource(log.createNewSnapshot(snapshotId).get()) { snapshot => + snapshot.freeze() + } + } + + def createNewSnapshotUnckecked(log: KafkaMetadataLog, snapshotId: OffsetAndEpoch): Unit = { + Using.resource(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot => + snapshot.freeze() + } + } + def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int): LogAppendInfo = { log.appendAsLeader( MemoryRecords.withRecords( @@ -1103,4 +1051,4 @@ object KafkaMetadataLogTest { } dir } -} +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala index 70397447f5c..68a6ba5da1d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala @@ -59,7 +59,7 @@ class CoordinatorLoaderImplTest { val serde = mock(classOf[Deserializer[(String, String)]]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -79,7 +79,7 @@ class CoordinatorLoaderImplTest { val serde = mock(classOf[Deserializer[(String, String)]]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -100,7 +100,7 @@ class CoordinatorLoaderImplTest { val log = mock(classOf[UnifiedLog]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -203,7 +203,7 @@ class CoordinatorLoaderImplTest { val log = mock(classOf[UnifiedLog]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -246,7 +246,7 @@ class CoordinatorLoaderImplTest { val log = mock(classOf[UnifiedLog]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -286,7 +286,7 @@ class CoordinatorLoaderImplTest { val log = mock(classOf[UnifiedLog]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -312,7 +312,8 @@ class CoordinatorLoaderImplTest { .thenThrow(new RuntimeException("Error!")) val ex = assertFutureThrows(loader.load(tp, coordinator), classOf[RuntimeException]) - assertEquals("Error!", ex.getMessage) + + assertEquals(s"Deserializing record DefaultRecord(offset=0, timestamp=-1, key=2 bytes, value=2 bytes) from $tp failed due to: Error!", ex.getMessage) } } @@ -327,7 +328,7 @@ class CoordinatorLoaderImplTest { val log = mock(classOf[UnifiedLog]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -359,7 +360,7 @@ class CoordinatorLoaderImplTest { val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) val time = new MockTime() - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time, replicaManager = replicaManager, deserializer = serde, @@ -414,7 +415,7 @@ class CoordinatorLoaderImplTest { val log = mock(classOf[UnifiedLog]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -489,7 +490,7 @@ class CoordinatorLoaderImplTest { val log = mock(classOf[UnifiedLog]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -515,7 +516,7 @@ class CoordinatorLoaderImplTest { val log = mock(classOf[UnifiedLog]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, @@ -591,7 +592,7 @@ class CoordinatorLoaderImplTest { val log = mock(classOf[UnifiedLog]) val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) - Using(new CoordinatorLoaderImpl[(String, String)]( + Using.resource(new CoordinatorLoaderImpl[(String, String)]( time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 0e004b44271..8a8772ea08d 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -160,7 +160,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { adminClientConfig: Properties = new Properties ): Unit = { if (isKRaftTest()) { - Using(createAdminClient(brokers, listenerName, adminClientConfig)) { admin => + Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin => TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers) } } else { @@ -239,7 +239,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { listenerName: ListenerName = listenerName ): Unit = { if (isKRaftTest()) { - Using(createAdminClient(brokers, listenerName)) { admin => + Using.resource(createAdminClient(brokers, listenerName)) { admin => TestUtils.deleteTopicWithAdmin( admin = admin, topic = topic, @@ -433,7 +433,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = { if (isKRaftTest()) { - Using(createAdminClient(brokers, listenerName)) { + Using.resource(createAdminClient(brokers, listenerName)) { admin => { admin.alterClientQuotas(Collections.singleton( new ClientQuotaAlteration( diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index 64ef5641f23..7ac8966d363 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -111,7 +111,7 @@ class ReplicationQuotasTest extends QuorumTestHarness { //replicate for each of the two follower brokers. if (!leaderThrottle) throttle = throttle * 3 - Using(createAdminClient(brokers, listenerName)) { admin => + Using.resource(createAdminClient(brokers, listenerName)) { admin => (106 to 107).foreach(registerBroker) admin.createTopics(List(new NewTopic(topic, assignment.map(a => a._1.asInstanceOf[Integer] -> a._2.map(_.asInstanceOf[Integer]).toList.asJava).asJava)).asJava).all().get() @@ -212,7 +212,7 @@ class ReplicationQuotasTest extends QuorumTestHarness { val expectedDuration = 4 val throttle: Long = msg.length * msgCount / expectedDuration - Using(createAdminClient(brokers, listenerName)) { admin => + Using.resource(createAdminClient(brokers, listenerName)) { admin => registerBroker(101) admin.createTopics( List(new NewTopic(topic, Collections.singletonMap(0, List(100, 101).map(_.asInstanceOf[Integer]).asJava))).asJava diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index b943dd5bff6..4ac571f452a 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -283,7 +283,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging { } private def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): Unit = { - Using(createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin => + Using.resource(createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin => try { TestUtils.createTopicWithAdmin( admin = admin, diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 984b3a8eb8c..9bbfa7242c3 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -560,7 +560,7 @@ class DumpLogSegmentsTest { val lastContainedLogTimestamp = 10000 - Using( + Using.resource( new RecordsSnapshotWriter.Builder() .setTime(new MockTime) .setLastContainedLogTimestamp(lastContainedLogTimestamp)