From a206feb4ba2e051395a5ef51b80767ba26307a29 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Sun, 23 Feb 2025 08:57:38 +0530 Subject: [PATCH] MINOR: Clean up share-coordinator (#19007) Given that now we support Java 17 on our brokers, this PR replace the use of `Collections.singletonList()` and `Collections.emptyList()` with `List.of()` Reviewers: Andrew Schofield , Chia-Ping Tsai --- .../share/ShareCoordinatorService.java | 9 +- .../share/ShareCoordinatorShard.java | 9 +- .../metrics/ShareCoordinatorMetrics.java | 2 +- .../PersisterStateBatchCombinerTest.java | 13 +- .../ShareCoordinatorRecordHelpersTest.java | 10 +- .../ShareCoordinatorRecordSerdeTest.java | 4 +- .../share/ShareCoordinatorShardTest.java | 162 +++++++++--------- .../share/ShareCoordinatorTestConfig.java | 2 +- 8 files changed, 104 insertions(+), 107 deletions(-) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index 764e008136e..8b87aed65ef 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -62,7 +62,6 @@ import org.slf4j.Logger; import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -413,9 +412,9 @@ public class ShareCoordinatorService implements ShareCoordinator { Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()), coordinator -> coordinator.writeState(new WriteShareGroupStateRequestData() .setGroupId(groupId) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(topicData.topicId()) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(partitionData.partition()) .setStartOffset(partitionData.startOffset()) .setLeaderEpoch(partitionData.leaderEpoch()) @@ -531,9 +530,9 @@ public class ShareCoordinatorService implements ShareCoordinator { ReadShareGroupStateRequestData requestForCurrentPartition = new ReadShareGroupStateRequestData() .setGroupId(groupId) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(topicId) - .setPartitions(Collections.singletonList(partitionData)))); + .setPartitions(List.of(partitionData)))); // We are issuing a scheduleWriteOperation even though the request is of read type since // we might want to update the leader epoch, if it is the highest seen so far for the specific diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index 1022a36fb65..4bbdb2c03cc 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -66,7 +66,6 @@ import org.apache.kafka.timeline.TimelineHashMap; import org.slf4j.Logger; -import java.util.Collections; import java.util.List; import java.util.Optional; @@ -88,7 +87,7 @@ public class ShareCoordinatorShard implements CoordinatorShard { - private ShareCoordinatorConfig config; + private final ShareCoordinatorConfig config; private LogContext logContext; private SnapshotRegistry snapshotRegistry; private CoordinatorMetrics coordinatorMetrics; @@ -326,7 +325,7 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.singletonList(record), responseData); + return new CoordinatorResult<>(List.of(record), responseData); } /** @@ -409,7 +408,7 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.singletonList(record), responseData); + return new CoordinatorResult<>(List.of(record), responseData); } /** @@ -519,7 +518,7 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.singletonList(record), responseData); + return new CoordinatorResult<>(List.of(record), responseData); } /** diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java index 197b1d76e0b..9081f384a48 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java @@ -48,7 +48,7 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC public static final String SHARE_COORDINATOR_WRITE_SENSOR_NAME = "ShareCoordinatorWrite"; public static final String SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME = "ShareCoordinatorWriteLatency"; public static final String SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME = "ShareCoordinatorStateTopicPruneSensorName"; - private Map pruneMetrics = new ConcurrentHashMap<>(); + private final Map pruneMetrics = new ConcurrentHashMap<>(); /** * Global sensors. These are shared across all metrics shards. diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java index 6541e9479fd..26ddbea2c14 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.server.share.persister.PersisterStateBatch; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.stream.Stream; @@ -70,7 +69,7 @@ public class PersisterStateBatchCombinerTest { int deliveryState, int deliveryCount ) { - return Collections.singletonList( + return List.of( new PersisterStateBatch(firstOffset, lastOffset, (byte) deliveryState, (short) deliveryCount) ); } @@ -108,14 +107,14 @@ public class PersisterStateBatchCombinerTest { new BatchTestHolder( "Current batches with start offset midway are pruned.", BatchTestHolder.singleBatch(100, 130, 0, 1), - Collections.emptyList(), + List.of(), BatchTestHolder.singleBatch(120, 130, 0, 1), 120 ), new BatchTestHolder( "New batches with start offset midway are pruned.", - Collections.emptyList(), + List.of(), BatchTestHolder.singleBatch(100, 130, 0, 1), BatchTestHolder.singleBatch(120, 130, 0, 1), 120 @@ -123,9 +122,9 @@ public class PersisterStateBatchCombinerTest { new BatchTestHolder( "Both current and new batches empty.", - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), + List.of(), + List.of(), + List.of(), 120 ) ); diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java index 9de59e499d1..77e539c1dc3 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.server.share.persister.PersisterStateBatch; import org.junit.jupiter.api.Test; -import java.util.Collections; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -47,7 +47,7 @@ public class ShareCoordinatorRecordHelpersTest { .setStateEpoch(1) .setLeaderEpoch(5) .setStartOffset(0) - .setStateBatches(Collections.singletonList(batch)) + .setStateBatches(List.of(batch)) .build() ); @@ -62,7 +62,7 @@ public class ShareCoordinatorRecordHelpersTest { .setStateEpoch(1) .setLeaderEpoch(5) .setStartOffset(0) - .setStateBatches(Collections.singletonList( + .setStateBatches(List.of( new ShareSnapshotValue.StateBatch() .setFirstOffset(1L) .setLastOffset(10L) @@ -88,7 +88,7 @@ public class ShareCoordinatorRecordHelpersTest { .setStateEpoch(-1) // ignored for share update .setLeaderEpoch(5) .setStartOffset(0) - .setStateBatches(Collections.singletonList(batch)) + .setStateBatches(List.of(batch)) .build() ); @@ -102,7 +102,7 @@ public class ShareCoordinatorRecordHelpersTest { .setSnapshotEpoch(0) .setLeaderEpoch(5) .setStartOffset(0) - .setStateBatches(Collections.singletonList( + .setStateBatches(List.of( new ShareUpdateValue.StateBatch() .setFirstOffset(1L) .setLastOffset(10L) diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java index 8f0bb7e82d1..9706902f0b1 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java @@ -30,7 +30,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; -import java.util.Collections; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -221,7 +221,7 @@ public class ShareCoordinatorRecordSerdeTest { .setLeaderEpoch(2) .setStateEpoch(1) .setSnapshotEpoch(1) - .setStateBatches(Collections.singletonList(new ShareSnapshotValue.StateBatch() + .setStateBatches(List.of(new ShareSnapshotValue.StateBatch() .setFirstOffset(1) .setLastOffset(10) .setDeliveryState((byte) 0) diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java index ab136f8dc31..64b7ba21e9f 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java @@ -132,14 +132,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(0) .setStateEpoch(0) .setLeaderEpoch(leaderEpoch) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -172,7 +172,7 @@ class ShareCoordinatorShardTest { .setSnapshotEpoch(0) .setStateEpoch(0) .setLeaderEpoch(leaderEpoch) - .setStateBatches(Collections.singletonList( + .setStateBatches(List.of( new ShareSnapshotValue.StateBatch() .setFirstOffset(0) .setLastOffset(10) @@ -192,7 +192,7 @@ class ShareCoordinatorShardTest { .setSnapshotEpoch(1) .setStateEpoch(1) .setLeaderEpoch(leaderEpoch + 1) - .setStateBatches(Collections.singletonList( + .setStateBatches(List.of( new ShareSnapshotValue.StateBatch() .setFirstOffset(11) .setLastOffset(12) @@ -225,14 +225,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(0) .setStateEpoch(0) .setLeaderEpoch(0) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -243,7 +243,7 @@ class ShareCoordinatorShardTest { shard.replay(0L, 0L, (short) 0, result.records().get(0)); WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); - List expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + List expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0)) )); @@ -265,14 +265,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request1 = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(0) .setStateEpoch(0) .setLeaderEpoch(0) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -280,14 +280,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request2 = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(0) .setStateEpoch(0) .setLeaderEpoch(0) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(11) .setLastOffset(20) .setDeliveryCount((short) 1) @@ -298,7 +298,7 @@ class ShareCoordinatorShardTest { shard.replay(0L, 0L, (short) 0, result.records().get(0)); WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); - List expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + List expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request1.topics().get(0).partitions().get(0)) )); @@ -316,7 +316,7 @@ class ShareCoordinatorShardTest { expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); // The snapshot epoch here will be 1 since this is a snapshot update record, // and it refers to parent share snapshot. - expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord( + expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord( GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request2.topics().get(0).partitions().get(0), 0) )); @@ -329,7 +329,7 @@ class ShareCoordinatorShardTest { assertEquals(incrementalUpdate.leaderEpoch(), combinedState.leaderEpoch()); assertEquals(incrementalUpdate.startOffset(), combinedState.startOffset()); // The batches should have combined to 1 since same state. - assertEquals(Collections.singletonList(new PersisterStateBatch(0, 20, (byte) 0, (short) 1)), + assertEquals(List.of(new PersisterStateBatch(0, 20, (byte) 0, (short) 1)), combinedState.stateBatches()); assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey)); } @@ -344,14 +344,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(partition) .setStartOffset(0) .setStateEpoch(0) .setLeaderEpoch(0) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -361,7 +361,7 @@ class ShareCoordinatorShardTest { WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toErrorResponseData( TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage()); - List expectedRecords = Collections.emptyList(); + List expectedRecords = List.of(); assertEquals(expectedData, result.response()); assertEquals(expectedRecords, result.records()); @@ -379,14 +379,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(0) .setStartOffset(0) .setStateEpoch(0) .setLeaderEpoch(0) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -396,7 +396,7 @@ class ShareCoordinatorShardTest { WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toErrorResponseData( TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message()); - List expectedRecords = Collections.emptyList(); + List expectedRecords = List.of(); assertEquals(expectedData, result.response()); assertEquals(expectedRecords, result.records()); @@ -413,14 +413,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request1 = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(0) .setStateEpoch(0) .setLeaderEpoch(5) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -428,14 +428,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request2 = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(0) .setStateEpoch(0) .setLeaderEpoch(3) // Lower leader epoch in the second request. - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(11) .setLastOffset(20) .setDeliveryCount((short) 1) @@ -446,7 +446,7 @@ class ShareCoordinatorShardTest { shard.replay(0L, 0L, (short) 0, result.records().get(0)); WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); - List expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + List expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request1.topics().get(0).partitions().get(0)) )); @@ -462,7 +462,7 @@ class ShareCoordinatorShardTest { // Since the leader epoch in the second request was lower than the one in the first request, FENCED_LEADER_EPOCH error is expected. expectedData = WriteShareGroupStateResponse.toErrorResponseData( TOPIC_ID, PARTITION, Errors.FENCED_LEADER_EPOCH, Errors.FENCED_LEADER_EPOCH.message()); - expectedRecords = Collections.emptyList(); + expectedRecords = List.of(); assertEquals(expectedData, result.response()); assertEquals(expectedRecords, result.records()); @@ -479,14 +479,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request1 = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(0) .setStateEpoch(1) .setLeaderEpoch(5) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -494,14 +494,14 @@ class ShareCoordinatorShardTest { WriteShareGroupStateRequestData request2 = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(0) .setStateEpoch(0) // Lower state epoch in the second request. .setLeaderEpoch(5) - .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch() + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() .setFirstOffset(11) .setLastOffset(20) .setDeliveryCount((short) 1) @@ -512,7 +512,7 @@ class ShareCoordinatorShardTest { shard.replay(0L, 0L, (short) 0, result.records().get(0)); WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); - List expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + List expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request1.topics().get(0).partitions().get(0)) )); @@ -528,7 +528,7 @@ class ShareCoordinatorShardTest { // Since the leader epoch in the second request was lower than the one in the first request, FENCED_LEADER_EPOCH error is expected. expectedData = WriteShareGroupStateResponse.toErrorResponseData( TOPIC_ID, PARTITION, Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.message()); - expectedRecords = Collections.emptyList(); + expectedRecords = List.of(); assertEquals(expectedData, result.response()); assertEquals(expectedRecords, result.records()); @@ -547,9 +547,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setLeaderEpoch(1))))); @@ -560,7 +560,7 @@ class ShareCoordinatorShardTest { PARTITION, 0, 0, - Collections.singletonList(new ReadShareGroupStateResponseData.StateBatch() + List.of(new ReadShareGroupStateResponseData.StateBatch() .setFirstOffset(0) .setLastOffset(10) .setDeliveryCount((short) 1) @@ -581,9 +581,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() .setPartition(PARTITION) .setLeaderEpoch(1))))); @@ -611,9 +611,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData() .setPartition(partition) .setLeaderEpoch(5))))); @@ -640,9 +640,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() .setPartition(partition) .setLeaderEpoch(5))))); @@ -669,9 +669,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData() .setPartition(0) .setLeaderEpoch(5))))); @@ -698,9 +698,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setLeaderEpoch(3))))); // Lower leaderEpoch than the one stored in leaderMap. @@ -753,9 +753,9 @@ class ShareCoordinatorShardTest { // Set initial state. WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(100) .setStateEpoch(0) @@ -784,7 +784,7 @@ class ShareCoordinatorShardTest { shard.replay(0L, 0L, (short) 0, result.records().get(0)); WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); - List expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + List expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0)) )); @@ -800,14 +800,14 @@ class ShareCoordinatorShardTest { // Acknowledge b1. WriteShareGroupStateRequestData requestUpdateB1 = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(-1) .setStateEpoch(0) .setLeaderEpoch(0) - .setStateBatches(Collections.singletonList( + .setStateBatches(List.of( new WriteShareGroupStateRequestData.StateBatch() //b1 .setFirstOffset(100) .setLastOffset(109) @@ -822,14 +822,14 @@ class ShareCoordinatorShardTest { // Ack batch 3 and move start offset. WriteShareGroupStateRequestData requestUpdateStartOffsetAndB3 = new WriteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setStartOffset(110) // 100 -> 110 .setStateEpoch(0) .setLeaderEpoch(0) - .setStateBatches(Collections.singletonList( + .setStateBatches(List.of( new WriteShareGroupStateRequestData.StateBatch() //b3 .setFirstOffset(120) .setLastOffset(129) @@ -852,7 +852,7 @@ class ShareCoordinatorShardTest { new PersisterStateBatch(120, 129, (byte) 2, (short) 1) )) .build(); - List expectedRecordsFinal = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + List expectedRecordsFinal = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( GROUP_ID, TOPIC_ID, PARTITION, offsetFinal )); @@ -874,7 +874,7 @@ class ShareCoordinatorShardTest { .build(); when(manager.lastRedundantOffset()).thenReturn(Optional.of(10L)); - assertEquals(new CoordinatorResult<>(Collections.emptyList(), Optional.of(10L)), shard.lastRedundantOffset()); + assertEquals(new CoordinatorResult<>(List.of(), Optional.of(10L)), shard.lastRedundantOffset()); } @Test @@ -885,9 +885,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setLeaderEpoch(2) )))); @@ -900,12 +900,12 @@ class ShareCoordinatorShardTest { TOPIC_ID, PARTITION, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_STATE_EPOCH, - Collections.emptyList()); - List expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + List.of()); + List expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( GROUP_ID, TOPIC_ID, PARTITION, new ShareGroupOffset.Builder() .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET) .setLeaderEpoch(2) - .setStateBatches(Collections.emptyList()) + .setStateBatches(List.of()) .setSnapshotEpoch(0) .setStateEpoch(PartitionFactory.DEFAULT_STATE_EPOCH) .build() @@ -925,9 +925,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateRequestData request1 = new ReadShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setLeaderEpoch(2) )))); @@ -940,9 +940,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateRequestData request2 = new ReadShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setLeaderEpoch(-1) )))); @@ -953,9 +953,9 @@ class ShareCoordinatorShardTest { ReadShareGroupStateRequestData request3 = new ReadShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData() + .setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData() .setPartition(PARTITION) .setLeaderEpoch(-1) )))); @@ -974,9 +974,9 @@ class ShareCoordinatorShardTest { DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION))))); CoordinatorResult result = shard.deleteState(request); @@ -1031,9 +1031,9 @@ class ShareCoordinatorShardTest { DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() .setGroupId(GROUP_ID) - .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() .setTopicId(TOPIC_ID) - .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() .setPartition(PARTITION))))); CoordinatorResult result = shard.deleteState(request); diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java index 75916187b28..eab6f2966ac 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java @@ -29,7 +29,7 @@ import java.util.Map; public class ShareCoordinatorTestConfig { - private static final List CONFIG_DEF_LIST = Collections.singletonList( + private static final List CONFIG_DEF_LIST = List.of( ShareCoordinatorConfig.CONFIG_DEF );