MINOR: Cleanup Core Module- Scala Modules (3/n) (#19804)

Now that Kafka brokers support Java 17, this PR makes some changes in
the core module. The changes in this PR are limited to some Scala files
in the core module's tests, and mostly consist of the following (a rough
before/after sketch follows the list):
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
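
For illustration only, a minimal sketch of the before/after pattern in Scala
test code (the values and variable names here are made up, not taken from the
diffs below):

    import java.util

    // Before: pre-Java 9 helpers from java.util.Collections / java.util.Arrays
    val oldList: util.List[Integer] = java.util.Arrays.asList(0, 1, 2)
    val oldMap: util.Map[String, String] = java.util.Collections.singletonMap("key", "value")
    val oldSet: util.Set[String] = java.util.Collections.singleton("value")

    // After: Java 9+ immutable collection factories, as used throughout this PR
    val newList: util.List[Integer] = util.List.of[Integer](0, 1, 2)
    val newMap: util.Map[String, String] = util.Map.of("key", "value")
    val newSet: util.Set[String] = util.Set.of("value")

One behavioural difference worth keeping in mind: List.of/Map.of/Set.of return
unmodifiable collections and reject null elements, whereas Arrays.asList allows
nulls and in-place element updates; that difference does not matter for the
tests touched here.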

To be clear, the directories targeted in this PR under the unit.kafka
module are:
- admin
- cluster
- coordinator
- docker
- integration
- metrics

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Sanskar Jhajharia authored on 2025-06-21 21:50:39 +05:30; committed by GitHub
parent 7c77519f59
commit 992eaafb62
16 changed files with 402 additions and 392 deletions

View File

@@ -17,7 +17,6 @@
 package kafka.admin
-import java.util.Collections
 import kafka.server.{BaseRequestTest, BrokerServer}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
@@ -28,8 +27,6 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 import java.util
-import java.util.Arrays.asList
-import java.util.Collections.singletonList
 import java.util.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
@@ -66,8 +63,8 @@ class AddPartitionsTest extends BaseRequestTest {
   @Test
   def testWrongReplicaCount(): Unit = {
     assertEquals(classOf[InvalidReplicaAssignmentException], assertThrows(classOf[ExecutionException], () => {
-      admin.createPartitions(Collections.singletonMap(topic1,
-        NewPartitions.increaseTo(2, singletonList(asList(0, 1, 2))))).all().get()
+      admin.createPartitions(util.Map.of(topic1,
+        NewPartitions.increaseTo(2, util.List.of(util.List.of[Integer](0, 1, 2))))).all().get()
     }).getCause.getClass)
   }
@@ -78,12 +75,12 @@ class AddPartitionsTest extends BaseRequestTest {
   @Test
   def testMissingPartitionsInCreateTopics(): Unit = {
     val topic6Placements = new util.HashMap[Integer, util.List[Integer]]
-    topic6Placements.put(1, asList(0, 1))
-    topic6Placements.put(2, asList(1, 0))
+    topic6Placements.put(1, util.List.of(0, 1))
+    topic6Placements.put(2, util.List.of(1, 0))
     val topic7Placements = new util.HashMap[Integer, util.List[Integer]]
-    topic7Placements.put(2, asList(0, 1))
-    topic7Placements.put(3, asList(1, 0))
-    val futures = admin.createTopics(asList(
+    topic7Placements.put(2, util.List.of(0, 1))
+    topic7Placements.put(3, util.List.of(1, 0))
+    val futures = admin.createTopics(util.List.of(
       new NewTopic("new-topic6", topic6Placements),
       new NewTopic("new-topic7", topic7Placements))).values()
     val topic6Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic6").get()).getCause
@@ -103,8 +100,8 @@ class AddPartitionsTest extends BaseRequestTest {
   @Test
   def testMissingPartitionsInCreatePartitions(): Unit = {
     val cause = assertThrows(classOf[ExecutionException], () =>
-      admin.createPartitions(Collections.singletonMap(topic1,
-        NewPartitions.increaseTo(3, singletonList(asList(0, 1, 2))))).all().get()).getCause
+      admin.createPartitions(util.Map.of(topic1,
+        NewPartitions.increaseTo(3, util.List.of(util.List.of[Integer](0, 1, 2))))).all().get()).getCause
     assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
     assertTrue(cause.getMessage.contains("Attempted to add 2 additional partition(s), but only 1 assignment(s) " +
       "were specified."), "Unexpected error message: " + cause.getMessage)
@@ -112,7 +109,7 @@ class AddPartitionsTest extends BaseRequestTest {
   @Test
   def testIncrementPartitions(): Unit = {
-    admin.createPartitions(Collections.singletonMap(topic1, NewPartitions.increaseTo(3))).all().get()
+    admin.createPartitions(util.Map.of(topic1, NewPartitions.increaseTo(3))).all().get()
     // wait until leader is elected
     waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 1)
@@ -122,7 +119,7 @@ class AddPartitionsTest extends BaseRequestTest {
     TestUtils.waitForPartitionMetadata(brokers, topic1, 1)
     TestUtils.waitForPartitionMetadata(brokers, topic1, 2)
     val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(Seq(topic1).asJava, false).build)
+      new MetadataRequest.Builder(util.List.of(topic1), false).build)
     assertEquals(1, response.topicMetadata.size)
     val partitions = response.topicMetadata.asScala.head.partitionMetadata.asScala.sortBy(_.partition)
     assertEquals(partitions.size, 3)
@@ -141,8 +138,8 @@ class AddPartitionsTest extends BaseRequestTest {
   @Test
   def testManualAssignmentOfReplicas(): Unit = {
     // Add 2 partitions
-    admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3,
-      asList(asList(0, 1), asList(2, 3))))).all().get()
+    admin.createPartitions(util.Map.of(topic2, NewPartitions.increaseTo(3,
+      util.List.of(util.List.of[Integer](0, 1), util.List.of[Integer](2, 3))))).all().get()
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 1)
     val leader2 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 2)
@@ -153,7 +150,7 @@ class AddPartitionsTest extends BaseRequestTest {
     val partition2Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
     assertEquals(leader2, partition2Metadata.leader())
     val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
+      new MetadataRequest.Builder(util.List.of(topic2), false).build)
     assertEquals(1, response.topicMetadata.size)
     val topicMetadata = response.topicMetadata.asScala.head
     val partitionMetadata = topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
@@ -168,7 +165,7 @@ class AddPartitionsTest extends BaseRequestTest {
   @Test
   def testReplicaPlacementAllServers(): Unit = {
-    admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get()
+    admin.createPartitions(util.Map.of(topic3, NewPartitions.increaseTo(7))).all().get()
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitForPartitionMetadata(brokers, topic3, 1)
@@ -179,7 +176,7 @@ class AddPartitionsTest extends BaseRequestTest {
     TestUtils.waitForPartitionMetadata(brokers, topic3, 6)
     val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
+      new MetadataRequest.Builder(util.List.of(topic3), false).build)
     assertEquals(1, response.topicMetadata.size)
     val topicMetadata = response.topicMetadata.asScala.head
@@ -195,14 +192,14 @@ class AddPartitionsTest extends BaseRequestTest {
   @Test
   def testReplicaPlacementPartialServers(): Unit = {
-    admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get()
+    admin.createPartitions(util.Map.of(topic2, NewPartitions.increaseTo(3))).all().get()
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
     TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
     val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
+      new MetadataRequest.Builder(util.List.of(topic2), false).build)
     assertEquals(1, response.topicMetadata.size)
     val topicMetadata = response.topicMetadata.asScala.head

View File

@@ -38,7 +38,6 @@ import java.io.File
 import java.lang.{Long => JLong}
 import java.util.{Optional, Properties}
 import java.util.concurrent.atomic.AtomicInteger
-import scala.jdk.CollectionConverters._
 object AbstractPartitionTest {
   val brokerId = 101
@@ -121,7+120,7 @@ class AbstractPartitionTest {
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
     val controllerEpoch = 0
-    val replicas = List[Integer](brokerId, remoteReplicaId).asJava
+    val replicas = java.util.List.of[Integer](brokerId, remoteReplicaId)
     val isr = replicas
     if (isLeader) {

View File

@@ -21,97 +21,99 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import java.util
 import scala.jdk.CollectionConverters._
 object AssignmentStateTest {
   import AbstractPartitionTest._
-  def parameters: java.util.stream.Stream[Arguments] = Seq[Arguments](
+  def parameters: util.stream.Stream[Arguments] = util.List.of[Arguments](
     Arguments.of(
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List.empty[Integer], List.empty[Integer], Seq.empty[Int], Boolean.box(false)),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(false)),
     Arguments.of(
-      List[Integer](brokerId, brokerId + 1),
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List.empty[Integer], List.empty[Integer], Seq.empty[Int], Boolean.box(true)),
+      util.List.of[Integer](brokerId, brokerId + 1),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(true)),
     Arguments.of(
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List[Integer](brokerId + 3, brokerId + 4),
-      List[Integer](brokerId + 1),
-      Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId + 3, brokerId + 4),
+      util.List.of[Integer](brokerId + 1),
+      util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
     Arguments.of(
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List[Integer](brokerId + 3, brokerId + 4),
-      List.empty[Integer],
-      Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId + 3, brokerId + 4),
+      util.List.of[Integer],
+      util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
     Arguments.of(
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List.empty[Integer],
-      List[Integer](brokerId + 1),
-      Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer],
+      util.List.of[Integer](brokerId + 1),
+      util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
     Arguments.of(
-      List[Integer](brokerId + 1, brokerId + 2),
-      List[Integer](brokerId + 1, brokerId + 2),
-      List[Integer](brokerId),
-      List.empty[Integer],
-      Seq(brokerId + 1, brokerId + 2), Boolean.box(false)),
+      util.List.of[Integer](brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId),
+      util.List.of[Integer],
+      util.List.of(brokerId + 1, brokerId + 2), Boolean.box(false)),
     Arguments.of(
-      List[Integer](brokerId + 2, brokerId + 3, brokerId + 4),
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List[Integer](brokerId + 3, brokerId + 4, brokerId + 5),
-      List.empty[Integer],
-      Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
+      util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5),
+      util.List.of[Integer],
+      util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
     Arguments.of(
-      List[Integer](brokerId + 2, brokerId + 3, brokerId + 4),
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List[Integer](brokerId + 3, brokerId + 4, brokerId + 5),
-      List.empty[Integer],
-      Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
+      util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5),
+      util.List.of[Integer],
+      util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
     Arguments.of(
-      List[Integer](brokerId + 2, brokerId + 3),
-      List[Integer](brokerId, brokerId + 1, brokerId + 2),
-      List[Integer](brokerId + 3, brokerId + 4, brokerId + 5),
-      List.empty[Integer],
-      Seq(brokerId, brokerId + 1, brokerId + 2), Boolean.box(true))
-  ).asJava.stream()
+      util.List.of[Integer](brokerId + 2, brokerId + 3),
+      util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
+      util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5),
+      util.List.of[Integer],
+      util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(true))
+  ).stream()
 }
 class AssignmentStateTest extends AbstractPartitionTest {
   @ParameterizedTest
   @MethodSource(Array("parameters"))
-  def testPartitionAssignmentStatus(isr: List[Integer], replicas: List[Integer],
-                                    adding: List[Integer], removing: List[Integer],
-                                    original: Seq[Int], isUnderReplicated: Boolean): Unit = {
+  def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer],
+                                    adding: util.List[Integer], removing: util.List[Integer],
+                                    original: util.List[Int], isUnderReplicated: Boolean): Unit = {
     val controllerEpoch = 3
     val leaderState = new LeaderAndIsrRequest.PartitionState()
       .setControllerEpoch(controllerEpoch)
       .setLeader(brokerId)
       .setLeaderEpoch(6)
-      .setIsr(isr.asJava)
+      .setIsr(isr)
       .setPartitionEpoch(1)
-      .setReplicas(replicas.asJava)
+      .setReplicas(replicas)
       .setIsNew(false)
-    if (adding.nonEmpty)
-      leaderState.setAddingReplicas(adding.asJava)
-    if (removing.nonEmpty)
-      leaderState.setRemovingReplicas(removing.asJava)
-    val isReassigning = adding.nonEmpty || removing.nonEmpty
+    if (!adding.isEmpty)
+      leaderState.setAddingReplicas(adding)
+    if (!removing.isEmpty)
+      leaderState.setRemovingReplicas(removing)
+    val isReassigning = !adding.isEmpty || !removing.isEmpty
     // set the original replicas as the URP calculation will need them
-    if (original.nonEmpty)
-      partition.assignmentState = SimpleAssignmentState(original)
+    if (!original.isEmpty)
+      partition.assignmentState = SimpleAssignmentState(original.asScala)
     // do the test
     partition.makeLeader(leaderState, offsetCheckpoints, None)
     assertEquals(isReassigning, partition.isReassigning)
-    if (adding.nonEmpty)
-      adding.foreach(r => assertTrue(partition.isAddingReplica(r)))
+    if (!adding.isEmpty)
+      adding.forEach(r => assertTrue(partition.isAddingReplica(r)))
     if (adding.contains(brokerId))
       assertTrue(partition.isAddingLocalReplica)
     else

View File

@@ -18,6 +18,7 @@
 package kafka.cluster
 import java.lang.{Long => JLong}
+import java.util
 import java.util.{Optional, Properties}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
@@ -139,9 +140,9 @@ class PartitionLockTest extends Logging {
   def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
     val active = new AtomicBoolean(true)
     val replicaToCheck = 3
-    val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
-    val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
-    def partitionState(replicas: java.util.List[Integer]) = new LeaderAndIsrRequest.PartitionState()
+    val firstReplicaSet = util.List.of[Integer](3, 4, 5)
+    val secondReplicaSet = util.List.of[Integer](1, 2, 3)
+    def partitionState(replicas: util.List[Integer]) = new LeaderAndIsrRequest.PartitionState()
       .setControllerEpoch(1)
       .setLeader(replicas.get(0))
       .setLeaderEpoch(1)

File diff suppressed because it is too large.

View File

@@ -34,9 +34,9 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{mock, verify, when}
 import java.nio.charset.Charset
+import java.util
 import java.util.Optional
 import scala.collection.Map
-import scala.jdk.CollectionConverters._
 class CoordinatorPartitionWriterTest {
   @Test
@@ -75,8 +75,8 @@
       replicaManager
     )
-    when(replicaManager.getLogConfig(tp)).thenReturn(Some(new LogConfig(Map.empty.asJava)))
-    assertEquals(new LogConfig(Map.empty.asJava), partitionRecordWriter.config(tp))
+    when(replicaManager.getLogConfig(tp)).thenReturn(Some(new LogConfig(util.Map.of)))
+    assertEquals(new LogConfig(util.Map.of), partitionRecordWriter.config(tp))
     when(replicaManager.getLogConfig(tp)).thenReturn(None)
     assertThrows(classOf[NotLeaderOrFollowerException], () => partitionRecordWriter.config(tp))
@@ -119,7 +119,7 @@
         10L
       ),
       Option.empty,
-      false
+      hasCustomErrorMessage = false
     )))
     val batch = MemoryRecords.withRecords(
@@ -215,7 +215,7 @@
     )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult(
       LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
       Some(Errors.NOT_LEADER_OR_FOLLOWER.exception),
-      false
+      hasCustomErrorMessage = false
     )))
     val batch = MemoryRecords.withRecords(

View File

@@ -17,7 +17,7 @@
 package kafka.coordinator.transaction
 import java.nio.ByteBuffer
-import java.util.Collections
+import java.util
 import java.util.Optional
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
@@ -87,7 +87,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
     when(metadataCache.features()).thenReturn {
       new FinalizedFeatures(
         MetadataVersion.latestTesting(),
-        Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
+        util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
         0)
     }
@@ -476,7 +476,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
   private def prepareTxnLog(partitionId: Int): Unit = {
     val logMock: UnifiedLog = mock(classOf[UnifiedLog])
-    when(logMock.config).thenReturn(new LogConfig(Collections.emptyMap()))
+    when(logMock.config).thenReturn(new LogConfig(util.Map.of))
     val fileRecordsMock: FileRecords = mock(classOf[FileRecords])

View File

@@ -16,7 +16,6 @@
  */
 package kafka.coordinator.transaction
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
@@ -30,7 +29,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue,
 import org.junit.jupiter.api.Test
 import java.nio.ByteBuffer
-import java.util.Collections
+import java.util
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
@@ -115,14 +114,14 @@ class TransactionLogTest {
   @Test
   def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
-    val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, Collections.emptySet(), 500, 500, TV_0)
+    val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, util.Set.of, 500, 500, TV_0)
     val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata, TV_0))
     assertEquals(0, txnLogValueBuffer.getShort)
   }
   @Test
   def testSerializeTransactionLogValueToFlexibleVersion(): Unit = {
-    val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, Collections.emptySet(), 500, 500, TV_2)
+    val txnTransitMetadata = new TxnTransitMetadata(1, 1, 1, 1, 1, 1000, TransactionState.COMPLETE_COMMIT, util.Set.of, 500, 500, TV_2)
     val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata, TV_2))
     assertEquals(TransactionLogValue.HIGHEST_SUPPORTED_VERSION, txnLogValueBuffer.getShort)
   }
@@ -131,7 +130,7 @@ class TransactionLogTest {
   def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
     val txnPartitions = new TransactionLogValue.PartitionsSchema()
       .setTopic("topic")
-      .setPartitionIds(java.util.Collections.singletonList(0))
+      .setPartitionIds(util.List.of(0))
     val txnLogValue = new TransactionLogValue()
       .setProducerId(100)
@@ -140,7 +139,7 @@
       .setTransactionStartTimestampMs(750L)
       .setTransactionLastUpdateTimestampMs(1000L)
       .setTransactionTimeoutMs(500)
-      .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions))
+      .setTransactionPartitions(util.List.of(txnPartitions))
     val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue)
     val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get
@@ -174,7 +173,7 @@
     val txnPartitions = new Struct(futurePartitionsSchema)
     txnPartitions.set("topic", "topic")
     txnPartitions.set("partition_ids", Array(Integer.valueOf(1)))
-    val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]()
+    val txnPartitionsTaggedFields = new util.TreeMap[Integer, Any]()
     txnPartitionsTaggedFields.put(100, "foo")
     txnPartitionsTaggedFields.put(101, 4000)
     txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields)
@@ -204,7 +203,7 @@
     transactionLogValue.set("transaction_partitions", Array(txnPartitions))
     transactionLogValue.set("transaction_last_update_timestamp_ms", 2000L)
     transactionLogValue.set("transaction_start_timestamp_ms", 3000L)
-    val txnLogValueTaggedFields = new java.util.TreeMap[Integer, Any]()
+    val txnLogValueTaggedFields = new util.TreeMap[Integer, Any]()
     txnLogValueTaggedFields.put(100, "foo")
     txnLogValueTaggedFields.put(101, 4000)
     transactionLogValue.set("_tagged_fields", txnLogValueTaggedFields)

View File

@@ -17,8 +17,6 @@
 package kafka.coordinator.transaction
 import java.util
-import java.util.Arrays.asList
-import java.util.Collections
 import java.util.Optional
 import java.util.concurrent.{Callable, Executors, Future}
 import kafka.server.KafkaConfig
@@ -156,7 +154,7 @@ class TransactionMarkerChannelManagerTest {
     val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
     val response = new WriteTxnMarkersResponse(
-      Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE)))
+      util.Map.of(producerId2: java.lang.Long, util.Map.of(partition1, Errors.NONE)))
     val clientResponse = new ClientResponse(header, null, null,
       time.milliseconds(), time.milliseconds(), false, null, null,
       response)
@@ -205,7 +203,7 @@
     // Build a successful client response.
     val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
     val successfulResponse = new WriteTxnMarkersResponse(
-      Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE)))
+      util.Map.of(producerId2: java.lang.Long, util.Map.of(partition1, Errors.NONE)))
     val successfulClientResponse = new ClientResponse(header, null, null,
       time.milliseconds(), time.milliseconds(), false, null, null,
       successfulResponse)
@@ -302,10 +300,10 @@
     assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
     val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
-      asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)),
-        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)))).build()
+      util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)),
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)))).build()
     val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
-      asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition2)))).build()
+      util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build()
     val requests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().asScala.map { handler =>
       (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
@@ -372,10 +370,10 @@
     assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2))
     val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
-      asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)),
-        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, asList(partition1)))).build()
+      util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)),
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)))).build()
     val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
-      asList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, asList(partition2)))).build()
+      util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build()
     val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().asScala.map { handler =>
       (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())

View File

@@ -17,7 +17,6 @@
 package kafka.coordinator.transaction
 import java.{lang, util}
-import java.util.Arrays.asList
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -46,10 +45,10 @@ class TransactionMarkerRequestCompletionHandlerTest {
   private val topicPartition = new TopicPartition("topic1", 0)
   private val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID,
     producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, mutable.Set[TopicPartition](topicPartition), 0L, 0L, TransactionVersion.TV_2)
-  private val pendingCompleteTxnAndMarkers = asList(
+  private val pendingCompleteTxnAndMarkers = util.List.of(
     PendingCompleteTxnAndMarkerEntry(
       PendingCompleteTxn(transactionalId, coordinatorEpoch, txnMetadata, txnMetadata.prepareComplete(42)),
-      new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, asList(topicPartition))))
+      new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, util.List.of(topicPartition))))
   private val markerChannelManager: TransactionMarkerChannelManager =
     mock(classOf[TransactionMarkerChannelManager])

View File

@@ -127,7 +127,7 @@ class TransactionMetadataTest {
       txnLastUpdateTimestamp = time.milliseconds(),
       clientTransactionVersion = TV_2)
-    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, true)
+    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, noPartitionAdded = true)
     txnMetadata.completeTransitionTo(transitMetadata)
     assertEquals(producerId, txnMetadata.producerId)
     assertEquals(producerEpoch + 1, txnMetadata.producerEpoch)
@@ -152,7 +152,7 @@
       txnLastUpdateTimestamp = time.milliseconds(),
       clientTransactionVersion = TV_2)
-    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, true)
+    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, noPartitionAdded = true)
     txnMetadata.completeTransitionTo(transitMetadata)
     assertEquals(producerId, txnMetadata.producerId)
     assertEquals(producerEpoch + 1, txnMetadata.producerEpoch)
@@ -177,7 +177,7 @@
       txnLastUpdateTimestamp = time.milliseconds(),
       clientTransactionVersion = TV_2)
-    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, true)
+    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() + 1, noPartitionAdded = true)
     txnMetadata.completeTransitionTo(transitMetadata)
     assertEquals(producerId, txnMetadata.producerId)
     assertEquals(producerEpoch + 1, txnMetadata.producerEpoch)
@@ -295,7 +295,7 @@
       clientTransactionVersion = TV_0)
     // let new time be smaller
-    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, false)
+    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, noPartitionAdded = false)
     txnMetadata.completeTransitionTo(transitMetadata)
     assertEquals(TransactionState.PREPARE_COMMIT, txnMetadata.state)
     assertEquals(producerId, txnMetadata.producerId)
@@ -323,7 +323,7 @@
       clientTransactionVersion = TV_0)
     // let new time be smaller
-    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, false)
+    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, noPartitionAdded = false)
     txnMetadata.completeTransitionTo(transitMetadata)
     assertEquals(TransactionState.PREPARE_ABORT, txnMetadata.state)
     assertEquals(producerId, txnMetadata.producerId)
@@ -425,7 +425,7 @@
     // We should reset the pending state to make way for the abort transition.
     txnMetadata.pendingState = None
-    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds(), false)
+    val transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_ABORT, TV_0, RecordBatch.NO_PRODUCER_ID, time.milliseconds(), noPartitionAdded = false)
     txnMetadata.completeTransitionTo(transitMetadata)
     assertEquals(producerId, transitMetadata.producerId)
   }
@@ -537,7 +537,7 @@
       txnLastUpdateTimestamp = time.milliseconds(),
       clientTransactionVersion = TV_2)
-    var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, false)
+    var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, RecordBatch.NO_PRODUCER_ID, time.milliseconds() - 1, noPartitionAdded = false)
     txnMetadata.completeTransitionTo(transitMetadata)
     assertEquals(producerId, txnMetadata.producerId)
     assertEquals((producerEpoch + 1).toShort, txnMetadata.producerEpoch)
@@ -571,7 +571,7 @@
     assertTrue(txnMetadata.isProducerEpochExhausted)
     val newProducerId = 9893L
-    var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, newProducerId, time.milliseconds() - 1, false)
+    var transitMetadata = txnMetadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, TV_2, newProducerId, time.milliseconds() - 1, noPartitionAdded = false)
     txnMetadata.completeTransitionTo(transitMetadata)
     assertEquals(producerId, txnMetadata.producerId)
     assertEquals(Short.MaxValue, txnMetadata.producerEpoch)

View File

@@ -18,6 +18,7 @@ package kafka.coordinator.transaction
 import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
+import java.util
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
 import javax.management.ObjectName
 import kafka.server.ReplicaManager
@@ -49,7 +50,6 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
 import org.mockito.Mockito.{atLeastOnce, mock, reset, times, verify, when}
-import java.util.Collections
 import scala.collection.{Map, mutable}
 import scala.jdk.CollectionConverters._
@@ -72,7 +72,7 @@ class TransactionStateManagerTest {
     when(metadataCache.features()).thenReturn {
       new FinalizedFeatures(
         MetadataVersion.latestTesting(),
-        Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
+        util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
         0)
     }
@@ -1354,7 +1354,7 @@
     when(metadataCache.features()).thenReturn {
       new FinalizedFeatures(
         MetadataVersion.latestTesting(),
-        Collections.singletonMap(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()),
+        util.Map.of(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()),
        0)
     }
     val transactionManager = new TransactionStateManager(0, scheduler,

View File

@@ -34,7 +34,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import java.io.File
 import java.time.Duration
 import java.util
-import java.util.{Collections, Properties}
+import java.util.Properties
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 import scala.util.Using
@@ -364,7 +364,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
   def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = {
     Using.resource(createAdminClient(brokers, listenerName)) {
       admin => {
-        admin.alterClientQuotas(Collections.singleton(
+        admin.alterClientQuotas(util.Set.of(
           new ClientQuotaAlteration(
             new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
             configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get()

View File

@@ -17,6 +17,7 @@
 package kafka.integration
+import java.util
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 import scala.util.Random
@@ -317,7 +318,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
       valueDeserializer = new StringDeserializer)
     try {
       val tp = new TopicPartition(topic, partitionId)
-      consumer.assign(Seq(tp).asJava)
+      consumer.assign(util.List.of(tp))
       consumer.seek(tp, 0)
       TestUtils.consumeRecords(consumer, numMessages).map(_.value)
     } finally consumer.close()
@@ -410,10 +411,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
   }
   private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = {
-    val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
-    adminClient.incrementalAlterConfigs(Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) ->
-      configEntries.asScala.map((e: ConfigEntry) => new AlterConfigOp(e, AlterConfigOp.OpType.SET)).toSeq
-      .asJavaCollection).asJava)
+    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+    val configEntries = topicConfigs.entrySet().stream()
+      .map(e => new ConfigEntry(e.getKey.toString, e.getValue.toString))
+      .map(e => new AlterConfigOp(e, AlterConfigOp.OpType.SET))
+      .toList
+    adminClient.incrementalAlterConfigs(util.Map.of(configResource, configEntries))
   }
   private def createAdminClient(): Admin = {
@@ -427,7 +432,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
   private def waitForNoLeaderAndIsrHasOldLeaderId(metadataCache: MetadataCache, leaderId: Int): Unit = {
     waitUntilTrue(() => metadataCache.getLeaderAndIsr(topic, partitionId).isPresent() &&
       metadataCache.getLeaderAndIsr(topic, partitionId).get.leader() == LeaderConstants.NO_LEADER &&
-      java.util.Arrays.asList(leaderId).equals(metadataCache.getLeaderAndIsr(topic, partitionId).get.isr()),
+      util.List.of(leaderId).equals(metadataCache.getLeaderAndIsr(topic, partitionId).get.isr()),
       "Timed out waiting for broker metadata cache updates the info for topic partition:" + topicPartition)
   }
 }

View File

@@ -22,15 +22,12 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
 import org.junit.jupiter.api.Test
-import java.util.Collections
-import scala.jdk.CollectionConverters._
 class KafkaMetricsGroupTest {
   @Test
   def testUntaggedMetricName(): Unit = {
     val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
-    val metricName = metricsGroup.metricName("TaggedMetric", Collections.emptyMap())
+    val metricName = metricsGroup.metricName("TaggedMetric", java.util.Map.of)
     assertEquals("kafka.metrics", metricName.getGroup)
     assertEquals("TestMetrics", metricName.getType)
@@ -42,7 +39,13 @@ class KafkaMetricsGroupTest {
   @Test
   def testTaggedMetricName(): Unit = {
-    val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz").asJava
+    val tags = {
+      val map = new java.util.LinkedHashMap[String, String]()
+      map.put("foo", "bar")
+      map.put("bar", "baz")
+      map.put("baz", "raz.taz")
+      map
+    }
     val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
     val metricName = metricsGroup.metricName("TaggedMetric", tags)
@@ -56,7 +59,13 @@ class KafkaMetricsGroupTest {
   @Test
   def testTaggedMetricNameWithEmptyValue(): Unit = {
-    val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz").asJava
+    val tags = {
+      val map = new java.util.LinkedHashMap[String, String]()
+      map.put("foo", "bar")
+      map.put("bar", "")
+      map.put("baz", "raz.taz")
+      map
+    }
     val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
     val metricName = metricsGroup.metricName("TaggedMetric", tags)
@@ -67,6 +76,4 @@
       metricName.getMBeanName)
     assertEquals("baz.raz_taz.foo.bar", metricName.getScope)
   }
 }

View File

@@ -18,6 +18,7 @@
 package kafka.metrics
 import java.lang.management.ManagementFactory
+import java.util
 import java.util.Properties
 import javax.management.ObjectName
 import com.yammer.metrics.core.MetricPredicate
@@ -124,7 +125,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   def testUpdateJMXFilter(): Unit = {
     // verify previously exposed metrics are removed and existing matching metrics are added
     brokers.foreach(broker => broker.kafkaYammerMetrics.reconfigure(
-      Map(JmxReporter.EXCLUDE_CONFIG -> "kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava
+      util.Map.of(JmxReporter.EXCLUDE_CONFIG, "kafka.controller:type=KafkaController,name=ActiveControllerCount")
     ))
     assertFalse(ManagementFactory.getPlatformMBeanServer
       .isRegistered(new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")))
@@ -150,9 +151,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testWindowsStyleTagNames(): Unit = {
     val path = "C:\\windows-path\\kafka-logs"
-    val tags = Map("dir" -> path)
-    val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=")
-    val metric = new KafkaMetricsGroup(this.getClass).metricName("test-metric", tags.asJava)
+    val tags = util.Map.of("dir", path)
+    val expectedMBeanName = Set(tags.keySet().iterator().next(), ObjectName.quote(path)).mkString("=")
+    val metric = new KafkaMetricsGroup(this.getClass).metricName("test-metric", tags)
     assert(metric.getMBeanName.endsWith(expectedMBeanName))
   }