From 992eaafb62c559d14db460590d5a5e37e9194b51 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Sat, 21 Jun 2025 21:50:39 +0530 Subject: [PATCH] MINOR: Cleanup Core Module- Scala Modules (3/n) (#19804) Now that Kafka brokers support Java 17, this PR makes some cleanup changes in the core module. The changes are limited to a subset of the Scala test files in the core module and mostly include: - Collections.emptyList(), Collections.singletonList() and Arrays.asList() are replaced with List.of() - Collections.emptyMap() and Collections.singletonMap() are replaced with Map.of() - Collections.singleton() is replaced with Set.of() (A short illustrative sketch of these replacement patterns follows the diff below.) The directories targeted in this PR, all under the unit.kafka module, are: - admin - cluster - coordinator - docker - integration - metrics Reviewers: Chia-Ping Tsai --- .../unit/kafka/admin/AddPartitionsTest.scala | 39 +- .../kafka/cluster/AbstractPartitionTest.scala | 3 +- .../kafka/cluster/AssignmentStateTest.scala | 116 ++-- .../kafka/cluster/PartitionLockTest.scala | 7 +- .../unit/kafka/cluster/PartitionTest.scala | 500 +++++++++--------- .../CoordinatorPartitionWriterTest.scala | 10 +- ...ransactionCoordinatorConcurrencyTest.scala | 6 +- .../transaction/TransactionLogTest.scala | 15 +- .../TransactionMarkerChannelManagerTest.scala | 18 +- ...onMarkerRequestCompletionHandlerTest.scala | 5 +- .../transaction/TransactionMetadataTest.scala | 16 +- .../TransactionStateManagerTest.scala | 6 +- .../integration/KafkaServerTestHarness.scala | 4 +- .../UncleanLeaderElectionTest.scala | 17 +- .../kafka/metrics/KafkaMetricsGroupTest.scala | 23 +- .../unit/kafka/metrics/MetricsTest.scala | 9 +- 16 files changed, 402 insertions(+), 392 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 2259acd9c2b..bba5278d7a6 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -17,7 +17,6 @@ package kafka.admin -import java.util.Collections import kafka.server.{BaseRequestTest, BrokerServer} import kafka.utils.TestUtils import kafka.utils.TestUtils._ @@ -28,8 +27,6 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} import java.util -import java.util.Arrays.asList -import java.util.Collections.singletonList import java.util.concurrent.ExecutionException import scala.jdk.CollectionConverters._ @@ -66,8 +63,8 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testWrongReplicaCount(): Unit = { assertEquals(classOf[InvalidReplicaAssignmentException], assertThrows(classOf[ExecutionException], () => { - admin.createPartitions(Collections.singletonMap(topic1, - NewPartitions.increaseTo(2, singletonList(asList(0, 1, 2))))).all().get() + admin.createPartitions(util.Map.of(topic1, + NewPartitions.increaseTo(2, util.List.of(util.List.of[Integer](0, 1, 2))))).all().get() }).getCause.getClass) } @@ -78,12 +75,12 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testMissingPartitionsInCreateTopics(): Unit = { val topic6Placements = new util.HashMap[Integer, util.List[Integer]] - topic6Placements.put(1, asList(0, 1)) - topic6Placements.put(2, asList(1, 0)) + topic6Placements.put(1, util.List.of(0, 1)) + topic6Placements.put(2, util.List.of(1, 0)) val topic7Placements = new util.HashMap[Integer, util.List[Integer]] - topic7Placements.put(2, asList(0, 1)) - topic7Placements.put(3, asList(1, 0)) - val futures =
admin.createTopics(asList( + topic7Placements.put(2, util.List.of(0, 1)) + topic7Placements.put(3, util.List.of(1, 0)) + val futures = admin.createTopics(util.List.of( new NewTopic("new-topic6", topic6Placements), new NewTopic("new-topic7", topic7Placements))).values() val topic6Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic6").get()).getCause @@ -103,8 +100,8 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testMissingPartitionsInCreatePartitions(): Unit = { val cause = assertThrows(classOf[ExecutionException], () => - admin.createPartitions(Collections.singletonMap(topic1, - NewPartitions.increaseTo(3, singletonList(asList(0, 1, 2))))).all().get()).getCause + admin.createPartitions(util.Map.of(topic1, + NewPartitions.increaseTo(3, util.List.of(util.List.of[Integer](0, 1, 2))))).all().get()).getCause assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass) assertTrue(cause.getMessage.contains("Attempted to add 2 additional partition(s), but only 1 assignment(s) " + "were specified."), "Unexpected error message: " + cause.getMessage) @@ -112,7 +109,7 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testIncrementPartitions(): Unit = { - admin.createPartitions(Collections.singletonMap(topic1, NewPartitions.increaseTo(3))).all().get() + admin.createPartitions(util.Map.of(topic1, NewPartitions.increaseTo(3))).all().get() // wait until leader is elected waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 1) @@ -122,7 +119,7 @@ class AddPartitionsTest extends BaseRequestTest { TestUtils.waitForPartitionMetadata(brokers, topic1, 1) TestUtils.waitForPartitionMetadata(brokers, topic1, 2) val response = connectAndReceive[MetadataResponse]( - new MetadataRequest.Builder(Seq(topic1).asJava, false).build) + new MetadataRequest.Builder(util.List.of(topic1), false).build) assertEquals(1, response.topicMetadata.size) val partitions = response.topicMetadata.asScala.head.partitionMetadata.asScala.sortBy(_.partition) assertEquals(partitions.size, 3) @@ -141,8 +138,8 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testManualAssignmentOfReplicas(): Unit = { // Add 2 partitions - admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3, - asList(asList(0, 1), asList(2, 3))))).all().get() + admin.createPartitions(util.Map.of(topic2, NewPartitions.increaseTo(3, + util.List.of(util.List.of[Integer](0, 1), util.List.of[Integer](2, 3))))).all().get() // wait until leader is elected val leader1 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 1) val leader2 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 2) @@ -153,7 +150,7 @@ class AddPartitionsTest extends BaseRequestTest { val partition2Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 2) assertEquals(leader2, partition2Metadata.leader()) val response = connectAndReceive[MetadataResponse]( - new MetadataRequest.Builder(Seq(topic2).asJava, false).build) + new MetadataRequest.Builder(util.List.of(topic2), false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head val partitionMetadata = topicMetadata.partitionMetadata.asScala.sortBy(_.partition) @@ -168,7 +165,7 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testReplicaPlacementAllServers(): Unit = { - admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get() + admin.createPartitions(util.Map.of(topic3, NewPartitions.increaseTo(7))).all().get() // 
read metadata from a broker and verify the new topic partitions exist TestUtils.waitForPartitionMetadata(brokers, topic3, 1) @@ -179,7 +176,7 @@ class AddPartitionsTest extends BaseRequestTest { TestUtils.waitForPartitionMetadata(brokers, topic3, 6) val response = connectAndReceive[MetadataResponse]( - new MetadataRequest.Builder(Seq(topic3).asJava, false).build) + new MetadataRequest.Builder(util.List.of(topic3), false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head @@ -195,14 +192,14 @@ class AddPartitionsTest extends BaseRequestTest { @Test def testReplicaPlacementPartialServers(): Unit = { - admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get() + admin.createPartitions(util.Map.of(topic2, NewPartitions.increaseTo(3))).all().get() // read metadata from a broker and verify the new topic partitions exist TestUtils.waitForPartitionMetadata(brokers, topic2, 1) TestUtils.waitForPartitionMetadata(brokers, topic2, 2) val response = connectAndReceive[MetadataResponse]( - new MetadataRequest.Builder(Seq(topic2).asJava, false).build) + new MetadataRequest.Builder(util.List.of(topic2), false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala index fff1930a718..ae1e86421eb 100644 --- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala @@ -38,7 +38,6 @@ import java.io.File import java.lang.{Long => JLong} import java.util.{Optional, Properties} import java.util.concurrent.atomic.AtomicInteger -import scala.jdk.CollectionConverters._ object AbstractPartitionTest { val brokerId = 101 @@ -121,7 +120,7 @@ class AbstractPartitionTest { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val controllerEpoch = 0 - val replicas = List[Integer](brokerId, remoteReplicaId).asJava + val replicas = java.util.List.of[Integer](brokerId, remoteReplicaId) val isr = replicas if (isLeader) { diff --git a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala index c34a7ac7536..fe8f67eed5f 100644 --- a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala @@ -21,97 +21,99 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} +import java.util + import scala.jdk.CollectionConverters._ object AssignmentStateTest { import AbstractPartitionTest._ - def parameters: java.util.stream.Stream[Arguments] = Seq[Arguments]( + def parameters: util.stream.Stream[Arguments] = util.List.of[Arguments]( Arguments.of( - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List.empty[Integer], List.empty[Integer], Seq.empty[Int], Boolean.box(false)), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(false)), Arguments.of( - List[Integer](brokerId, brokerId + 1), - List[Integer](brokerId, 
brokerId + 1, brokerId + 2), - List.empty[Integer], List.empty[Integer], Seq.empty[Int], Boolean.box(true)), + util.List.of[Integer](brokerId, brokerId + 1), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(true)), Arguments.of( - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4), - List[Integer](brokerId + 1), - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4), + util.List.of[Integer](brokerId + 1), + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4), - List.empty[Integer], - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4), + util.List.of[Integer], + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List.empty[Integer], - List[Integer](brokerId + 1), - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer], + util.List.of[Integer](brokerId + 1), + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId + 1, brokerId + 2), - List[Integer](brokerId + 1, brokerId + 2), - List[Integer](brokerId), - List.empty[Integer], - Seq(brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId), + util.List.of[Integer], + util.List.of(brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId + 2, brokerId + 3, brokerId + 4), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4, brokerId + 5), - List.empty[Integer], - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5), + util.List.of[Integer], + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId + 2, brokerId + 3, brokerId + 4), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4, brokerId + 5), - List.empty[Integer], - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), + util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5), + util.List.of[Integer], + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - List[Integer](brokerId + 2, brokerId + 
3), - List[Integer](brokerId, brokerId + 1, brokerId + 2), - List[Integer](brokerId + 3, brokerId + 4, brokerId + 5), - List.empty[Integer], - Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(true)) - ).asJava.stream() + util.List.of[Integer](brokerId + 2, brokerId + 3), + util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), + util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5), + util.List.of[Integer], + util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(true)) + ).stream() } class AssignmentStateTest extends AbstractPartitionTest { @ParameterizedTest @MethodSource(Array("parameters")) - def testPartitionAssignmentStatus(isr: List[Integer], replicas: List[Integer], - adding: List[Integer], removing: List[Integer], - original: Seq[Int], isUnderReplicated: Boolean): Unit = { + def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer], + adding: util.List[Integer], removing: util.List[Integer], + original: util.List[Int], isUnderReplicated: Boolean): Unit = { val controllerEpoch = 3 val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(6) - .setIsr(isr.asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.asJava) + .setReplicas(replicas) .setIsNew(false) - if (adding.nonEmpty) - leaderState.setAddingReplicas(adding.asJava) - if (removing.nonEmpty) - leaderState.setRemovingReplicas(removing.asJava) + if (!adding.isEmpty) + leaderState.setAddingReplicas(adding) + if (!removing.isEmpty) + leaderState.setRemovingReplicas(removing) - val isReassigning = adding.nonEmpty || removing.nonEmpty + val isReassigning = !adding.isEmpty || !removing.isEmpty // set the original replicas as the URP calculation will need them - if (original.nonEmpty) - partition.assignmentState = SimpleAssignmentState(original) + if (!original.isEmpty) + partition.assignmentState = SimpleAssignmentState(original.asScala) // do the test partition.makeLeader(leaderState, offsetCheckpoints, None) assertEquals(isReassigning, partition.isReassigning) - if (adding.nonEmpty) - adding.foreach(r => assertTrue(partition.isAddingReplica(r))) + if (!adding.isEmpty) + adding.forEach(r => assertTrue(partition.isAddingReplica(r))) if (adding.contains(brokerId)) assertTrue(partition.isAddingLocalReplica) else diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 77b098cf682..81a72b27323 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -18,6 +18,7 @@ package kafka.cluster import java.lang.{Long => JLong} +import java.util import java.util.{Optional, Properties} import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean @@ -139,9 +140,9 @@ class PartitionLockTest extends Logging { def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { val active = new AtomicBoolean(true) val replicaToCheck = 3 - val firstReplicaSet = Seq[Integer](3, 4, 5).asJava - val secondReplicaSet = Seq[Integer](1, 2, 3).asJava - def partitionState(replicas: java.util.List[Integer]) = new LeaderAndIsrRequest.PartitionState() + val firstReplicaSet = util.List.of[Integer](3, 4, 5) + val secondReplicaSet = util.List.of[Integer](1, 2, 3) + def partitionState(replicas: util.List[Integer]) = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(1) .setLeader(replicas.get(0)) .setLeaderEpoch(1) diff 
--git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 63287ac853a..91fff3408eb 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -67,6 +67,8 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.lang +import java.util import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters.RichOption @@ -102,7 +104,7 @@ object PartitionTest { /** * Verifies the callbacks that have been triggered since the last - * verification. Values different than `-1` are the ones that have + * verification. Values different from `-1` are the ones that have * been updated. */ def verify( @@ -186,7 +188,7 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 10 val logStartOffset = 0L val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true) - addBrokerEpochToMockMetadataCache(metadataCache, List(remoteReplicaId)) + addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](remoteReplicaId)) def epochEndOffset(epoch: Int, endOffset: Long): FetchResponseData.EpochEndOffset = { new FetchResponseData.EpochEndOffset() @@ -308,8 +310,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val followerId = brokerId + 1 val leaderId = brokerId + 2 - val replicas = List[Integer](brokerId, followerId, leaderId).asJava - val isr = List[Integer](brokerId, followerId, leaderId).asJava + val replicas = util.List.of[Integer](brokerId, followerId, leaderId) + val isr = util.List.of[Integer](brokerId, followerId, leaderId) val leaderEpoch = 8 val partitionEpoch = 1 @@ -351,11 +353,11 @@ class PartitionTest extends AbstractPartitionTest { val validReplica = brokerId + 1 val addingReplica1 = brokerId + 2 val addingReplica2 = brokerId + 3 - val replicas = List(leader, validReplica) - val isr = List[Integer](leader, validReplica).asJava + val replicas = util.List.of[Integer](leader, validReplica) + val isr = util.List.of[Integer](leader, validReplica) val leaderEpoch = 8 val partitionEpoch = 1 - addBrokerEpochToMockMetadataCache(metadataCache, List(leader, addingReplica1, addingReplica2)) + addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](leader, addingReplica1, addingReplica2)) assertTrue(partition.makeLeader(new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -363,7 +365,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(partitionEpoch) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, topicId )) @@ -389,9 +391,9 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(None, partition.getReplica(addingReplica2).map(_.stateSnapshot.logEndOffset)) // The replicas are added as part of a reassignment - val newReplicas = List(leader, validReplica, addingReplica1, addingReplica2) + val newReplicas = util.List.of[Integer](leader, validReplica, addingReplica1, addingReplica2) val newPartitionEpoch = partitionEpoch + 1 - val addingReplicas = List(addingReplica1, addingReplica2) + val addingReplicas = util.List.of[Integer](addingReplica1, addingReplica2) assertFalse(partition.makeLeader(new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -399,8 
+401,8 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(newPartitionEpoch) - .setReplicas(newReplicas.map(Int.box).asJava) - .setAddingReplicas(addingReplicas.map(Int.box).asJava) + .setReplicas(newReplicas) + .setAddingReplicas(addingReplicas) .setIsNew(true), offsetCheckpoints, None )) @@ -485,16 +487,16 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(2) .setLeaderEpoch(prevLeaderEpoch) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None)) val appendThread = new Thread { override def run(): Unit = { val records = createRecords( - List( + util.List.of( new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes) ), @@ -511,9 +513,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(2) .setLeaderEpoch(prevLeaderEpoch + 1) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(2) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None)) @@ -665,7 +667,7 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) - addBrokerEpochToMockMetadataCache(metadataCache, List(remoteReplicaId)) + addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](remoteReplicaId)) def sendFetch(leaderEpoch: Option[Int]): LogReadInfo = { fetchFollower( partition, @@ -804,8 +806,8 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = List(leader, follower1, follower2) - val isr = List[Integer](leader, follower2).asJava + val replicas = util.List.of[Integer](leader, follower1, follower2) + val isr = util.List.of[Integer](leader, follower2) val leaderEpoch = 8 val batch1 = TestUtils.records(records = List( new SimpleRecord(10, "k1".getBytes, "v1".getBytes), @@ -821,7 +823,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") @@ -892,7 +894,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 1) .setIsr(isr) .setPartitionEpoch(4) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertTrue(partition.makeFollower(followerState, offsetCheckpoints, None)) @@ -904,7 +906,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 2) .setIsr(isr) .setPartitionEpoch(5) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, None)) @@ -984,9 +986,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(1) .setLeaderEpoch(epoch) - .setIsr(List[Integer](0, 1, 2, 
brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) partition.makeFollower(partitionState, offsetCheckpoints, None) @@ -1002,7 +1004,7 @@ class PartitionTest extends AbstractPartitionTest { classOf[UnexpectedAppendOffsetException], // append one record with offset = 3 () => partition.appendRecordsToFollowerOrFutureReplica( - createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), + createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false, partitionLeaderEpoch = epoch ) @@ -1013,7 +1015,7 @@ class PartitionTest extends AbstractPartitionTest { // verify that we can append records that contain log start offset, even when first // offset < log start offset if the log is empty val newLogStartOffset = 4L - val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + val records = createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes), new SimpleRecord("k3".getBytes, "v3".getBytes)), baseOffset = newLogStartOffset) @@ -1023,7 +1025,7 @@ class PartitionTest extends AbstractPartitionTest { // and we can append more records after that partition.appendRecordsToFollowerOrFutureReplica( - createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), + createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false, partitionLeaderEpoch = epoch ) @@ -1031,7 +1033,7 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(newLogStartOffset, log.logStartOffset, s"Log start offset not expected to change:") // but we cannot append to offset < log start if the log is not empty - val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes), + val records2 = createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes)), baseOffset = 3L) assertThrows( @@ -1042,7 +1044,7 @@ class PartitionTest extends AbstractPartitionTest { // we still can append to next offset partition.appendRecordsToFollowerOrFutureReplica( - createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), + createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false, partitionLeaderEpoch = epoch ) @@ -1054,7 +1056,7 @@ class PartitionTest extends AbstractPartitionTest { def testListOffsetIsolationLevels(): Unit = { val controllerEpoch = 0 val leaderEpoch = 5 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1069,13 +1071,13 @@ class PartitionTest extends AbstractPartitionTest { .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(leaderEpoch, partition.getLeaderEpoch) - val records = createTransactionalRecords(List( + val records = createTransactionalRecords(util.List.of( new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes), new SimpleRecord("k3".getBytes, "v3".getBytes)), baseOffset = 0L, producerId = 2L) - val verificationGuard = partition.maybeStartTransactionVerification(2L, 0, 0, true) + 
val verificationGuard = partition.maybeStartTransactionVerification(2L, 0, 0, supportsEpochBump = true) partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching, verificationGuard) def fetchOffset(isolationLevel: Option[IsolationLevel], timestamp: Long): TimestampAndOffset = { @@ -1131,7 +1133,7 @@ class PartitionTest extends AbstractPartitionTest { assertThrows( classOf[NotLeaderOrFollowerException], () => partition.appendRecordsToFollowerOrFutureReplica( - createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), + createRecords(util.List.of(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false, partitionLeaderEpoch = 0 ) @@ -1145,9 +1147,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(1) .setLeaderEpoch(1) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) partition.makeFollower(partitionState, offsetCheckpoints, None) @@ -1156,9 +1158,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(1) .setLeaderEpoch(4) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) .setIsNew(false) assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None)) @@ -1167,9 +1169,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(1) .setLeaderEpoch(4) - .setIsr(List[Integer](0, 1, 2, brokerId).asJava) + .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) .setPartitionEpoch(1) - .setReplicas(List[Integer](0, 1, 2, brokerId).asJava) + .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) assertFalse(partition.makeFollower(partitionState, offsetCheckpoints, None)) } @@ -1179,8 +1181,8 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = Seq(leader, follower1, follower2) - val isr = List[Integer](leader, follower2).asJava + val replicas = util.List.of[Integer](leader, follower1, follower2) + val isr = util.List.of[Integer](leader, follower2) val leaderEpoch = 8 val batch1 = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes))) @@ -1189,7 +1191,7 @@ class PartitionTest extends AbstractPartitionTest { new SimpleRecord("k5".getBytes, "v3".getBytes))) val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes), new SimpleRecord("k7".getBytes, "v2".getBytes))) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -1197,7 +1199,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") assertEquals(leaderEpoch, 
partition.getLeaderEpoch, "Current leader epoch") @@ -1224,7 +1226,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 1) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) partition.makeFollower(followerState, offsetCheckpoints, None) @@ -1234,7 +1236,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch + 2) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, topicId), "Expected makeLeader() to return 'leader changed' after makeFollower()") @@ -1259,38 +1261,38 @@ class PartitionTest extends AbstractPartitionTest { Set(leader, follower1, follower2), "AlterIsr") } - def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = { - val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + def createRecords(records: lang.Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = { + val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records)) val builder = MemoryRecords.builder( buf, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds, partitionLeaderEpoch) - records.foreach(builder.append) + records.forEach(builder.append) builder.build() } - def createIdempotentRecords(records: Iterable[SimpleRecord], + def createIdempotentRecords(records: lang.Iterable[SimpleRecord], baseOffset: Long, baseSequence: Int = 0, producerId: Long = 1L): MemoryRecords = { val producerEpoch = 0.toShort val isTransactional = false - val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records)) val builder = MemoryRecords.builder(buf, Compression.NONE, baseOffset, producerId, producerEpoch, baseSequence, isTransactional) - records.foreach(builder.append) + records.forEach(builder.append) builder.build() } - def createTransactionalRecords(records: Iterable[SimpleRecord], + def createTransactionalRecords(records: lang.Iterable[SimpleRecord], baseOffset: Long, baseSequence: Int = 0, producerId: Long = 1L): MemoryRecords = { val producerEpoch = 0.toShort val isTransactional = true - val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records)) val builder = MemoryRecords.builder(buf, Compression.NONE, baseOffset, producerId, producerEpoch, baseSequence, isTransactional) - records.foreach(builder.append) + records.forEach(builder.append) builder.build() } @@ -1304,8 +1306,8 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = List[Integer](leader, follower1, follower2).asJava - val isr = List[Integer](leader).asJava + val replicas = util.List.of[Integer](leader, follower1, follower2) + val isr = util.List.of[Integer](leader) val leaderEpoch = 8 assertFalse(partition.isAtMinIsr) @@ -1341,8 +1343,8 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) + 
.setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -1353,9 +1355,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(1) - .setIsr(List(brokerId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId)) .setPartitionEpoch(2) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) .setIsNew(false) partition.makeLeader(LeaderState, offsetCheckpoints, None) @@ -1370,9 +1372,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) val isr = replicas - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1382,9 +1384,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") @@ -1434,7 +1436,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1445,9 +1447,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(List[Integer](brokerId).asJava) + .setIsr(util.List.of[Integer](brokerId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") @@ -1470,9 +1472,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = List[Integer](brokerId).asJava - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) assertTrue(partition.makeLeader( @@ -1482,7 +1484,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) @@ -1522,8 +1524,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) - val isr = List[Integer](brokerId).asJava + val replicas = 
util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) @@ -1534,7 +1536,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) @@ -1556,7 +1558,7 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L) assertEquals(alterPartitionManager.isrUpdates.size, 1) val isrItem = alterPartitionManager.isrUpdates.head - assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId).map(Int.box).asJava) + assertEquals(isrItem.leaderAndIsr.isr, util.List.of[Integer](brokerId, remoteBrokerId)) isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => // the broker epochs should be equal to broker epoch of the leader assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch()) @@ -1586,9 +1588,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = List[Integer](brokerId).asJava - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) assertTrue(partition.makeLeader( @@ -1598,7 +1600,7 @@ class PartitionTest extends AbstractPartitionTest { .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) @@ -1642,8 +1644,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) - val shrinkedIsr = Set(brokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val shrinkedIsr = util.List.of[Integer](brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -1667,17 +1669,17 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(replicas.map(Int.box).asJava) + .setIsr(replicas) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false), offsetCheckpoints, None ), "Expected become leader transition to succeed" ) - assertEquals(replicas.toSet, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) @@ -1709,9 +1711,9 @@ class PartitionTest extends 
AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(shrinkedIsr.toList.map(Int.box).asJava) + .setIsr(shrinkedIsr) .setPartitionEpoch(2) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false), offsetCheckpoints, None @@ -1720,11 +1722,11 @@ class PartitionTest extends AbstractPartitionTest { ) assertTrue(partition.isLeader) - assertEquals(shrinkedIsr, partition.partitionState.isr) - assertEquals(shrinkedIsr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(shrinkedIsr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(shrinkedIsr).asScala, partition.partitionState.maximalIsr) assertEquals(Set.empty, partition.getOutOfSyncReplicas(partition.replicaLagTimeMaxMs)) - // In the case of unfenced, the HWM doesn't increase, otherwise the the HWM increases because the + // In the case of unfenced, the HWM doesn't increase, otherwise the HWM increases because the // fenced and shutdown replica is not considered during HWM calculation. if (brokerState == "unfenced") { assertEquals(10, partition.localLogOrException.highWatermark) @@ -1741,8 +1743,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) - val isr = Set(brokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -1771,13 +1773,13 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) markRemoteReplicaEligible(true) @@ -1793,16 +1795,16 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) // Controller rejects the expansion because the broker is fenced or offline. alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA) // The leader reverts back to the previous ISR. - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -1813,8 +1815,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is not triggered because the follower is fenced. 
- assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -1825,8 +1827,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertTrue(partition.partitionState.isInflight) assertEquals(1, alterPartitionManager.isrUpdates.size) @@ -1834,8 +1836,8 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1) // ISR is committed. - assertEquals(replicas.toSet, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) } @@ -1849,8 +1851,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 val remoteBrokerId2 = brokerId + 2 - val replicas = List(brokerId, remoteBrokerId1, remoteBrokerId2) - val isr = Set(brokerId, remoteBrokerId2) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) + val isr = util.List.of[Integer](brokerId, remoteBrokerId2) val metadataCache: KRaftMetadataCache = mock(classOf[KRaftMetadataCache]) @@ -1877,16 +1879,16 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset, but using a wrong broker epoch. The expansion should fail. - addBrokerEpochToMockMetadataCache(metadataCache, List(brokerId, remoteBrokerId2)) + addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](brokerId, remoteBrokerId2)) // Create a race case where the replica epoch get bumped right after the previous fetch succeeded. val wrongReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) - 1 when(metadataCache.getAliveBrokerEpoch(remoteBrokerId1)).thenReturn(Optional.of(wrongReplicaEpoch), Optional.of(defaultBrokerEpoch(remoteBrokerId1))) @@ -1904,8 +1906,8 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is not triggered. 
- assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertEquals(0, alterPartitionManager.isrUpdates.size) // Fetch again, this time with correct default broker epoch. @@ -1922,8 +1924,8 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) val isrUpdate = alterPartitionManager.isrUpdates.head isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => @@ -1943,8 +1945,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId1) - val isr = Set(brokerId, remoteBrokerId1) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId1) + val isr = util.List.of[Integer](brokerId, remoteBrokerId1) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -1967,13 +1969,13 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) val expectedReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) fetchFollower(partition, @@ -2008,8 +2010,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = List(brokerId, remoteBrokerId) - val isr = Set(brokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) val partition = new Partition( @@ -2031,13 +2033,13 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset and // to check if an expansion is possible. @@ -2051,16 +2053,16 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. 
- assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) // Controller rejects the expansion because the broker is in controlled shutdown. alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA) // The leader reverts back to the previous ISR. - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -2071,8 +2073,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is not triggered because the follower is fenced. - assertEquals(isr, partition.partitionState.isr) - assertEquals(isr, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -2083,8 +2085,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is triggered. - assertEquals(isr, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertTrue(partition.partitionState.isInflight) assertEquals(1, alterPartitionManager.isrUpdates.size) @@ -2092,8 +2094,8 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager.completeIsrUpdate(newPartitionEpoch= 1) // ISR is committed. 
- assertEquals(replicas.toSet, partition.partitionState.isr) - assertEquals(replicas.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) } @@ -2106,8 +2108,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val topicId = Uuid.randomUuid() assertTrue(makeLeader( @@ -2127,7 +2129,7 @@ class PartitionTest extends AbstractPartitionTest { // Try to shrink the ISR partition.maybeShrinkIsr() assertEquals(alterPartitionManager.isrUpdates.size, 1) - assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId).map(Int.box).asJava) + assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, util.List.of[Integer](brokerId)) assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr) assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) @@ -2160,12 +2162,12 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 val remoteBrokerId2 = brokerId + 2 - val replicas = Seq(brokerId, remoteBrokerId1, remoteBrokerId2) - val isr = Seq(brokerId, remoteBrokerId1, remoteBrokerId2) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) + val isr = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) val initializeTimeMs = time.milliseconds() val metadataCache = mock(classOf[KRaftMetadataCache]) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) val partition = new Partition( topicPartition, @@ -2186,9 +2188,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") @@ -2213,7 +2215,7 @@ class PartitionTest extends AbstractPartitionTest { partition.maybeShrinkIsr() assertEquals(0, alterPartitionListener.shrinks.get) assertEquals(alterPartitionManager.isrUpdates.size, 1) - assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId, remoteBrokerId1).map(Int.box).asJava) + assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, util.List.of[Integer](brokerId, remoteBrokerId1)) val isrUpdate = alterPartitionManager.isrUpdates.head isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch()) @@ -2243,10 +2245,10 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 val remoteBrokerId2 = brokerId + 2 - val replicas = Seq(brokerId, remoteBrokerId1, remoteBrokerId2) - val isr = Seq(brokerId, remoteBrokerId1) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) + val isr = 
util.List.of[Integer](brokerId, remoteBrokerId1) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) val partition = new Partition( topicPartition, @@ -2267,9 +2269,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.toList.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") @@ -2299,8 +2301,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() assertTrue(makeLeader( @@ -2357,10 +2359,10 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) assertTrue(makeLeader( topicId = topicId, @@ -2417,10 +2419,10 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) assertTrue(makeLeader( topicId = topicId, @@ -2465,8 +2467,8 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId, remoteBrokerId) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() assertTrue(makeLeader( @@ -2552,9 +2554,9 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = Seq(brokerId, remoteBrokerId) - val isr = Seq(brokerId) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val isr = util.List.of[Integer](brokerId) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) assertTrue(makeLeader( topicId = topicId, @@ -2595,7 +2597,7 @@ class PartitionTest extends AbstractPartitionTest { private def createClientResponseWithAlterPartitionResponse( topicPartition: TopicPartition, partitionErrorCode: Short, - isr: List[Int] = List.empty, + isr: util.List[Integer] = util.List.of[Integer], leaderEpoch: Int = 0, partitionEpoch: Int = 0 ): ClientResponse = { @@ -2604,7 +2606,7 @@ class PartitionTest extends 
AbstractPartitionTest { topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData() .setPartitionIndex(topicPartition.partition) - .setIsr(isr.map(Integer.valueOf).asJava) + .setIsr(isr) .setLeaderEpoch(leaderEpoch) .setPartitionEpoch(partitionEpoch) .setErrorCode(partitionErrorCode)) @@ -2646,10 +2648,10 @@ class PartitionTest extends AbstractPartitionTest { val follower1 = brokerId + 1 val follower2 = brokerId + 2 val follower3 = brokerId + 3 - val replicas = Seq(brokerId, follower1, follower2, follower3) - val isr = Seq(brokerId, follower1, follower2) + val replicas = util.List.of[Integer](brokerId, follower1, follower2, follower3) + val isr = util.List.of[Integer](brokerId, follower1, follower2) val partitionEpoch = 1 - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) doNothing().when(delayedOperations).checkAndCompleteAll() @@ -2659,7 +2661,7 @@ class PartitionTest extends AbstractPartitionTest { // Complete the ISR expansion val alterPartitionResponseWithoutError = - createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code, List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1) + createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code, util.List.of[Integer](brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1) when(mockChannelManager.sendRequest(any(), any())) .thenAnswer { invocation => @@ -2703,9 +2705,9 @@ class PartitionTest extends AbstractPartitionTest { val follower1 = brokerId + 1 val follower2 = brokerId + 2 val follower3 = brokerId + 3 - val replicas = Seq(brokerId, follower1, follower2, follower3) - val isr = Seq(brokerId, follower1, follower2) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, follower1, follower2, follower3) + val isr = util.List.of[Integer](brokerId, follower1, follower2) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) doNothing().when(delayedOperations).checkAndCompleteAll() @@ -2743,7 +2745,7 @@ class PartitionTest extends AbstractPartitionTest { .thenReturn(Optional.of(long2Long(4L))) val controllerEpoch = 3 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) @@ -2761,7 +2763,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderEpoch = 5 val topicId = Uuid.randomUuid() - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) @@ -2805,7 +2807,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderEpoch = 5 val topicId = Uuid.randomUuid() - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) @@ -2880,8 +2882,8 @@ class PartitionTest extends AbstractPartitionTest { @Test def testUnderReplicatedPartitionsCorrectSemantics(): Unit = { val controllerEpoch = 3 - val replicas = List[Integer](brokerId, brokerId + 1, brokerId + 2).asJava - val isr = 
List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2) + val isr = util.List.of[Integer](brokerId, brokerId + 1) var leaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -3082,7 +3084,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderId = brokerId val followerId = brokerId + 1 - val replicas = List(leaderId, followerId) + val replicas = util.List.of[Integer](leaderId, followerId) val leaderEpoch = 8 val topicId = Uuid.randomUuid() addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -3091,9 +3093,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) @@ -3124,9 +3126,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(2) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId))) @@ -3147,7 +3149,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderId = brokerId val followerId = brokerId + 1 - val replicas = List(leaderId, followerId) + val replicas = util.List.of[Integer](leaderId, followerId) val leaderEpoch = 8 val topicId = Uuid.randomUuid() @@ -3155,9 +3157,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) @@ -3181,9 +3183,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(2) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId))) @@ -3198,7 +3200,7 @@ class PartitionTest extends AbstractPartitionTest { def testIgnoreLeaderPartitionStateChangeWithOlderPartitionEpoch(): Unit = { val controllerEpoch = 3 val leaderId = brokerId - val replicas = List(leaderId) + val replicas = util.List.of[Integer](leaderId) val leaderEpoch = 8 val topicId = Uuid.randomUuid() @@ -3206,9 +3208,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) @@ -3221,9 +3223,9 @@ class PartitionTest 
extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(leaderId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId)) .setPartitionEpoch(0) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId))) @@ -3236,7 +3238,7 @@ class PartitionTest extends AbstractPartitionTest { val controllerEpoch = 3 val leaderId = brokerId val followerId = brokerId + 1 - val replicas = List(leaderId, followerId) + val replicas = util.List.of[Integer](leaderId, followerId) val leaderEpoch = 8 val topicId = Uuid.randomUuid() @@ -3244,9 +3246,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(followerId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId, followerId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId, followerId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeFollower(initialFollowerState, offsetCheckpoints, Some(topicId))) @@ -3259,9 +3261,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(followerId) .setLeaderEpoch(leaderEpoch) - .setIsr(List(leaderId, followerId).map(Int.box).asJava) + .setIsr(util.List.of[Integer](leaderId, followerId)) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertFalse(partition.makeFollower(updatedFollowerState, offsetCheckpoints, Some(topicId))) @@ -3275,7 +3277,7 @@ class PartitionTest extends AbstractPartitionTest { val localReplica = brokerId val remoteReplica1 = brokerId + 1 val remoteReplica2 = brokerId + 2 - val replicas = List(localReplica, remoteReplica1, remoteReplica2) + val replicas = util.List.of[Integer](localReplica, remoteReplica1, remoteReplica2) val topicId = Uuid.randomUuid() // The local replica is the leader. @@ -3283,27 +3285,27 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(localReplica) .setLeaderEpoch(1) - .setIsr(replicas.map(Int.box).asJava) + .setIsr(replicas) .setPartitionEpoch(1) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(true) assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) assertEquals(1, partition.getPartitionEpoch) assertEquals(1, partition.getLeaderEpoch) assertEquals(Some(localReplica), partition.leaderReplicaIdOpt) - assertEquals(replicas.toSet, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) assertEquals(Seq(remoteReplica1, remoteReplica2), partition.remoteReplicas.map(_.brokerId).toSeq) - assertEquals(replicas, partition.assignmentState.replicas) + assertEquals(replicas.asScala, partition.assignmentState.replicas) // The local replica becomes a follower. 
val updatedLeaderState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) .setLeader(remoteReplica1) .setLeaderEpoch(2) - .setIsr(replicas.map(Int.box).asJava) + .setIsr(replicas) .setPartitionEpoch(2) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(false) assertTrue(partition.makeFollower(updatedLeaderState, offsetCheckpoints, Some(topicId))) @@ -3312,23 +3314,23 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(Some(remoteReplica1), partition.leaderReplicaIdOpt) assertEquals(Set.empty, partition.partitionState.isr) assertEquals(Seq.empty, partition.remoteReplicas.map(_.brokerId).toSeq) - assertEquals(replicas, partition.assignmentState.replicas) + assertEquals(replicas.asScala, partition.assignmentState.replicas) } @Test def testAddAndRemoveListeners(): Unit = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = topicId) - val replicas = Seq(brokerId, brokerId + 1) + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.makeLeader( new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(isr.map(Int.box).asJava) - .setReplicas(replicas.map(Int.box).asJava) + .setIsr(isr) + .setReplicas(replicas) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3397,8 +3399,8 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) + .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3413,16 +3415,16 @@ class PartitionTest extends AbstractPartitionTest { def testPartitionListenerWhenLogOffsetsChanged(): Unit = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = topicId) - val replicas = Seq(brokerId, brokerId + 1) - val isr = Seq(brokerId, brokerId + 1) - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val isr = util.List.of[Integer](brokerId, brokerId + 1) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.makeLeader( new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(isr.map(Int.box).asJava) - .setReplicas(replicas.map(Int.box).asJava) + .setIsr(isr) + .setReplicas(replicas) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3449,7 +3451,7 @@ class PartitionTest extends AbstractPartitionTest { listener.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset) - partition.truncateFullyAndStartAt(0L, false) + partition.truncateFullyAndStartAt(0L, isFuture = false) listener.verify(expectedHighWatermark = 0L) } @@ -3463,8 +3465,8 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) + .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) 
.setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3487,8 +3489,8 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(0) - .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) - .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) + .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) + .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3508,17 +3510,17 @@ class PartitionTest extends AbstractPartitionTest { partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = topicId) assertTrue(partition.log.isDefined) - val replicas = Seq(brokerId, brokerId + 1) + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val epoch = 0 - addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList) + addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.makeLeader( new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(0) .setLeader(brokerId) .setLeaderEpoch(epoch) - .setIsr(isr.map(Int.box).asJava) - .setReplicas(replicas.map(Int.box).asJava) + .setIsr(isr) + .setReplicas(replicas) .setPartitionEpoch(1) .setIsNew(true), offsetCheckpoints, @@ -3578,7 +3580,7 @@ class PartitionTest extends AbstractPartitionTest { def testMaybeStartTransactionVerification(): Unit = { val controllerEpoch = 0 val leaderEpoch = 5 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val producerId = 22L @@ -3594,7 +3596,7 @@ class PartitionTest extends AbstractPartitionTest { .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(leaderEpoch, partition.getLeaderEpoch) - val idempotentRecords = createIdempotentRecords(List( + val idempotentRecords = createIdempotentRecords(util.List.of( new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes), new SimpleRecord("k3".getBytes, "v3".getBytes)), @@ -3602,7 +3604,7 @@ class PartitionTest extends AbstractPartitionTest { producerId = producerId) partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching) - def transactionRecords() = createTransactionalRecords(List( + def transactionRecords() = createTransactionalRecords(util.List.of( new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes), new SimpleRecord("k3".getBytes, "v3".getBytes)), @@ -3614,7 +3616,7 @@ class PartitionTest extends AbstractPartitionTest { assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)) // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-sentinel VerificationGuard. - val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0, true) + val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0, supportsEpochBump = true) assertNotEquals(VerificationGuard.SENTINEL, verificationGuard) // With the wrong VerificationGuard, append should fail. 
@@ -3622,12 +3624,12 @@ class PartitionTest extends AbstractPartitionTest { origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, new VerificationGuard())) // We should return the same VerificationGuard when we still need to verify. Append should proceed. - val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0, true) + val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0, supportsEpochBump = true) assertEquals(verificationGuard, verificationGuard2) partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard) // We should no longer need a VerificationGuard. Future appends without VerificationGuard will also succeed. - val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0, true) + val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0, supportsEpochBump = true) assertEquals(VerificationGuard.SENTINEL, verificationGuard3) partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching) } @@ -3636,8 +3638,8 @@ class PartitionTest extends AbstractPartitionTest { topicId: Option[Uuid], controllerEpoch: Int, leaderEpoch: Int, - isr: Seq[Int], - replicas: Seq[Int], + isr: util.List[Integer], + replicas: util.List[Integer], partitionEpoch: Int, isNew: Boolean, partition: Partition = partition @@ -3653,9 +3655,9 @@ class PartitionTest extends AbstractPartitionTest { .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) - .setIsr(isr.map(Int.box).asJava) + .setIsr(isr) .setPartitionEpoch(partitionEpoch) - .setReplicas(replicas.map(Int.box).asJava) + .setReplicas(replicas) .setIsNew(isNew), offsetCheckpoints, topicId @@ -3664,8 +3666,8 @@ class PartitionTest extends AbstractPartitionTest { assertFalse(partition.partitionState.isInflight) assertEquals(topicId, partition.topicId) assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(isr.toSet, partition.partitionState.isr) - assertEquals(isr.toSet, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) assertEquals(partitionEpoch, partition.getPartitionEpoch) newLeader } @@ -3804,8 +3806,8 @@ class PartitionTest extends AbstractPartitionTest { ) } - private def addBrokerEpochToMockMetadataCache(metadataCache: MetadataCache, brokers: List[Int]): Unit = { - brokers.foreach { broker => + private def addBrokerEpochToMockMetadataCache(metadataCache: MetadataCache, brokers: util.List[Integer]): Unit = { + brokers.forEach { broker => when(metadataCache.getAliveBrokerEpoch(broker)).thenReturn(Optional.of(defaultBrokerEpoch(broker))) } } @@ -3832,7 +3834,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -3877,7 +3879,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = 
util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -3922,7 +3924,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -3967,7 +3969,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -4012,7 +4014,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) @@ -4058,7 +4060,7 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) val controllerEpoch = 0 val leaderEpoch = 1 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = util.List.of[Integer](brokerId, brokerId + 1) val isr = replicas val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() .setControllerEpoch(controllerEpoch) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala index 262bc3f85b6..c55d2c0da3d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -34,9 +34,9 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.Mockito.{mock, verify, when} import java.nio.charset.Charset +import java.util import java.util.Optional import scala.collection.Map -import scala.jdk.CollectionConverters._ class CoordinatorPartitionWriterTest { @Test @@ -75,8 +75,8 @@ class CoordinatorPartitionWriterTest { replicaManager ) - when(replicaManager.getLogConfig(tp)).thenReturn(Some(new LogConfig(Map.empty.asJava))) - assertEquals(new LogConfig(Map.empty.asJava), partitionRecordWriter.config(tp)) + when(replicaManager.getLogConfig(tp)).thenReturn(Some(new LogConfig(util.Map.of))) + assertEquals(new LogConfig(util.Map.of), partitionRecordWriter.config(tp)) when(replicaManager.getLogConfig(tp)).thenReturn(None) assertThrows(classOf[NotLeaderOrFollowerException], () => partitionRecordWriter.config(tp)) @@ -119,7 +119,7 @@ class CoordinatorPartitionWriterTest { 10L ), Option.empty, - false + hasCustomErrorMessage = false ))) val batch = MemoryRecords.withRecords( @@ -215,7 +215,7 @@ class CoordinatorPartitionWriterTest { )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult( LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(Errors.NOT_LEADER_OR_FOLLOWER.exception), - false + hasCustomErrorMessage = false ))) val batch = MemoryRecords.withRecords( diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 5db59dd51fe..030295975a4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -17,7 +17,7 @@ package kafka.coordinator.transaction import java.nio.ByteBuffer -import java.util.Collections +import java.util import java.util.Optional import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean @@ -87,7 +87,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren when(metadataCache.features()).thenReturn { new FinalizedFeatures( MetadataVersion.latestTesting(), - Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), + util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), 0) } @@ -476,7 +476,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren private def prepareTxnLog(partitionId: Int): Unit = { val logMock: UnifiedLog = mock(classOf[UnifiedLog]) - when(logMock.config).thenReturn(new LogConfig(Collections.emptyMap())) + when(logMock.config).thenReturn(new LogConfig(util.Map.of)) val fileRecordsMock: FileRecords = mock(classOf[FileRecords]) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala index 91039d9abc0..dd378757130 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala @@ -16,7 +16,6 @@ */ package kafka.coordinator.transaction - import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil} @@ -30,7 +29,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, import org.junit.jupiter.api.Test import java.nio.ByteBuffer -import java.util.Collections +import java.util import scala.collection.Seq import scala.jdk.CollectionConverters._ @@ -115,14 +114,14 @@ class TransactionLogTest { @Test def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = { - val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, Collections.emptySet(), 500, 500, TV_0) + val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, util.Set.of, 500, 500, TV_0) val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata, TV_0)) assertEquals(0, txnLogValueBuffer.getShort) } @Test def testSerializeTransactionLogValueToFlexibleVersion(): Unit = { - val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, Collections.emptySet(), 500, 500, TV_2) + val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, util.Set.of, 500, 500, TV_2) val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata, TV_2)) assertEquals(TransactionLogValue.HIGHEST_SUPPORTED_VERSION, txnLogValueBuffer.getShort) } @@ -131,7 +130,7 @@ class TransactionLogTest { def 
testDeserializeHighestSupportedTransactionLogValue(): Unit = { val txnPartitions = new TransactionLogValue.PartitionsSchema() .setTopic("topic") - .setPartitionIds(java.util.Collections.singletonList(0)) + .setPartitionIds(util.List.of(0)) val txnLogValue = new TransactionLogValue() .setProducerId(100) @@ -140,7 +139,7 @@ class TransactionLogTest { .setTransactionStartTimestampMs(750L) .setTransactionLastUpdateTimestampMs(1000L) .setTransactionTimeoutMs(500) - .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions)) + .setTransactionPartitions(util.List.of(txnPartitions)) val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue) val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get @@ -174,7 +173,7 @@ class TransactionLogTest { val txnPartitions = new Struct(futurePartitionsSchema) txnPartitions.set("topic", "topic") txnPartitions.set("partition_ids", Array(Integer.valueOf(1))) - val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]() + val txnPartitionsTaggedFields = new util.TreeMap[Integer, Any]() txnPartitionsTaggedFields.put(100, "foo") txnPartitionsTaggedFields.put(101, 4000) txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields) @@ -204,7 +203,7 @@ class TransactionLogTest { transactionLogValue.set("transaction_partitions", Array(txnPartitions)) transactionLogValue.set("transaction_last_update_timestamp_ms", 2000L) transactionLogValue.set("transaction_start_timestamp_ms", 3000L) - val txnLogValueTaggedFields = new java.util.TreeMap[Integer, Any]() + val txnLogValueTaggedFields = new util.TreeMap[Integer, Any]() txnLogValueTaggedFields.put(100, "foo") txnLogValueTaggedFields.put(101, 4000) transactionLogValue.set("_tagged_fields", txnLogValueTaggedFields) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 4131f564d63..ec12afe6489 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -17,8 +17,6 @@ package kafka.coordinator.transaction import java.util -import java.util.Arrays.asList -import java.util.Collections import java.util.Optional import java.util.concurrent.{Callable, Executors, Future} import kafka.server.KafkaConfig @@ -156,7 +154,7 @@ class TransactionMarkerChannelManagerTest { val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1) val response = new WriteTxnMarkersResponse( - Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE))) + util.Map.of(producerId2: java.lang.Long, util.Map.of(partition1, Errors.NONE))) val clientResponse = new ClientResponse(header, null, null, time.milliseconds(), time.milliseconds(), false, null, null, response) @@ -205,7 +203,7 @@ class TransactionMarkerChannelManagerTest { // Build a successful client response. 
val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1) val successfulResponse = new WriteTxnMarkersResponse( - Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE))) + util.Map.of(producerId2: java.lang.Long, util.Map.of(partition1, Errors.NONE))) val successfulClientResponse = new ClientResponse(header, null, null, time.milliseconds(), time.milliseconds(), false, null, null, successfulResponse) @@ -302,10 +300,10 @@ class TransactionMarkerChannelManagerTest { assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2)) val expectedBroker1Request = new WriteTxnMarkersRequest.Builder( - asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)), - new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)))).build() + util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)), + new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)))).build() val expectedBroker2Request = new WriteTxnMarkersRequest.Builder( - asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition2)))).build() + util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build() val requests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().asScala.map { handler => (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()) @@ -372,10 +370,10 @@ class TransactionMarkerChannelManagerTest { assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2)) val expectedBroker1Request = new WriteTxnMarkersRequest.Builder( - asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)), - new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)))).build() + util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)), + new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)))).build() val expectedBroker2Request = new WriteTxnMarkersRequest.Builder( - asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition2)))).build() + util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build() val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().asScala.map { handler => (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala index b34b0725020..56dc0ec266c 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala +++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala @@ -17,7 +17,6 @@ package kafka.coordinator.transaction import java.{lang, util} -import java.util.Arrays.asList import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -46,10 +45,10 @@ class TransactionMarkerRequestCompletionHandlerTest { private val topicPartition = new TopicPartition("topic1", 0) private val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID, producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, mutable.Set[TopicPartition](topicPartition), 0L, 0L, TransactionVersion.TV_2) - private val pendingCompleteTxnAndMarkers = asList( + private val pendingCompleteTxnAndMarkers = util.List.of( PendingCompleteTxnAndMarkerEntry( PendingCompleteTxn(transactionalId, coordinatorEpoch, txnMetadata, txnMetadata.prepareComplete(42)), - new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, asList(topicPartition)))) + new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, util.List.of(topicPartition)))) private val markerChannelManager: TransactionMarkerChannelManager = mock(classOf[TransactionMarkerChannelManager]) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala index f3224593514..453bc13f0bd 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala @@ -127,7 +127,7 @@ class TransactionMetadataTest { txnLastUpdateTimestamp = time.milliseconds(), clientTransactionVersion = TV_2) - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, true) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, noPartitionAdded = true) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, txnMetadata.producerId) assertEquals(producerEpoch + 1, txnMetadata.producerEpoch) @@ -152,7 +152,7 @@ class TransactionMetadataTest { txnLastUpdateTimestamp = time.milliseconds(), clientTransactionVersion = TV_2) - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, true) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, noPartitionAdded = true) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, txnMetadata.producerId) assertEquals(producerEpoch + 1, txnMetadata.producerEpoch) @@ -177,7 +177,7 @@ class TransactionMetadataTest { txnLastUpdateTimestamp = time.milliseconds(), clientTransactionVersion = TV_2) - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, true) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, noPartitionAdded = true) txnMetadata.completeTransitionTo(transitMetadata) 
assertEquals(producerId, txnMetadata.producerId) assertEquals(producerEpoch + 1, txnMetadata.producerEpoch) @@ -295,7 +295,7 @@ class TransactionMetadataTest { clientTransactionVersion = TV_0) // let new time be smaller - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, false) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(TransactionState.PREPARE_COMMIT, txnMetadata.state) assertEquals(producerId, txnMetadata.producerId) @@ -323,7 +323,7 @@ class TransactionMetadataTest { clientTransactionVersion = TV_0) // let new time be smaller - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, false) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(TransactionState.PREPARE_ABORT, txnMetadata.state) assertEquals(producerId, txnMetadata.producerId) @@ -425,7 +425,7 @@ class TransactionMetadataTest { // We should reset the pending state to make way for the abort transition. txnMetadata.pendingState = None - val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds(), false) + val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds(), noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, transitMetadata.producerId) } @@ -537,7 +537,7 @@ class TransactionMetadataTest { txnLastUpdateTimestamp = time.milliseconds(), clientTransactionVersion = TV_2) - var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, false) + var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, txnMetadata.producerId) assertEquals((producerEpoch + 1).toShort, txnMetadata.producerEpoch) @@ -571,7 +571,7 @@ class TransactionMetadataTest { assertTrue(txnMetadata.isProducerEpochExhausted) val newProducerId = 9893L - var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, newProducerId, time.milliseconds() - 1, false) + var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, newProducerId, time.milliseconds() - 1, noPartitionAdded = false) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(producerId, txnMetadata.producerId) assertEquals(Short.MaxValue, txnMetadata.producerEpoch) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 212a915c800..16b3224495a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -18,6 
+18,7 @@ package kafka.coordinator.transaction import java.lang.management.ManagementFactory import java.nio.ByteBuffer +import java.util import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} import javax.management.ObjectName import kafka.server.ReplicaManager @@ -49,7 +50,6 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort} import org.mockito.Mockito.{atLeastOnce, mock, reset, times, verify, when} -import java.util.Collections import scala.collection.{Map, mutable} import scala.jdk.CollectionConverters._ @@ -72,7 +72,7 @@ class TransactionStateManagerTest { when(metadataCache.features()).thenReturn { new FinalizedFeatures( MetadataVersion.latestTesting(), - Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), + util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), 0) } @@ -1354,7 +1354,7 @@ class TransactionStateManagerTest { when(metadataCache.features()).thenReturn { new FinalizedFeatures( MetadataVersion.latestTesting(), - Collections.singletonMap(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()), + util.Map.of(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()), 0) } val transactionManager = new TransactionStateManager(0, scheduler, diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index c45464333c0..7f37eeb25a1 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -34,7 +34,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import java.io.File import java.time.Duration import java.util -import java.util.{Collections, Properties} +import java.util.Properties import scala.collection.{Seq, mutable} import scala.jdk.CollectionConverters._ import scala.util.Using @@ -364,7 +364,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = { Using.resource(createAdminClient(brokers, listenerName)) { admin => { - admin.alterClientQuotas(Collections.singleton( + admin.alterClientQuotas(util.Set.of( new ClientQuotaAlteration( new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "") null else sanitizedClientId)).asJava), configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get() diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index e7a8e10d80e..ecb8c8f011b 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -17,6 +17,7 @@ package kafka.integration +import java.util import java.util.Properties import java.util.concurrent.ExecutionException import scala.util.Random @@ -317,7 +318,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { valueDeserializer = new StringDeserializer) try { val tp = new TopicPartition(topic, partitionId) - consumer.assign(Seq(tp).asJava) + consumer.assign(util.List.of(tp)) consumer.seek(tp, 0) TestUtils.consumeRecords(consumer, numMessages).map(_.value) } finally consumer.close() @@ -410,10 +411,14 @@ class 
UncleanLeaderElectionTest extends QuorumTestHarness { } private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = { - val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava - adminClient.incrementalAlterConfigs(Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) -> - configEntries.asScala.map((e: ConfigEntry) => new AlterConfigOp(e, AlterConfigOp.OpType.SET)).toSeq - .asJavaCollection).asJava) + val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic) + + val configEntries = topicConfigs.entrySet().stream() + .map(e => new ConfigEntry(e.getKey.toString, e.getValue.toString)) + .map(e => new AlterConfigOp(e, AlterConfigOp.OpType.SET)) + .toList + + adminClient.incrementalAlterConfigs(util.Map.of(configResource, configEntries)) } private def createAdminClient(): Admin = { @@ -427,7 +432,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { private def waitForNoLeaderAndIsrHasOldLeaderId(metadataCache: MetadataCache, leaderId: Int): Unit = { waitUntilTrue(() => metadataCache.getLeaderAndIsr(topic, partitionId).isPresent() && metadataCache.getLeaderAndIsr(topic, partitionId).get.leader() == LeaderConstants.NO_LEADER && - java.util.Arrays.asList(leaderId).equals(metadataCache.getLeaderAndIsr(topic, partitionId).get.isr()), + util.List.of(leaderId).equals(metadataCache.getLeaderAndIsr(topic, partitionId).get.isr()), "Timed out waiting for broker metadata cache updates the info for topic partition:" + topicPartition) } } diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala index 75946f14075..b0dbd0a05c2 100644 --- a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala @@ -22,15 +22,12 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.junit.jupiter.api.Assertions.{assertEquals, assertNull} import org.junit.jupiter.api.Test -import java.util.Collections -import scala.jdk.CollectionConverters._ - class KafkaMetricsGroupTest { @Test def testUntaggedMetricName(): Unit = { val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics") - val metricName = metricsGroup.metricName("TaggedMetric", Collections.emptyMap()) + val metricName = metricsGroup.metricName("TaggedMetric", java.util.Map.of) assertEquals("kafka.metrics", metricName.getGroup) assertEquals("TestMetrics", metricName.getType) @@ -42,7 +39,13 @@ class KafkaMetricsGroupTest { @Test def testTaggedMetricName(): Unit = { - val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz").asJava + val tags = { + val map = new java.util.LinkedHashMap[String, String]() + map.put("foo", "bar") + map.put("bar", "baz") + map.put("baz", "raz.taz") + map + } val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics") val metricName = metricsGroup.metricName("TaggedMetric", tags) @@ -56,7 +59,13 @@ class KafkaMetricsGroupTest { @Test def testTaggedMetricNameWithEmptyValue(): Unit = { - val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz").asJava + val tags = { + val map = new java.util.LinkedHashMap[String, String]() + map.put("foo", "bar") + map.put("bar", "") + map.put("baz", "raz.taz") + map + } val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics") val metricName = metricsGroup.metricName("TaggedMetric", tags) @@ -67,6 +76,4 @@ class KafkaMetricsGroupTest { 
metricName.getMBeanName) assertEquals("baz.raz_taz.foo.bar", metricName.getScope) } - - } diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index d59f44e3e3c..834b8efe48d 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -18,6 +18,7 @@ package kafka.metrics import java.lang.management.ManagementFactory +import java.util import java.util.Properties import javax.management.ObjectName import com.yammer.metrics.core.MetricPredicate @@ -124,7 +125,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { def testUpdateJMXFilter(): Unit = { // verify previously exposed metrics are removed and existing matching metrics are added brokers.foreach(broker => broker.kafkaYammerMetrics.reconfigure( - Map(JmxReporter.EXCLUDE_CONFIG -> "kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava + util.Map.of(JmxReporter.EXCLUDE_CONFIG, "kafka.controller:type=KafkaController,name=ActiveControllerCount") )) assertFalse(ManagementFactory.getPlatformMBeanServer .isRegistered(new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount"))) @@ -150,9 +151,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging { @Test def testWindowsStyleTagNames(): Unit = { val path = "C:\\windows-path\\kafka-logs" - val tags = Map("dir" -> path) - val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=") - val metric = new KafkaMetricsGroup(this.getClass).metricName("test-metric", tags.asJava) + val tags = util.Map.of("dir", path) + val expectedMBeanName = Set(tags.keySet().iterator().next(), ObjectName.quote(path)).mkString("=") + val metric = new KafkaMetricsGroup(this.getClass).metricName("test-metric", tags) assert(metric.getMBeanName.endsWith(expectedMBeanName)) }
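
The PartitionTest hunks above repeatedly compare a java.util.List[Integer] against the partition's Scala Set state via util.Set.copyOf(...).asScala. A minimal stand-alone sketch of why that conversion satisfies the assertion (values here are illustrative, not taken from the test):

    import java.util
    import scala.jdk.CollectionConverters._

    val isr: util.List[Integer] = util.List.of[Integer](0, 1, 2)
    val stateIsr: Set[Int] = Set(0, 1, 2) // stands in for partition.partitionState.isr

    // Scala Ints box to java.lang.Integer, so the converted set and the
    // Scala Set[Int] hold equal elements and compare equal.
    assert(util.Set.copyOf(isr).asScala == stateIsr)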
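
The same files drop Seq[Int] replicas/ISR values that were re-boxed at every call site with .map(Int.box).asJava, in favour of building the java.util.List[Integer] once. Both forms produce equal lists, so setIsr/setReplicas see the same value; a sketch:

    import java.util
    import scala.jdk.CollectionConverters._

    val brokerId = 0
    val before: util.List[Integer] = Seq(brokerId, brokerId + 1).map(Int.box).asJava
    val after: util.List[Integer] = util.List.of[Integer](brokerId, brokerId + 1)

    // java.util.List equality is element-wise, regardless of implementation.
    assert(before == after)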
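
Alongside the collection changes, bare boolean literals at call sites gain parameter names (isFuture = false, supportsEpochBump = true, noPartitionAdded = false, hasCustomErrorMessage = false). The pattern in miniature, with a stand-in signature rather than the real one:

    // Stand-in for e.g. Partition.truncateFullyAndStartAt; not the actual signature.
    def truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit = ()

    truncateFullyAndStartAt(0L, false)            // before: opaque literal
    truncateFullyAndStartAt(0L, isFuture = false) // after: self-documenting call site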
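
In the transaction-coordinator tests, Collections.singletonMap/emptyMap become util.Map.of, including the nested map fed to WriteTxnMarkersResponse. One behavioural caveat when making this swap mechanically: the Map.of factories reject null keys and values (and duplicate keys), whereas Collections.singletonMap accepted nulls. A sketch with plain values standing in for the Kafka types:

    import java.util

    val markers: util.Map[java.lang.Long, util.Map[String, String]] =
      util.Map.of(2L: java.lang.Long, util.Map.of("topic-0", "NONE"))

    assert(markers.get(2L: java.lang.Long).get("topic-0") == "NONE")
    // util.Map.of("k", null) would throw NullPointerException;
    // Collections.singletonMap("k", null) did not.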
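
UncleanLeaderElectionTest's alterTopicConfigs now maps a Properties straight to AlterConfigOps with the Java Stream API; Stream.toList (Java 16+) yields an unmodifiable java.util.List, which is a Collection and so fits incrementalAlterConfigs' Map[ConfigResource, java.util.Collection[AlterConfigOp]] parameter. The same pipeline over plain strings, with a hypothetical property value:

    import java.util.Properties

    val props = new Properties()
    props.put("unclean.leader.election.enable", "true")

    // Properties extends Hashtable[AnyRef, AnyRef], hence the toString calls.
    val entries = props.entrySet().stream()
      .map(e => s"${e.getKey.toString}=${e.getValue.toString}")
      .toList // unmodifiable java.util.List[String], Java 16+

    assert(entries.contains("unclean.leader.election.enable=true"))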
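
KafkaMetricsGroupTest builds its tags with java.util.LinkedHashMap rather than util.Map.of. That choice matters if iteration order feeds the mBean-name and scope assertions: Scala's small immutable Maps iterate in insertion order, LinkedHashMap preserves it too, but util.Map.of leaves iteration order unspecified. A sketch of the property being preserved, assuming order is what the assertions depend on:

    import java.util
    import scala.jdk.CollectionConverters._

    val tags = new util.LinkedHashMap[String, String]()
    tags.put("foo", "bar")
    tags.put("bar", "baz")
    tags.put("baz", "raz.taz")

    // Insertion order survives; util.Map.of makes no such guarantee.
    assert(tags.keySet().asScala.toList == List("foo", "bar", "baz"))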