diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 2259acd9c2b..bba5278d7a6 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -17,7 +17,6 @@ package kafka.admin -import java.util.Collections import kafka.server.{BaseRequestTest, BrokerServer} import kafka.utils.TestUtils import kafka.utils.TestUtils._ @@ -28,8 +27,6 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} import java.util -import java.util.Arrays.asList -import java.util.Collections.singletonList import java.util.concurrent.ExecutionException import scala.jdk.CollectionConverters._ @@ -66,8 +63,8 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testWrongReplicaCount(): Unit = { assertEquals(classOf[InvalidReplicaAssignmentException], assertThrows(classOf[ExecutionException], () => { - admin.createPartitions(Collections.singletonMap(topic1, - NewPartitions.increaseTo(2, singletonList(asList(0, 1, 2))))).all().get() + admin.createPartitions(util.Map.of(topic1, + NewPartitions.increaseTo(2, util.List.of(util.List.of[Integer](0, 1, 2))))).all().get() }).getCause.getClass) } @@ -78,12 +75,12 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testMissingPartitionsInCreateTopics(): Unit = { val topic6Placements = new util.HashMap[Integer, util.List[Integer]] - topic6Placements.put(1, asList(0, 1)) - topic6Placements.put(2, asList(1, 0)) + topic6Placements.put(1, util.List.of(0, 1)) + topic6Placements.put(2, util.List.of(1, 0)) val topic7Placements = new util.HashMap[Integer, util.List[Integer]] - topic7Placements.put(2, asList(0, 1)) - topic7Placements.put(3, asList(1, 0)) - val futures = admin.createTopics(asList( + topic7Placements.put(2, util.List.of(0, 1)) + topic7Placements.put(3, util.List.of(1, 0)) + val futures = admin.createTopics(util.List.of( new NewTopic("new-topic6", topic6Placements), new NewTopic("new-topic7", topic7Placements))).values() val topic6Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic6").get()).getCause @@ -103,8 +100,8 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testMissingPartitionsInCreatePartitions(): Unit = { val cause = assertThrows(classOf[ExecutionException], () => - admin.createPartitions(Collections.singletonMap(topic1, - NewPartitions.increaseTo(3, singletonList(asList(0, 1, 2))))).all().get()).getCause + admin.createPartitions(util.Map.of(topic1, + NewPartitions.increaseTo(3, util.List.of(util.List.of[Integer](0, 1, 2))))).all().get()).getCause assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass) assertTrue(cause.getMessage.contains("Attempted to add 2 additional partition(s), but only 1 assignment(s) " + "were specified."), "Unexpected error message: " + cause.getMessage) @@ -112,7 +109,7 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testIncrementPartitions(): Unit = { - admin.createPartitions(Collections.singletonMap(topic1, NewPartitions.increaseTo(3))).all().get() + admin.createPartitions(util.Map.of(topic1, NewPartitions.increaseTo(3))).all().get() // wait until leader is elected waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 1) @@ -122,7 +119,7 @@ class AddPartitionsTest extends BaseRequestTest { TestUtils.waitForPartitionMetadata(brokers, topic1, 1) TestUtils.waitForPartitionMetadata(brokers, topic1, 2) val response = connectAndReceive[MetadataResponse]( - new MetadataRequest.Builder(Seq(topic1).asJava, false).build) + new MetadataRequest.Builder(util.List.of(topic1), false).build) assertEquals(1, response.topicMetadata.size) val partitions = response.topicMetadata.asScala.head.partitionMetadata.asScala.sortBy(_.partition) assertEquals(partitions.size, 3) @@ -141,8 +138,8 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testManualAssignmentOfReplicas(): Unit = { // Add 2 partitions - admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3, - asList(asList(0, 1), asList(2, 3))))).all().get() + admin.createPartitions(util.Map.of(topic2, NewPartitions.increaseTo(3, + util.List.of(util.List.of[Integer](0, 1), util.List.of[Integer](2, 3))))).all().get() // wait until leader is elected val leader1 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 1) val leader2 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 2) @@ -153,7 +150,7 @@ class AddPartitionsTest extends BaseRequestTest { val partition2Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 2) assertEquals(leader2, partition2Metadata.leader()) val response = connectAndReceive[MetadataResponse]( - new MetadataRequest.Builder(Seq(topic2).asJava, false).build) + new MetadataRequest.Builder(util.List.of(topic2), false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head val partitionMetadata = topicMetadata.partitionMetadata.asScala.sortBy(_.partition) @@ -168,7 +165,7 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testReplicaPlacementAllServers(): Unit = { - admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get() + admin.createPartitions(util.Map.of(topic3, NewPartitions.increaseTo(7))).all().get() // read metadata from a broker and verify the new topic partitions exist TestUtils.waitForPartitionMetadata(brokers, topic3, 1) @@ -179,7 +176,7 @@ class AddPartitionsTest extends BaseRequestTest { TestUtils.waitForPartitionMetadata(brokers, topic3, 6) val response = connectAndReceive[MetadataResponse]( - new MetadataRequest.Builder(Seq(topic3).asJava, false).build) + new MetadataRequest.Builder(util.List.of(topic3), false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head @@ -195,14 +192,14 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testReplicaPlacementPartialServers(): Unit = { - admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get() + admin.createPartitions(util.Map.of(topic2, NewPartitions.increaseTo(3))).all().get() // read metadata from a broker and verify the new topic partitions exist TestUtils.waitForPartitionMetadata(brokers, topic2, 1) TestUtils.waitForPartitionMetadata(brokers, topic2, 2) val response = connectAndReceive[MetadataResponse]( - new MetadataRequest.Builder(Seq(topic2).asJava, false).build) + new MetadataRequest.Builder(util.List.of(topic2), false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala index fff1930a718..ae1e86421eb 100644 --- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala @@ -38,7 +38,6 @@ import java.io.File import java.lang.{Long => JLong} import java.util.{Optional, Properties} import java.util.concurrent.atomic.AtomicInteger -import scala.jdk.CollectionConverters._ object AbstractPartitionTest { val brokerId = 101 @@ -121,7 +120,7 @@ class AbstractPartitionTest { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val controllerEpoch = 0 - val replicas = List[Integer](brokerId, remoteReplicaId).asJava + val replicas = java.util.List.of[Integer](brokerId, remoteReplicaId) val isr = replicas if (isLeader) { diff --git a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala index c34a7ac7536..fe8f67eed5f 100644 --- a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala @@ -21,97 +21,99 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} +import java.util + import scala.jdk.CollectionConverters._ object AssignmentStateTest { import AbstractPartitionTest._ - def parameters: java.util.stream.Stream[Arguments] = Seq[Arguments]( + def parameters: util.stream.Stream[Arguments] = util.List.of[Arguments]( Arguments.of( - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List.empty[Integer], List.empty[Integer], Seq.empty[Int], Boolean.box(false)), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(false)), Arguments.of( - List[Integer](brokerId, brokerId + 1), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List.empty[Integer], List.empty[Integer], Seq.empty[Int], Boolean.box(true)), + util.List.of[Integer](brokerId, brokerId + 1), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(true)), Arguments.of( - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4), - List[Integer](brokerId + 1), - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4), + util.List.of[Integer](brokerId + 1), + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4), - List.empty[Integer], - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4), + util.List.of[Integer], + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List.empty[Integer], - List[Integer](brokerId + 1), - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer], + util.List.of[Integer](brokerId + 1), + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId + 1, brokerId + 2), - List[Integer](brokerId + 1, brokerId + 2), - List[Integer](brokerId), - List.empty[Integer], - Seq(brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId), + util.List.of[Integer], + util.List.of(brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId + 2, brokerId + 3, brokerId + 4), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4, brokerId + 5), - List.empty[Integer], - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5), + util.List.of[Integer], + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId + 2, brokerId + 3, brokerId + 4), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4, brokerId + 5), - List.empty[Integer], - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5), + util.List.of[Integer], + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId + 2, brokerId + 3), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4, brokerId + 5), - List.empty[Integer], - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(true)) - ).asJava.stream() + util.List.of[Integer](brokerId + 2, brokerId + 3), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5), + util.List.of[Integer], + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(true)) + ).stream() } class AssignmentStateTest extends AbstractPartitionTest { @ParameterizedTest @MethodSource(Array("parameters")) - def testPartitionAssignmentStatus(isr: List[Integer], replicas: List[Integer], - adding: List[Integer], removing: List[Integer], - original: Seq[Int], isUnderReplicated: Boolean): Unit = { + def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer], + adding: util.List[Integer], removing: util.List[Integer], + original: util.List[Int], isUnderReplicated: Boolean): Unit = { val controllerEpoch = 3 val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(6) - .setIsr(isr.asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.asJava) + .setReplicas(replicas) .setIsNew(false) - if (adding.nonEmpty) - leaderState.setAddingReplicas(adding.asJava) - if (removing.nonEmpty) - leaderState.setRemovingReplicas(removing.asJava) + if (!adding.isEmpty) + leaderState.setAddingReplicas(adding) + if (!removing.isEmpty) + leaderState.setRemovingReplicas(removing) - val isReassigning = adding.nonEmpty || removing.nonEmpty + val isReassigning = !adding.isEmpty || !removing.isEmpty // set the original replicas as the URP calculation will need them - if (original.nonEmpty) - partition.assignmentState = SimpleAssignmentState(original) + if (!original.isEmpty) + partition.assignmentState = SimpleAssignmentState(original.asScala) // do the test partition.makeLeader(leaderState, offsetCheckpoints, None) assertEquals(isReassigning, partition.isReassigning) - if (adding.nonEmpty) - adding.foreach(r => assertTrue(partition.isAddingReplica(r))) + if (!adding.isEmpty) + adding.forEach(r => assertTrue(partition.isAddingReplica(r))) if (adding.contains(brokerId)) assertTrue(partition.isAddingLocalReplica) else diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 77b098cf682..81a72b27323 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -18,6 +18,7 @@ package kafka.cluster import java.lang.{Long => JLong} +import java.util import java.util.{Optional, Properties} import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean @@ -139,9 +140,9 @@ class PartitionLockTest extends Logging { def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { val active = new AtomicBoolean(true) val replicaToCheck = 3 - val firstReplicaSet = Seq[Integer](3, 4, 5).asJava - val secondReplicaSet = Seq[Integer](1, 2, 3).asJava - def partitionState(replicas: java.util.List[Integer]) = new LeaderAndIsrRequest.PartitionState() + val firstReplicaSet = util.List.of[Integer](3, 4, 5) + val secondReplicaSet = util.List.of[Integer](1, 2, 3) + def partitionState(replicas: util.List[Integer]) = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(1) .setLeader(replicas.get(0)) .setLeaderEpoch(1) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 63287ac853a..91fff3408eb 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -67,6 +67,8 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.lang +import java.util import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters.RichOption @@ -102,7 +104,7 @@ object PartitionTest { /** * Verifies the callbacks that have been triggered since the last - * verification. Values different than `-1` are the ones that have + * verification. Values different from `-1` are the ones that have * been updated. */ def verify( @@ -186,7 +188,7 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 10 val logStartOffset = 0L val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true) - addBrokerEpochToMockMetadataCache(metadataCache, List(remoteReplicaId)) + addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](remoteReplicaId)) def epochEndOffset(epoch: Int, endOffset: Long): FetchResponseData.EpochEndOffset = { new FetchResponseData.EpochEndOffset() @@ -308,8 +310,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val followerId = brokerId + 1 val leaderId = brokerId + 2 - val replicas = List[Integer](brokerId, followerId, leaderId).asJava - val isr = List[Integer](brokerId, followerId, leaderId).asJava + val replicas = util.List.of[Integer](brokerId, followerId, leaderId) + val isr = util.List.of[Integer](brokerId, followerId, leaderId) val leaderEpoch = 8 val partitionEpoch = 1 @@ -351,11 +353,11 @@ class PartitionTest extends AbstractPartitionTest { val validReplica = brokerId + 1 val addingReplica1 = brokerId + 2 val addingReplica2 = brokerId + 3 - val replicas = List(leader, validReplica) - val isr = List[Integer](leader, validReplica).asJava + val replicas = util.List.of[Integer](leader, validReplica) + val isr = util.List.of[Integer](leader, validReplica) val leaderEpoch = 8 val partitionEpoch = 1 - addBrokerEpochToMockMetadataCache(metadataCache, List(leader, addingReplica1, addingReplica2)) + addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](leader, addingReplica1, addingReplica2)) assertTrue(partition.makeLeader(new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -363,7 +365,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(partitionEpoch) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, topicId )) @@ -389,9 +391,9 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(None, partition.getReplica(addingReplica2).map(_.stateSnapshot.logEndOffset)) // The replicas are added as part of a reassignment - val newReplicas = List(leader, validReplica, addingReplica1, addingReplica2) + val newReplicas = util.List.of[Integer](leader, validReplica, addingReplica1, addingReplica2) val newPartitionEpoch = partitionEpoch + 1 - val addingReplicas = List(addingReplica1, addingReplica2) + val addingReplicas = util.List.of[Integer](addingReplica1, addingReplica2) assertFalse(partition.makeLeader(new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -399,8 +401,8 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(newPartitionEpoch) - .setReplicas(newReplicas.map(Int.box).asJava) - .setAddingReplicas(addingReplicas.map(Int.box).asJava) + .setReplicas(newReplicas) + .setAddingReplicas(addingReplicas) .setIsNew(true), offsetCheckpoints, None )) @@ -485,16 +487,16 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(2) .setLeaderEpoch(prevLeaderEpoch) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None)) val appendThread = new Thread { override def run(): Unit = { val records = createRecords( - List( + util.List.of( new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes) ), @@ -511,9 +513,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(2) .setLeaderEpoch(prevLeaderEpoch + 1) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(2) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None)) @@ -665,7 +667,7 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) - addBrokerEpochToMockMetadataCache(metadataCache, List(remoteReplicaId)) + addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](remoteReplicaId)) def sendFetch(leaderEpoch: Option[Int]): LogReadInfo = { fetchFollower( partition, @@ -804,8 +806,8 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = List(leader, follower1, follower2) - val isr = List[Integer](leader, follower2).asJava + val replicas = util.List.of[Integer](leader, follower1, follower2) + val isr = util.List.of[Integer](leader, follower2) val leaderEpoch = 8 val batch1 = TestUtils.records(records = List( new SimpleRecord(10, "k1".getBytes, "v1".getBytes), @@ -821,7 +823,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") @@ -892,7 +894,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 1) .setIsr(isr) .setPartitionEpoch(4) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertTrue(partition.makeFollower(followerState, offsetCheckpoints, None)) @@ -904,7 +906,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 2) .setIsr(isr) .setPartitionEpoch(5) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, None)) @@ -984,9 +986,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(1) .setLeaderEpoch(epoch) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) partition.makeFollower(partitionState, offsetCheckpoints, None) @@ -1002,7 +1004,7 @@ class PartitionTest extends AbstractPartitionTest { classOf[UnexpectedAppendOffsetException], // append one record with offset = 3 () => partition.appendRecordsToFollowerOrFutureReplica( - createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), + createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false, partitionLeaderEpoch = epoch ) @@ -1013,7 +1015,7 @@ class PartitionTest extends AbstractPartitionTest { // verify that we can append records that contain log start offset, even when first // offset < log start offset if the log is empty val newLogStartOffset = 4L - val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + val records = createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes), new SimpleRecord("k3".getBytes, "v3".getBytes)), baseOffset = newLogStartOffset) @@ -1023,7 +1025,7 @@ class PartitionTest extends AbstractPartitionTest { // and we can append more records after that partition.appendRecordsToFollowerOrFutureReplica( - createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), + createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false, partitionLeaderEpoch = epoch ) @@ -1031,7 +1033,7 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(newLogStartOffset, log.logStartOffset, s"Log start offset not expected to change:") // but we cannot append to offset < log start if the log is not empty - val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + val records2 = createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes)), baseOffset = 3L) assertThrows( @@ -1042,7 +1044,7 @@ class PartitionTest extends AbstractPartitionTest { // we still can append to next offset partition.appendRecordsToFollowerOrFutureReplica( - createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), + createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false, partitionLeaderEpoch = epoch ) @@ -1054,7 +1056,7 @@ class PartitionTest extends AbstractPartitionTest { def testListOffsetIsolationLevels(): Unit = { val controllerEpoch = 0 val leaderEpoch = 5 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1069,13 +1071,13 @@ class PartitionTest extends AbstractPartitionTest { .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(leaderEpoch, partition.getLeaderEpoch) - val records = createTransactionalRecords(List( + val records = createTransactionalRecords(util.List.of( new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes), new SimpleRecord("k3".getBytes, "v3".getBytes)), baseOffset = 0L, producerId = 2L) - val verificationGuard = partition.maybeStartTransactionVerification(2L, 0, 0, true) + val verificationGuard = partition.maybeStartTransactionVerification(2L, 0, 0, supportsEpochBump = true) partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching, verificationGuard) def fetchOffset(isolationLevel: Option[IsolationLevel], timestamp: Long): TimestampAndOffset = { @@ -1131,7 +1133,7 @@ class PartitionTest extends AbstractPartitionTest { assertThrows( classOf[NotLeaderOrFollowerException], () => partition.appendRecordsToFollowerOrFutureReplica( - createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), + createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false, partitionLeaderEpoch = 0 ) @@ -1145,9 +1147,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(1) .setLeaderEpoch(1) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) partition.makeFollower(partitionState, offsetCheckpoints, None) @@ -1156,9 +1158,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(1) .setLeaderEpoch(4) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None)) @@ -1167,9 +1169,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(1) .setLeaderEpoch(4) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) assertFalse(partition.makeFollower(partitionState, offsetCheckpoints, None)) } @@ -1179,8 +1181,8 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = Seq(leader, follower1, follower2) - val isr = List[Integer](leader, follower2).asJava + val replicas = util.List.of[Integer](leader, follower1, follower2) + val isr = util.List.of[Integer](leader, follower2) val leaderEpoch = 8 val batch1 = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes))) @@ -1189,7 +1191,7 @@ class PartitionTest extends AbstractPartitionTest { new SimpleRecord("k5".getBytes, "v3".getBytes))) val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes), new SimpleRecord("k7".getBytes, "v2".getBytes))) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -1197,7 +1199,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch") @@ -1224,7 +1226,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 1) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) partition.makeFollower(followerState, offsetCheckpoints, None) @@ -1234,7 +1236,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 2) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, topicId), "Expected makeLeader() to return 'leader changed' after makeFollower()") @@ -1259,38 +1261,38 @@ class PartitionTest extends AbstractPartitionTest { Set(leader, follower1, follower2), "AlterIsr") } - def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = { - val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + def createRecords(records: lang.Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = { + val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records)) val builder = MemoryRecords.builder( buf, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds, partitionLeaderEpoch) - records.foreach(builder.append) + records.forEach(builder.append) builder.build() } - def createIdempotentRecords(records: Iterable[SimpleRecord], + def createIdempotentRecords(records: lang.Iterable[SimpleRecord], baseOffset: Long, baseSequence: Int = 0, producerId: Long = 1L): MemoryRecords = { val producerEpoch = 0.toShort val isTransactional = false - val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records)) val builder = MemoryRecords.builder(buf, Compression.NONE, baseOffset, producerId, producerEpoch, baseSequence, isTransactional) - records.foreach(builder.append) + records.forEach(builder.append) builder.build() } - def createTransactionalRecords(records: Iterable[SimpleRecord], + def createTransactionalRecords(records: lang.Iterable[SimpleRecord], baseOffset: Long, baseSequence: Int = 0, producerId: Long = 1L): MemoryRecords = { val producerEpoch = 0.toShort val isTransactional = true - val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records)) val builder = MemoryRecords.builder(buf, Compression.NONE, baseOffset, producerId, producerEpoch, baseSequence, isTransactional) - records.foreach(builder.append) + records.forEach(builder.append) builder.build() } @@ -1304,8 +1306,8 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = List[Integer](leader, follower1, follower2).asJava - val isr = List[Integer](leader).asJava + val replicas = util.List.of[Integer](leader, follower1, follower2) + val isr = util.List.of[Integer](leader) val leaderEpoch = 8 assertFalse(partition.isAtMinIsr) @@ -1341,8 +1343,8 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) + .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -1353,9 +1355,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(1) - .setIsr(List(brokerId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId)) .setPartitionEpoch(2) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) .setIsNew(false) partition.makeLeader(LeaderState, offsetCheckpoints, None) @@ -1370,9 +1372,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) val isr = replicas - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1382,9 +1384,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") @@ -1434,7 +1436,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1445,9 +1447,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(List[Integer](brokerId).asJava) + .setIsr(util.List.of[Integer](brokerId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") @@ -1470,9 +1472,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = List[Integer](brokerId).asJava - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) assertTrue(partition.makeLeader( @@ -1482,7 +1484,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) @@ -1522,8 +1524,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) - val isr = List[Integer](brokerId).asJava + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1534,7 +1536,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) @@ -1556,7 +1558,7 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L) assertEquals(alterPartitionManager.isrUpdates.size, 1) val isrItem = alterPartitionManager.isrUpdates.head - assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId).map(Int.box).asJava) + assertEquals(isrItem.leaderAndIsr.isr, util.List.of[Integer](brokerId, remoteBrokerId)) isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => // the broker epochs should be equal to broker epoch of the leader assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch()) @@ -1586,9 +1588,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = List[Integer](brokerId).asJava - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) assertTrue(partition.makeLeader( @@ -1598,7 +1600,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) @@ -1642,8 +1644,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) - val shrinkedIsr = Set(brokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val shrinkedIsr = util.List.of[Integer](brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -1667,17 +1669,17 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(replicas.map(Int.box).asJava) + .setIsr(replicas) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false), offsetCheckpoints, None ), "Expected become leader transition to succeed" ) - assertEquals(replicas.toSet, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) @@ -1709,9 +1711,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(shrinkedIsr.toList.map(Int.box).asJava) + .setIsr(shrinkedIsr) .setPartitionEpoch(2) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false), offsetCheckpoints, None @@ -1720,11 +1722,11 @@ class PartitionTest extends AbstractPartitionTest { ) assertTrue(partition.isLeader) - assertEquals(shrinkedIsr, partition.partitionState.isr) - assertEquals(shrinkedIsr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(shrinkedIsr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(shrinkedIsr).asScala, partition.partitionState.maximalIsr) assertEquals(Set.empty, partition.getOutOfSyncReplicas(partition.replicaLagTimeMaxMs)) - // In the case of unfenced, the HWM doesn't increase, otherwise the the HWM increases because the + // In the case of unfenced, the HWM doesn't increase, otherwise the HWM increases because the // fenced and shutdown replica is not considered during HWM calculation. if (brokerState == "unfenced") { assertEquals(10, partition.localLogOrException.highWatermark) @@ -1741,8 +1743,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) - val isr = Set(brokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -1771,13 +1773,13 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) markRemoteReplicaEligible(true) @@ -1793,16 +1795,16 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) // Controller rejects the expansion because the broker is fenced or offline. alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA) // The leader reverts back to the previous ISR. - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -1813,8 +1815,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is not triggered because the follower is fenced. - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -1825,8 +1827,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertTrue(partition.partitionState.isInflight) assertEquals(1, alterPartitionManager.isrUpdates.size) @@ -1834,8 +1836,8 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1) // ISR is committed. - assertEquals(replicas.toSet, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) } @@ -1849,8 +1851,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 val remoteBrokerId2 = brokerId + 2 - val replicas = List(brokerId, remoteBrokerId1, remoteBrokerId2) - val isr = Set(brokerId, remoteBrokerId2) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) + val isr = util.List.of[Integer](brokerId, remoteBrokerId2) val metadataCache: KRaftMetadataCache = mock(classOf[KRaftMetadataCache]) @@ -1877,16 +1879,16 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset, but using a wrong broker epoch. The expansion should fail. - addBrokerEpochToMockMetadataCache(metadataCache, List(brokerId, remoteBrokerId2)) + addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](brokerId, remoteBrokerId2)) // Create a race case where the replica epoch get bumped right after the previous fetch succeeded. val wrongReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) - 1 when(metadataCache.getAliveBrokerEpoch(remoteBrokerId1)).thenReturn(Optional.of(wrongReplicaEpoch), Optional.of(defaultBrokerEpoch(remoteBrokerId1))) @@ -1904,8 +1906,8 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is not triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertEquals(0, alterPartitionManager.isrUpdates.size) // Fetch again, this time with correct default broker epoch. @@ -1922,8 +1924,8 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) val isrUpdate = alterPartitionManager.isrUpdates.head isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => @@ -1943,8 +1945,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId1) - val isr = Set(brokerId, remoteBrokerId1) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId1) + val isr = util.List.of[Integer](brokerId, remoteBrokerId1) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -1967,13 +1969,13 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) val expectedReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) fetchFollower(partition, @@ -2008,8 +2010,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) - val isr = Set(brokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) val partition = new Partition( @@ -2031,13 +2033,13 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset and // to check if an expansion is possible. @@ -2051,16 +2053,16 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) // Controller rejects the expansion because the broker is in controlled shutdown. alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA) // The leader reverts back to the previous ISR. - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -2071,8 +2073,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is not triggered because the follower is fenced. - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -2083,8 +2085,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertTrue(partition.partitionState.isInflight) assertEquals(1, alterPartitionManager.isrUpdates.size) @@ -2092,8 +2094,8 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager.completeIsrUpdate(newPartitionEpoch= 1) // ISR is committed. - assertEquals(replicas.toSet, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) } @@ -2106,8 +2108,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val topicId = Uuid.randomUuid() assertTrue(makeLeader( @@ -2127,7 +2129,7 @@ class PartitionTest extends AbstractPartitionTest { // Try to shrink the ISR partition.maybeShrinkIsr() assertEquals(alterPartitionManager.isrUpdates.size, 1) - assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId).map(Int.box).asJava) + assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, util.List.of[Integer](brokerId)) assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr) assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) @@ -2160,12 +2162,12 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 val remoteBrokerId2 = brokerId + 2 - val replicas = Seq(brokerId, remoteBrokerId1, remoteBrokerId2) - val isr = Seq(brokerId, remoteBrokerId1, remoteBrokerId2) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) + val isr = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) val initializeTimeMs = time.milliseconds() val metadataCache = mock(classOf[KRaftMetadataCache]) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) val partition = new Partition( topicPartition, @@ -2186,9 +2188,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") @@ -2213,7 +2215,7 @@ class PartitionTest extends AbstractPartitionTest { partition.maybeShrinkIsr() assertEquals(0, alterPartitionListener.shrinks.get) assertEquals(alterPartitionManager.isrUpdates.size, 1) - assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId, remoteBrokerId1).map(Int.box).asJava) + assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, util.List.of[Integer](brokerId, remoteBrokerId1)) val isrUpdate = alterPartitionManager.isrUpdates.head isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch()) @@ -2243,10 +2245,10 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 val remoteBrokerId2 = brokerId + 2 - val replicas = Seq(brokerId, remoteBrokerId1, remoteBrokerId2) - val isr = Seq(brokerId, remoteBrokerId1) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) + val isr = util.List.of[Integer](brokerId, remoteBrokerId1) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) val partition = new Partition( topicPartition, @@ -2267,9 +2269,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") @@ -2299,8 +2301,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() assertTrue(makeLeader( @@ -2357,10 +2359,10 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) assertTrue(makeLeader( topicId = topicId, @@ -2417,10 +2419,10 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) assertTrue(makeLeader( topicId = topicId, @@ -2465,8 +2467,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() assertTrue(makeLeader( @@ -2552,9 +2554,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) assertTrue(makeLeader( topicId = topicId, @@ -2595,7 +2597,7 @@ class PartitionTest extends AbstractPartitionTest { private def createClientResponseWithAlterPartitionResponse( topicPartition: TopicPartition, partitionErrorCode: Short, - isr: List[Int] = List.empty, + isr: util.List[Integer] = util.List.of[Integer], leaderEpoch: Int = 0, partitionEpoch: Int = 0 ): ClientResponse = { @@ -2604,7 +2606,7 @@ class PartitionTest extends AbstractPartitionTest { topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData() .setPartitionIndex(topicPartition.partition) - .setIsr(isr.map(Integer.valueOf).asJava) + .setIsr(isr) .setLeaderEpoch(leaderEpoch) .setPartitionEpoch(partitionEpoch) .setErrorCode(partitionErrorCode)) @@ -2646,10 +2648,10 @@ class PartitionTest extends AbstractPartitionTest { val follower1 = brokerId + 1 val follower2 = brokerId + 2 val follower3 = brokerId + 3 - val replicas = Seq(brokerId, follower1, follower2, follower3) - val isr = Seq(brokerId, follower1, follower2) + val replicas = util.List.of[Integer](brokerId, follower1, follower2, follower3) + val isr = util.List.of[Integer](brokerId, follower1, follower2) val partitionEpoch = 1 - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) doNothing().when(delayedOperations).checkAndCompleteAll() @@ -2659,7 +2661,7 @@ class PartitionTest extends AbstractPartitionTest { // Complete the ISR expansion val alterPartitionResponseWithoutError = - createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code, List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1) + createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code, util.List.of[Integer](brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1) when(mockChannelManager.sendRequest(any(), any())) .thenAnswer { invocation => @@ -2703,9 +2705,9 @@ class PartitionTest extends AbstractPartitionTest { val follower1 = brokerId + 1 val follower2 = brokerId + 2 val follower3 = brokerId + 3 - val replicas = Seq(brokerId, follower1, follower2, follower3) - val isr = Seq(brokerId, follower1, follower2) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, follower1, follower2, follower3) + val isr = util.List.of[Integer](brokerId, follower1, follower2) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) doNothing().when(delayedOperations).checkAndCompleteAll() @@ -2743,7 +2745,7 @@ class PartitionTest extends AbstractPartitionTest { .thenReturn(Optional.of(long2Long(4L))) val controllerEpoch = 3 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) @@ -2761,7 +2763,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderEpoch = 5 val topicId = Uuid.randomUuid() - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) @@ -2805,7 +2807,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderEpoch = 5 val topicId = Uuid.randomUuid() - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) @@ -2880,8 +2882,8 @@ class PartitionTest extends AbstractPartitionTest { @Test def testUnderReplicatedPartitionsCorrectSemantics(): Unit = { val controllerEpoch = 3 - val replicas = List[Integer](brokerId, brokerId + 1, brokerId + 2).asJava - val isr = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2) + val isr = util.List.of[Integer](brokerId, brokerId + 1) var leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -3082,7 +3084,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderId = brokerId val followerId = brokerId + 1 - val replicas = List(leaderId, followerId) + val replicas = util.List.of[Integer](leaderId, followerId) val leaderEpoch = 8 val topicId = Uuid.randomUuid() addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -3091,9 +3093,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) @@ -3124,9 +3126,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(2) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId))) @@ -3147,7 +3149,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderId = brokerId val followerId = brokerId + 1 - val replicas = List(leaderId, followerId) + val replicas = util.List.of[Integer](leaderId, followerId) val leaderEpoch = 8 val topicId = Uuid.randomUuid() @@ -3155,9 +3157,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) @@ -3181,9 +3183,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(2) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId))) @@ -3198,7 +3200,7 @@ class PartitionTest extends AbstractPartitionTest { def testIgnoreLeaderPartitionStateChangeWithOlderPartitionEpoch(): Unit = { val controllerEpoch = 3 val leaderId = brokerId - val replicas = List(leaderId) + val replicas = util.List.of[Integer](leaderId) val leaderEpoch = 8 val topicId = Uuid.randomUuid() @@ -3206,9 +3208,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) @@ -3221,9 +3223,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(0) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId))) @@ -3236,7 +3238,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderId = brokerId val followerId = brokerId + 1 - val replicas = List(leaderId, followerId) + val replicas = util.List.of[Integer](leaderId, followerId) val leaderEpoch = 8 val topicId = Uuid.randomUuid() @@ -3244,9 +3246,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(followerId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId, followerId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId, followerId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeFollower(initialFollowerState, offsetCheckpoints, Some(topicId))) @@ -3259,9 +3261,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(followerId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId, followerId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId, followerId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertFalse(partition.makeFollower(updatedFollowerState, offsetCheckpoints, Some(topicId))) @@ -3275,7 +3277,7 @@ class PartitionTest extends AbstractPartitionTest { val localReplica = brokerId val remoteReplica1 = brokerId + 1 val remoteReplica2 = brokerId + 2 - val replicas = List(localReplica, remoteReplica1, remoteReplica2) + val replicas = util.List.of[Integer](localReplica, remoteReplica1, remoteReplica2) val topicId = Uuid.randomUuid() // The local replica is the leader. @@ -3283,27 +3285,27 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(localReplica) .setLeaderEpoch(1) - .setIsr(replicas.map(Int.box).asJava) + .setIsr(replicas) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) assertEquals(1, partition.getPartitionEpoch) assertEquals(1, partition.getLeaderEpoch) assertEquals(Some(localReplica), partition.leaderReplicaIdOpt) - assertEquals(replicas.toSet, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) assertEquals(Seq(remoteReplica1, remoteReplica2), partition.remoteReplicas.map(_.brokerId).toSeq) - assertEquals(replicas, partition.assignmentState.replicas) + assertEquals(replicas.asScala, partition.assignmentState.replicas) // The local replica becomes a follower. val updatedLeaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(remoteReplica1) .setLeaderEpoch(2) - .setIsr(replicas.map(Int.box).asJava) + .setIsr(replicas) .setPartitionEpoch(2) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertTrue(partition.makeFollower(updatedLeaderState, offsetCheckpoints, Some(topicId))) @@ -3312,23 +3314,23 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(Some(remoteReplica1), partition.leaderReplicaIdOpt) assertEquals(Set.empty, partition.partitionState.isr) assertEquals(Seq.empty, partition.remoteReplicas.map(_.brokerId).toSeq) - assertEquals(replicas, partition.assignmentState.replicas) + assertEquals(replicas.asScala, partition.assignmentState.replicas) } @Test def testAddAndRemoveListeners(): Unit = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = topicId) - val replicas = Seq(brokerId, brokerId + 1) + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.makeLeader( new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(isr.map(Int.box).asJava) - .setReplicas(replicas.map(Int.box).asJava) + .setIsr(isr) + .setReplicas(replicas) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3397,8 +3399,8 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) + .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3413,16 +3415,16 @@ class PartitionTest extends AbstractPartitionTest { def testPartitionListenerWhenLogOffsetsChanged(): Unit = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = topicId) - val replicas = Seq(brokerId, brokerId + 1) - val isr = Seq(brokerId, brokerId + 1) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val isr = util.List.of[Integer](brokerId, brokerId + 1) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.makeLeader( new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(isr.map(Int.box).asJava) - .setReplicas(replicas.map(Int.box).asJava) + .setIsr(isr) + .setReplicas(replicas) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3449,7 +3451,7 @@ class PartitionTest extends AbstractPartitionTest { listener.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset) - partition.truncateFullyAndStartAt(0L, false) + partition.truncateFullyAndStartAt(0L, isFuture = false) listener.verify(expectedHighWatermark = 0L) } @@ -3463,8 +3465,8 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) + .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3487,8 +3489,8 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) + .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3508,17 +3510,17 @@ class PartitionTest extends AbstractPartitionTest { partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = topicId) assertTrue(partition.log.isDefined) - val replicas = Seq(brokerId, brokerId + 1) + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val epoch = 0 - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.makeLeader( new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(epoch) - .setIsr(isr.map(Int.box).asJava) - .setReplicas(replicas.map(Int.box).asJava) + .setIsr(isr) + .setReplicas(replicas) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3578,7 +3580,7 @@ class PartitionTest extends AbstractPartitionTest { def testMaybeStartTransactionVerification(): Unit = { val controllerEpoch = 0 val leaderEpoch = 5 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val producerId = 22L @@ -3594,7 +3596,7 @@ class PartitionTest extends AbstractPartitionTest { .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(leaderEpoch, partition.getLeaderEpoch) - val idempotentRecords = createIdempotentRecords(List( + val idempotentRecords = createIdempotentRecords(util.List.of( new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes), new SimpleRecord("k3".getBytes, "v3".getBytes)), @@ -3602,7 +3604,7 @@ class PartitionTest extends AbstractPartitionTest { producerId = producerId) partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching) - def transactionRecords() = createTransactionalRecords(List( + def transactionRecords() = createTransactionalRecords(util.List.of( new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes), new SimpleRecord("k3".getBytes, "v3".getBytes)), @@ -3614,7 +3616,7 @@ class PartitionTest extends AbstractPartitionTest { assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)) // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-sentinel VerificationGuard. - val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0, true) + val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0, supportsEpochBump = true) assertNotEquals(VerificationGuard.SENTINEL, verificationGuard) // With the wrong VerificationGuard, append should fail. @@ -3622,12 +3624,12 @@ class PartitionTest extends AbstractPartitionTest { origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, new VerificationGuard())) // We should return the same VerificationGuard when we still need to verify. Append should proceed. - val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0, true) + val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0, supportsEpochBump = true) assertEquals(verificationGuard, verificationGuard2) partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard) // We should no longer need a VerificationGuard. Future appends without VerificationGuard will also succeed. - val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0, true) + val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0, supportsEpochBump = true) assertEquals(VerificationGuard.SENTINEL, verificationGuard3) partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching) } @@ -3636,8 +3638,8 @@ class PartitionTest extends AbstractPartitionTest { topicId: Option[Uuid], controllerEpoch: Int, leaderEpoch: Int, - isr: Seq[Int], - replicas: Seq[Int], + isr: util.List[Integer], + replicas: util.List[Integer], partitionEpoch: Int, isNew: Boolean, partition: Partition = partition @@ -3653,9 +3655,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(partitionEpoch) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(isNew), offsetCheckpoints, topicId @@ -3664,8 +3666,8 @@ class PartitionTest extends AbstractPartitionTest { assertFalse(partition.partitionState.isInflight) assertEquals(topicId, partition.topicId) assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(isr.toSet, partition.partitionState.isr) - assertEquals(isr.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertEquals(partitionEpoch, partition.getPartitionEpoch) newLeader } @@ -3804,8 +3806,8 @@ class PartitionTest extends AbstractPartitionTest { ) } - private def addBrokerEpochToMockMetadataCache(metadataCache: MetadataCache, brokers: List[Int]): Unit = { - brokers.foreach { broker => + private def addBrokerEpochToMockMetadataCache(metadataCache: MetadataCache, brokers: util.List[Integer]): Unit = { + brokers.forEach { broker => when(metadataCache.getAliveBrokerEpoch(broker)).thenReturn(Optional.of(defaultBrokerEpoch(broker))) } } @@ -3832,7 +3834,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -3877,7 +3879,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -3922,7 +3924,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -3967,7 +3969,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -4012,7 +4014,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -4058,7 +4060,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala index 262bc3f85b6..c55d2c0da3d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -34,9 +34,9 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.Mockito.{mock, verify, when} import java.nio.charset.Charset +import java.util import java.util.Optional import scala.collection.Map -import scala.jdk.CollectionConverters._ class CoordinatorPartitionWriterTest { @Test @@ -75,8 +75,8 @@ class CoordinatorPartitionWriterTest { replicaManager ) - when(replicaManager.getLogConfig(tp)).thenReturn(Some(new LogConfig(Map.empty.asJava))) - assertEquals(new LogConfig(Map.empty.asJava), partitionRecordWriter.config(tp)) + when(replicaManager.getLogConfig(tp)).thenReturn(Some(new LogConfig(util.Map.of))) + assertEquals(new LogConfig(util.Map.of), partitionRecordWriter.config(tp)) when(replicaManager.getLogConfig(tp)).thenReturn(None) assertThrows(classOf[NotLeaderOrFollowerException], () => partitionRecordWriter.config(tp)) @@ -119,7 +119,7 @@ class CoordinatorPartitionWriterTest { 10L ), Option.empty, - false + hasCustomErrorMessage = false ))) val batch = MemoryRecords.withRecords( @@ -215,7 +215,7 @@ class CoordinatorPartitionWriterTest { )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult( LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(Errors.NOT_LEADER_OR_FOLLOWER.exception), - false + hasCustomErrorMessage = false ))) val batch = MemoryRecords.withRecords( diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 5db59dd51fe..030295975a4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -17,7 +17,7 @@ package kafka.coordinator.transaction import java.nio.ByteBuffer -import java.util.Collections +import java.util import java.util.Optional import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean @@ -87,7 +87,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren when(metadataCache.features()).thenReturn { new FinalizedFeatures( MetadataVersion.latestTesting(), - Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), + util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), 0) } @@ -476,7 +476,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren private def prepareTxnLog(partitionId: Int): Unit = { val logMock: UnifiedLog = mock(classOf[UnifiedLog]) - when(logMock.config).thenReturn(new LogConfig(Collections.emptyMap())) + when(logMock.config).thenReturn(new LogConfig(util.Map.of)) val fileRecordsMock: FileRecords = mock(classOf[FileRecords]) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala index 91039d9abc0..dd378757130 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala @@ -16,7 +16,6 @@ */ package kafka.coordinator.transaction - import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil} @@ -30,7 +29,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, import org.junit.jupiter.api.Test import java.nio.ByteBuffer -import java.util.Collections +import java.util import scala.collection.Seq import scala.jdk.CollectionConverters._ @@ -115,14 +114,14 @@ class TransactionLogTest { @Test def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = { - val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, Collections.emptySet(), 500, 500, TV_0) + val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, util.Set.of, 500, 500, TV_0) val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata, TV_0)) assertEquals(0, txnLogValueBuffer.getShort) } @Test def testSerializeTransactionLogValueToFlexibleVersion(): Unit = { - val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, Collections.emptySet(), 500, 500, TV_2) + val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, util.Set.of, 500, 500, TV_2) val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata, TV_2)) assertEquals(TransactionLogValue.HIGHEST_SUPPORTED_VERSION, txnLogValueBuffer.getShort) } @@ -131,7 +130,7 @@ class TransactionLogTest { def testDeserializeHighestSupportedTransactionLogValue(): Unit = { val txnPartitions = new TransactionLogValue.PartitionsSchema() .setTopic("topic") - .setPartitionIds(java.util.Collections.singletonList(0)) + .setPartitionIds(util.List.of(0)) val txnLogValue = new TransactionLogValue() .setProducerId(100) @@ -140,7 +139,7 @@ class TransactionLogTest { .setTransactionStartTimestampMs(750L) .setTransactionLastUpdateTimestampMs(1000L) .setTransactionTimeoutMs(500) - .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions)) + .setTransactionPartitions(util.List.of(txnPartitions)) val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue) val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get @@ -174,7 +173,7 @@ class TransactionLogTest { val txnPartitions = new Struct(futurePartitionsSchema) txnPartitions.set("topic", "topic") txnPartitions.set("partition_ids", Array(Integer.valueOf(1))) - val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]() + val txnPartitionsTaggedFields = new util.TreeMap[Integer, Any]() txnPartitionsTaggedFields.put(100, "foo") txnPartitionsTaggedFields.put(101, 4000) txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields) @@ -204,7 +203,7 @@ class TransactionLogTest { transactionLogValue.set("transaction_partitions", Array(txnPartitions)) transactionLogValue.set("transaction_last_update_timestamp_ms", 2000L) transactionLogValue.set("transaction_start_timestamp_ms", 3000L) - val txnLogValueTaggedFields = new java.util.TreeMap[Integer, Any]() + val txnLogValueTaggedFields = new util.TreeMap[Integer, Any]() txnLogValueTaggedFields.put(100, "foo") txnLogValueTaggedFields.put(101, 4000) transactionLogValue.set("_tagged_fields", txnLogValueTaggedFields) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 4131f564d63..ec12afe6489 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -17,8 +17,6 @@ package kafka.coordinator.transaction import java.util -import java.util.Arrays.asList -import java.util.Collections import java.util.Optional import java.util.concurrent.{Callable, Executors, Future} import kafka.server.KafkaConfig @@ -156,7 +154,7 @@ class TransactionMarkerChannelManagerTest { val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1) val response = new WriteTxnMarkersResponse( - Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE))) + util.Map.of(producerId2: java.lang.Long, util.Map.of(partition1, Errors.NONE))) val clientResponse = new ClientResponse(header, null, null, time.milliseconds(), time.milliseconds(), false, null, null, response) @@ -205,7 +203,7 @@ class TransactionMarkerChannelManagerTest { // Build a successful client response. val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1) val successfulResponse = new WriteTxnMarkersResponse( - Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE))) + util.Map.of(producerId2: java.lang.Long, util.Map.of(partition1, Errors.NONE))) val successfulClientResponse = new ClientResponse(header, null, null, time.milliseconds(), time.milliseconds(), false, null, null, successfulResponse) @@ -302,10 +300,10 @@ class TransactionMarkerChannelManagerTest { assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2)) val expectedBroker1Request = new WriteTxnMarkersRequest.Builder( - asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)), - new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)))).build() + util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)), + new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)))).build() val expectedBroker2Request = new WriteTxnMarkersRequest.Builder( - asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition2)))).build() + util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build() val requests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().asScala.map { handler => (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()) @@ -372,10 +370,10 @@ class TransactionMarkerChannelManagerTest { assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2)) val expectedBroker1Request = new WriteTxnMarkersRequest.Builder( - asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)), - new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)))).build() + util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)), + new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)))).build() val expectedBroker2Request = new WriteTxnMarkersRequest.Builder( - asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition2)))).build() + util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build() val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().asScala.map { handler => (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala index b34b0725020..56dc0ec266c 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala @@ -17,7 +17,6 @@ package kafka.coordinator.transaction import java.{lang, util} -import java.util.Arrays.asList import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -46,10 +45,10 @@ class TransactionMarkerRequestCompletionHandlerTest { private val topicPartition = new TopicPartition("topic1", 0) private val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID, producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, mutable.Set[TopicPartition](topicPartition), 0L, 0L, TransactionVersion.TV_2) - private val pendingCompleteTxnAndMarkers = asList( + private val pendingCompleteTxnAndMarkers = util.List.of( PendingCompleteTxnAndMarkerEntry( PendingCompleteTxn(transactionalId, coordinatorEpoch, txnMetadata, txnMetadata.prepareComplete(42)), - new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, asList(topicPartition)))) + new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, util.List.of(topicPartition)))) private val markerChannelManager: TransactionMarkerChannelManager = mock(classOf[TransactionMarkerChannelManager]) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala index f3224593514..453bc13f0bd 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala @@ -127,7 +127,7 @@ class TransactionMetadataTest { txnLastUpdateTimestamp = time.milliseconds(), clientTransactionVersion = TV_2) - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, true) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, noPartitionAdded = true) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, txnMetadata.producerId) assertEquals(producerEpoch + 1, txnMetadata.producerEpoch) @@ -152,7 +152,7 @@ class TransactionMetadataTest { txnLastUpdateTimestamp = time.milliseconds(), clientTransactionVersion = TV_2) - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, true) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, noPartitionAdded = true) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, txnMetadata.producerId) assertEquals(producerEpoch + 1, txnMetadata.producerEpoch) @@ -177,7 +177,7 @@ class TransactionMetadataTest { txnLastUpdateTimestamp = time.milliseconds(), clientTransactionVersion = TV_2) - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, true) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, noPartitionAdded = true) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, txnMetadata.producerId) assertEquals(producerEpoch + 1, txnMetadata.producerEpoch) @@ -295,7 +295,7 @@ class TransactionMetadataTest { clientTransactionVersion = TV_0) // let new time be smaller - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, false) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(TransactionState.PREPARE_COMMIT, txnMetadata.state) assertEquals(producerId, txnMetadata.producerId) @@ -323,7 +323,7 @@ class TransactionMetadataTest { clientTransactionVersion = TV_0) // let new time be smaller - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, false) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(TransactionState.PREPARE_ABORT, txnMetadata.state) assertEquals(producerId, txnMetadata.producerId) @@ -425,7 +425,7 @@ class TransactionMetadataTest { // We should reset the pending state to make way for the abort transition. txnMetadata.pendingState = None - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds(), false) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds(), noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, transitMetadata.producerId) } @@ -537,7 +537,7 @@ class TransactionMetadataTest { txnLastUpdateTimestamp = time.milliseconds(), clientTransactionVersion = TV_2) - var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, false) + var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, txnMetadata.producerId) assertEquals((producerEpoch + 1).toShort, txnMetadata.producerEpoch) @@ -571,7 +571,7 @@ class TransactionMetadataTest { assertTrue(txnMetadata.isProducerEpochExhausted) val newProducerId = 9893L - var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, newProducerId, time.milliseconds() - 1, false) + var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, newProducerId, time.milliseconds() - 1, noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, txnMetadata.producerId) assertEquals(Short.MaxValue, txnMetadata.producerEpoch) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 212a915c800..16b3224495a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -18,6 +18,7 @@ package kafka.coordinator.transaction import java.lang.management.ManagementFactory import java.nio.ByteBuffer +import java.util import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} import javax.management.ObjectName import kafka.server.ReplicaManager @@ -49,7 +50,6 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort} import org.mockito.Mockito.{atLeastOnce, mock, reset, times, verify, when} -import java.util.Collections import scala.collection.{Map, mutable} import scala.jdk.CollectionConverters._ @@ -72,7 +72,7 @@ class TransactionStateManagerTest { when(metadataCache.features()).thenReturn { new FinalizedFeatures( MetadataVersion.latestTesting(), - Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), + util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), 0) } @@ -1354,7 +1354,7 @@ class TransactionStateManagerTest { when(metadataCache.features()).thenReturn { new FinalizedFeatures( MetadataVersion.latestTesting(), - Collections.singletonMap(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()), + util.Map.of(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()), 0) } val transactionManager = new TransactionStateManager(0, scheduler, diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index c45464333c0..7f37eeb25a1 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -34,7 +34,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import java.io.File import java.time.Duration import java.util -import java.util.{Collections, Properties} +import java.util.Properties import scala.collection.{Seq, mutable} import scala.jdk.CollectionConverters._ import scala.util.Using @@ -364,7 +364,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = { Using.resource(createAdminClient(brokers, listenerName)) { admin => { - admin.alterClientQuotas(Collections.singleton( + admin.alterClientQuotas(util.Set.of( new ClientQuotaAlteration( new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "") null else sanitizedClientId)).asJava), configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get() diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index e7a8e10d80e..ecb8c8f011b 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -17,6 +17,7 @@ package kafka.integration +import java.util import java.util.Properties import java.util.concurrent.ExecutionException import scala.util.Random @@ -317,7 +318,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { valueDeserializer = new StringDeserializer) try { val tp = new TopicPartition(topic, partitionId) - consumer.assign(Seq(tp).asJava) + consumer.assign(util.List.of(tp)) consumer.seek(tp, 0) TestUtils.consumeRecords(consumer, numMessages).map(_.value) } finally consumer.close() @@ -410,10 +411,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { } private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = { - val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava - adminClient.incrementalAlterConfigs(Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) -> - configEntries.asScala.map((e: ConfigEntry) => new AlterConfigOp(e, AlterConfigOp.OpType.SET)).toSeq - .asJavaCollection).asJava) + val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic) + + val configEntries = topicConfigs.entrySet().stream() + .map(e => new ConfigEntry(e.getKey.toString, e.getValue.toString)) + .map(e => new AlterConfigOp(e, AlterConfigOp.OpType.SET)) + .toList + + adminClient.incrementalAlterConfigs(util.Map.of(configResource, configEntries)) } private def createAdminClient(): Admin = { @@ -427,7 +432,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { private def waitForNoLeaderAndIsrHasOldLeaderId(metadataCache: MetadataCache, leaderId: Int): Unit = { waitUntilTrue(() => metadataCache.getLeaderAndIsr(topic, partitionId).isPresent() && metadataCache.getLeaderAndIsr(topic, partitionId).get.leader() == LeaderConstants.NO_LEADER && - java.util.Arrays.asList(leaderId).equals(metadataCache.getLeaderAndIsr(topic, partitionId).get.isr()), + util.List.of(leaderId).equals(metadataCache.getLeaderAndIsr(topic, partitionId).get.isr()), "Timed out waiting for broker metadata cache updates the info for topic partition:" + topicPartition) } } diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala index 75946f14075..b0dbd0a05c2 100644 --- a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala @@ -22,15 +22,12 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.junit.jupiter.api.Assertions.{assertEquals, assertNull} import org.junit.jupiter.api.Test -import java.util.Collections -import scala.jdk.CollectionConverters._ - class KafkaMetricsGroupTest { @Test def testUntaggedMetricName(): Unit = { val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics") - val metricName = metricsGroup.metricName("TaggedMetric", Collections.emptyMap()) + val metricName = metricsGroup.metricName("TaggedMetric", java.util.Map.of) assertEquals("kafka.metrics", metricName.getGroup) assertEquals("TestMetrics", metricName.getType) @@ -42,7 +39,13 @@ class KafkaMetricsGroupTest { @Test def testTaggedMetricName(): Unit = { - val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz").asJava + val tags = { + val map = new java.util.LinkedHashMap[String, String]() + map.put("foo", "bar") + map.put("bar", "baz") + map.put("baz", "raz.taz") + map + } val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics") val metricName = metricsGroup.metricName("TaggedMetric", tags) @@ -56,7 +59,13 @@ class KafkaMetricsGroupTest { @Test def testTaggedMetricNameWithEmptyValue(): Unit = { - val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz").asJava + val tags = { + val map = new java.util.LinkedHashMap[String, String]() + map.put("foo", "bar") + map.put("bar", "") + map.put("baz", "raz.taz") + map + } val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics") val metricName = metricsGroup.metricName("TaggedMetric", tags) @@ -67,6 +76,4 @@ class KafkaMetricsGroupTest { metricName.getMBeanName) assertEquals("baz.raz_taz.foo.bar", metricName.getScope) } - - } diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index d59f44e3e3c..834b8efe48d 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -18,6 +18,7 @@ package kafka.metrics import java.lang.management.ManagementFactory +import java.util import java.util.Properties import javax.management.ObjectName import com.yammer.metrics.core.MetricPredicate @@ -124,7 +125,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { def testUpdateJMXFilter(): Unit = { // verify previously exposed metrics are removed and existing matching metrics are added brokers.foreach(broker => broker.kafkaYammerMetrics.reconfigure( - Map(JmxReporter.EXCLUDE_CONFIG -> "kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava + util.Map.of(JmxReporter.EXCLUDE_CONFIG, "kafka.controller:type=KafkaController,name=ActiveControllerCount") )) assertFalse(ManagementFactory.getPlatformMBeanServer .isRegistered(new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount"))) @@ -150,9 +151,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging { @Test def testWindowsStyleTagNames(): Unit = { val path = "C:\\windows-path\\kafka-logs" - val tags = Map("dir" -> path) - val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=") - val metric = new KafkaMetricsGroup(this.getClass).metricName("test-metric", tags.asJava) + val tags = util.Map.of("dir", path) + val expectedMBeanName = Set(tags.keySet().iterator().next(), ObjectName.quote(path)).mkString("=") + val metric = new KafkaMetricsGroup(this.getClass).metricName("test-metric", tags) assert(metric.getMBeanName.endsWith(expectedMBeanName)) }