KAFKA-18100 `Using` block suppresses all errors (#17954)

https://github.com/apache/kafka/pull/15881 changed our tests to utilize `using` blocks. But these blocks don't throw any errors, so if there is a failed assertion within the block, the test will still pass. 

We should either check the failure using a corresponding `match` block with Success(_) and Failure(e), use `using.resource`, or use try/finally blocks to clean up resources.

See https://www.scala-lang.org/api/3.0.2/scala/util/Using$.html

Co-authored-by: frankvicky <kitingiao@gmail.com>

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TaiJuWu 2024-11-28 03:25:19 +08:00 committed by GitHub
parent aae42ef656
commit 486f65e8c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 79 additions and 127 deletions

View File

@ -66,7 +66,7 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE
override def configureSecurityAfterServersStart(): Unit = {
// Create the Acls before calling super which will create the additiona tokens
Using(createPrivilegedAdminClient()) { superuserAdminClient =>
Using.resource(createPrivilegedAdminClient()) { superuserAdminClient =>
superuserAdminClient.createAcls(List(AclTokenOtherDescribe, AclTokenCreate, AclTokenDescribe).asJava).values
brokers.foreach { s =>
@ -106,8 +106,8 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeTokenForOtherUserFails(quorum: String): Unit = {
Using(createScramAdminClient(kafkaClientSaslMechanism, describeTokenFailPrincipal.getName, describeTokenFailPassword)) { describeTokenFailAdminClient =>
Using(createScramAdminClient(kafkaClientSaslMechanism, otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
Using.resource(createScramAdminClient(kafkaClientSaslMechanism, describeTokenFailPrincipal.getName, describeTokenFailPassword)) { describeTokenFailAdminClient =>
Using.resource(createScramAdminClient(kafkaClientSaslMechanism, otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
otherClientAdminClient.createDelegationToken().delegationToken().get()
val tokens = describeTokenFailAdminClient.describeDelegationToken(
new DescribeDelegationTokenOptions().owners(Collections.singletonList(otherClientPrincipal))

View File

@ -2388,7 +2388,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
newConsumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE.toString)
newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString)
Using(createConsumer(configOverrides = newConsumerConfig)) { consumer =>
Using.resource(createConsumer(configOverrides = newConsumerConfig)) { consumer =>
consumer.subscribe(Collections.singletonList(testTopicName))
val records = consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
assertNotEquals(0, records.count)

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.SaslAuthenticationException
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@ -74,8 +75,10 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism)
superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext)
super.setUp(testInfo)
Using(createPrivilegedAdminClient()) { superuserAdminClient =>
Using.resource(createPrivilegedAdminClient()) { superuserAdminClient =>
TestUtils.createTopicWithAdmin(
superuserAdminClient, topic, brokers, controllerServers, numPartitions
)

View File

@ -204,7 +204,7 @@ trait SaslSetup {
def createScramCredentials(zkConnect: String, userName: String, password: String): Unit = {
val zkClientConfig = new ZKClientConfig()
Using(KafkaZkClient(
Using.resource(KafkaZkClient(
zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig = zkClientConfig, enableEntityConfigControllerCheck = false)) { zkClient =>
val adminZkClient = new AdminZkClient(zkClient)

View File

@ -41,7 +41,7 @@ class RaftClusterSnapshotTest {
val numberOfBrokers = 3
val numberOfControllers = 3
Using(
Using.resource(
new KafkaClusterTestKit
.Builder(
new TestKitNodes.Builder()
@ -74,7 +74,7 @@ class RaftClusterSnapshotTest {
// For every controller and broker perform some sanity checks against the latest snapshot
for ((_, raftManager) <- cluster.raftManagers().asScala) {
Using(
Using.resource(
RecordsSnapshotReader.of(
raftManager.replicatedLog.latestSnapshot.get(),
new MetadataRecordSerde(),

View File

@ -124,10 +124,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshot(log, snapshotId)
assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes())
}
@ -211,9 +208,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshot(log, snapshotId)
// Simulate log cleanup that advances the LSO
log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, LogStartOffsetIncrementReason.SegmentDeletion)
@ -246,10 +241,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshot(log, snapshotId)
assertThrows(
classOf[IllegalArgumentException],
@ -295,10 +287,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshot(log, snapshotId)
assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId),
"Creating an existing snapshot should not do anything")
@ -342,10 +331,7 @@ final class KafkaMetadataLogTest {
val sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch)
append(log, numberOfRecords, epoch)
Using(log.createNewSnapshotUnchecked(sameEpochSnapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, sameEpochSnapshotId)
assertTrue(log.truncateToLatestSnapshot())
assertEquals(sameEpochSnapshotId.offset, log.startOffset)
@ -356,10 +342,7 @@ final class KafkaMetadataLogTest {
val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch + 1)
append(log, numberOfRecords, epoch)
Using(log.createNewSnapshotUnchecked(greaterEpochSnapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, greaterEpochSnapshotId)
assertTrue(log.truncateToLatestSnapshot())
assertEquals(greaterEpochSnapshotId.offset, log.startOffset)
@ -376,27 +359,18 @@ final class KafkaMetadataLogTest {
append(log, 1, epoch - 1)
val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, oldSnapshotId1)
append(log, 1, epoch)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, oldSnapshotId2)
append(log, numberOfRecords - 2, epoch)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, oldSnapshotId3)
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
append(log, numberOfRecords, epoch)
Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, greaterSnapshotId)
assertNotEquals(log.earliestSnapshotId(), log.latestSnapshotId())
assertTrue(log.truncateToLatestSnapshot())
@ -487,7 +461,7 @@ final class KafkaMetadataLogTest {
metadataDir: File,
snapshotId: OffsetAndEpoch
): Unit = {
Using(FileRawSnapshotWriter.create(metadataDir.toPath, snapshotId))(_.freeze())
Using.resource(FileRawSnapshotWriter.create(metadataDir.toPath, snapshotId))(_.freeze())
}
@Test
@ -499,18 +473,14 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
val olderEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch - 1)
Using(log.createNewSnapshotUnchecked(olderEpochSnapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, olderEpochSnapshotId)
assertFalse(log.truncateToLatestSnapshot())
append(log, numberOfRecords, epoch)
val olderOffsetSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
Using(log.createNewSnapshotUnchecked(olderOffsetSnapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, olderOffsetSnapshotId)
assertFalse(log.truncateToLatestSnapshot())
}
@ -523,10 +493,7 @@ final class KafkaMetadataLogTest {
val snapshotId = new OffsetAndEpoch(1, epoch)
append(log, numberOfRecords, epoch)
Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, snapshotId)
log.close()
// Create a few partial snapshots
@ -560,27 +527,19 @@ final class KafkaMetadataLogTest {
append(log, 1, epoch - 1)
val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, oldSnapshotId1)
append(log, 1, epoch)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, oldSnapshotId2)
append(log, numberOfRecords - 2, epoch)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, oldSnapshotId3)
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
append(log, numberOfRecords, epoch)
Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, greaterSnapshotId)
log.close()
@ -609,9 +568,7 @@ final class KafkaMetadataLogTest {
val snapshotId = new OffsetAndEpoch(numberOfRecords + 1, epoch + 1)
append(log, numberOfRecords, epoch)
Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, snapshotId)
log.close()
@ -707,9 +664,7 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshot(log, snapshotId)
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
@ -727,9 +682,8 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, epoch)
Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshot(log, snapshotId)
// Simulate log cleaning advancing the LSO
log.log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion)
@ -749,9 +703,7 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, epoch)
Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshot(log, snapshotId)
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
@ -766,9 +718,7 @@ final class KafkaMetadataLogTest {
val log = buildMetadataLog(tempDir, mockTime)
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, 1)
Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, snapshotId)
log.truncateToLatestSnapshot()
@ -790,9 +740,7 @@ final class KafkaMetadataLogTest {
val log = buildMetadataLog(tempDir, mockTime)
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, 1)
Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, snapshotId)
log.truncateToLatestSnapshot()
append(log, numOfRecords, epoch = 3)
@ -872,16 +820,10 @@ final class KafkaMetadataLogTest {
assertFalse(log.maybeClean(), "Should not clean since no snapshots exist")
val snapshotId1 = new OffsetAndEpoch(1000, 1)
Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
append(snapshot, 100)
snapshot.freeze()
}
createNewSnapshotUnckecked(log, snapshotId1)
val snapshotId2 = new OffsetAndEpoch(2000, 1)
Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
append(snapshot, 100)
snapshot.freeze()
}
createNewSnapshotUnckecked(log, snapshotId2)
val lsoBefore = log.startOffset()
assertTrue(log.maybeClean(), "Expected to clean since there was at least one snapshot")
@ -910,10 +852,7 @@ final class KafkaMetadataLogTest {
for (offset <- Seq(100, 200, 300, 400, 500, 600)) {
val snapshotId = new OffsetAndEpoch(offset, 1)
Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
append(snapshot, 10)
snapshot.freeze()
}
createNewSnapshotUnckecked(log, snapshotId)
}
assertEquals(6, log.snapshotCount())
@ -945,14 +884,14 @@ final class KafkaMetadataLogTest {
// Then generate two snapshots
val snapshotId1 = new OffsetAndEpoch(1000, 1)
Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
Using.resource(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
append(snapshot, 500)
snapshot.freeze()
}
// Then generate a snapshot
val snapshotId2 = new OffsetAndEpoch(2000, 1)
Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
Using.resource(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
append(snapshot, 500)
snapshot.freeze()
}
@ -992,17 +931,14 @@ final class KafkaMetadataLogTest {
log.log.logSegments.asScala.drop(1).head.baseOffset,
1
)
Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, snapshotId1)
// Generate second snapshots that includes the second segment by using the base offset of the third segment
val snapshotId2 = new OffsetAndEpoch(
log.log.logSegments.asScala.drop(2).head.baseOffset,
1
)
Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
snapshot.freeze()
}
createNewSnapshotUnckecked(log, snapshotId2)
// Sleep long enough to trigger a possible segment delete because of the default retention
val defaultLogRetentionMs = LogConfig.DEFAULT_RETENTION_MS * 2
@ -1074,6 +1010,18 @@ object KafkaMetadataLogTest {
log
}
def createNewSnapshot(log: KafkaMetadataLog, snapshotId: OffsetAndEpoch): Unit = {
Using.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
}
def createNewSnapshotUnckecked(log: KafkaMetadataLog, snapshotId: OffsetAndEpoch): Unit = {
Using.resource(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
}
def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int): LogAppendInfo = {
log.appendAsLeader(
MemoryRecords.withRecords(
@ -1103,4 +1051,4 @@ object KafkaMetadataLogTest {
}
dir
}
}
}

View File

@ -59,7 +59,7 @@ class CoordinatorLoaderImplTest {
val serde = mock(classOf[Deserializer[(String, String)]])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -79,7 +79,7 @@ class CoordinatorLoaderImplTest {
val serde = mock(classOf[Deserializer[(String, String)]])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -100,7 +100,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -203,7 +203,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -246,7 +246,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -286,7 +286,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -312,7 +312,8 @@ class CoordinatorLoaderImplTest {
.thenThrow(new RuntimeException("Error!"))
val ex = assertFutureThrows(loader.load(tp, coordinator), classOf[RuntimeException])
assertEquals("Error!", ex.getMessage)
assertEquals(s"Deserializing record DefaultRecord(offset=0, timestamp=-1, key=2 bytes, value=2 bytes) from $tp failed due to: Error!", ex.getMessage)
}
}
@ -327,7 +328,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -359,7 +360,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
val time = new MockTime()
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time,
replicaManager = replicaManager,
deserializer = serde,
@ -414,7 +415,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -489,7 +490,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -515,7 +516,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@ -591,7 +592,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using(new CoordinatorLoaderImpl[(String, String)](
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,

View File

@ -160,7 +160,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
adminClientConfig: Properties = new Properties
): Unit = {
if (isKRaftTest()) {
Using(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers)
}
} else {
@ -239,7 +239,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
listenerName: ListenerName = listenerName
): Unit = {
if (isKRaftTest()) {
Using(createAdminClient(brokers, listenerName)) { admin =>
Using.resource(createAdminClient(brokers, listenerName)) { admin =>
TestUtils.deleteTopicWithAdmin(
admin = admin,
topic = topic,
@ -433,7 +433,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = {
if (isKRaftTest()) {
Using(createAdminClient(brokers, listenerName)) {
Using.resource(createAdminClient(brokers, listenerName)) {
admin => {
admin.alterClientQuotas(Collections.singleton(
new ClientQuotaAlteration(

View File

@ -111,7 +111,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
//replicate for each of the two follower brokers.
if (!leaderThrottle) throttle = throttle * 3
Using(createAdminClient(brokers, listenerName)) { admin =>
Using.resource(createAdminClient(brokers, listenerName)) { admin =>
(106 to 107).foreach(registerBroker)
admin.createTopics(List(new NewTopic(topic, assignment.map(a => a._1.asInstanceOf[Integer] ->
a._2.map(_.asInstanceOf[Integer]).toList.asJava).asJava)).asJava).all().get()
@ -212,7 +212,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
val expectedDuration = 4
val throttle: Long = msg.length * msgCount / expectedDuration
Using(createAdminClient(brokers, listenerName)) { admin =>
Using.resource(createAdminClient(brokers, listenerName)) { admin =>
registerBroker(101)
admin.createTopics(
List(new NewTopic(topic, Collections.singletonMap(0, List(100, 101).map(_.asInstanceOf[Integer]).asJava))).asJava

View File

@ -283,7 +283,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
}
private def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): Unit = {
Using(createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
Using.resource(createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
try {
TestUtils.createTopicWithAdmin(
admin = admin,

View File

@ -560,7 +560,7 @@ class DumpLogSegmentsTest {
val lastContainedLogTimestamp = 10000
Using(
Using.resource(
new RecordsSnapshotWriter.Builder()
.setTime(new MockTime)
.setLastContainedLogTimestamp(lastContainedLogTimestamp)