MINOR: Clean up share-coordinator (#19007)

Given that now we support Java 17 on our brokers, this PR replace the use of `Collections.singletonList()` and `Collections.emptyList()` with `List.of()`

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Sanskar Jhajharia 2025-02-23 08:57:38 +05:30 committed by GitHub
parent 3fc103b48b
commit a206feb4ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 104 additions and 107 deletions

View File

@ -62,7 +62,6 @@ import org.slf4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -413,9 +412,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
coordinator -> coordinator.writeState(new WriteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicData.topicId())
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partitionData.partition())
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
@ -531,9 +530,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
ReadShareGroupStateRequestData requestForCurrentPartition = new ReadShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(partitionData))));
.setPartitions(List.of(partitionData))));
// We are issuing a scheduleWriteOperation even though the request is of read type since
// we might want to update the leader epoch, if it is the highest seen so far for the specific

View File

@ -66,7 +66,6 @@ import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -88,7 +87,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
public static final Exception NEGATIVE_PARTITION_ID = new Exception("The partition id cannot be a negative number.");
public static class Builder implements CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> {
private ShareCoordinatorConfig config;
private final ShareCoordinatorConfig config;
private LogContext logContext;
private SnapshotRegistry snapshotRegistry;
private CoordinatorMetrics coordinatorMetrics;
@ -326,7 +325,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
))
);
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
return new CoordinatorResult<>(List.of(record), responseData);
}
/**
@ -409,7 +408,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setStateEpoch(responseData.results().get(0).partitions().get(0).stateEpoch());
CoordinatorRecord record = generateShareStateRecord(writePartitionData, key);
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
return new CoordinatorResult<>(List.of(record), responseData);
}
/**
@ -519,7 +518,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
))
);
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
return new CoordinatorResult<>(List.of(record), responseData);
}
/**

View File

@ -48,7 +48,7 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC
public static final String SHARE_COORDINATOR_WRITE_SENSOR_NAME = "ShareCoordinatorWrite";
public static final String SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME = "ShareCoordinatorWriteLatency";
public static final String SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME = "ShareCoordinatorStateTopicPruneSensorName";
private Map<TopicPartition, ShareGroupPruneMetrics> pruneMetrics = new ConcurrentHashMap<>();
private final Map<TopicPartition, ShareGroupPruneMetrics> pruneMetrics = new ConcurrentHashMap<>();
/**
* Global sensors. These are shared across all metrics shards.

View File

@ -22,7 +22,6 @@ import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Stream;
@ -70,7 +69,7 @@ public class PersisterStateBatchCombinerTest {
int deliveryState,
int deliveryCount
) {
return Collections.singletonList(
return List.of(
new PersisterStateBatch(firstOffset, lastOffset, (byte) deliveryState, (short) deliveryCount)
);
}
@ -108,14 +107,14 @@ public class PersisterStateBatchCombinerTest {
new BatchTestHolder(
"Current batches with start offset midway are pruned.",
BatchTestHolder.singleBatch(100, 130, 0, 1),
Collections.emptyList(),
List.of(),
BatchTestHolder.singleBatch(120, 130, 0, 1),
120
),
new BatchTestHolder(
"New batches with start offset midway are pruned.",
Collections.emptyList(),
List.of(),
BatchTestHolder.singleBatch(100, 130, 0, 1),
BatchTestHolder.singleBatch(120, 130, 0, 1),
120
@ -123,9 +122,9 @@ public class PersisterStateBatchCombinerTest {
new BatchTestHolder(
"Both current and new batches empty.",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
List.of(),
List.of(),
List.of(),
120
)
);

View File

@ -27,7 +27,7 @@ import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -47,7 +47,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setStateEpoch(1)
.setLeaderEpoch(5)
.setStartOffset(0)
.setStateBatches(Collections.singletonList(batch))
.setStateBatches(List.of(batch))
.build()
);
@ -62,7 +62,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setStateEpoch(1)
.setLeaderEpoch(5)
.setStartOffset(0)
.setStateBatches(Collections.singletonList(
.setStateBatches(List.of(
new ShareSnapshotValue.StateBatch()
.setFirstOffset(1L)
.setLastOffset(10L)
@ -88,7 +88,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setStateEpoch(-1) // ignored for share update
.setLeaderEpoch(5)
.setStartOffset(0)
.setStateBatches(Collections.singletonList(batch))
.setStateBatches(List.of(batch))
.build()
);
@ -102,7 +102,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setSnapshotEpoch(0)
.setLeaderEpoch(5)
.setStartOffset(0)
.setStateBatches(Collections.singletonList(
.setStateBatches(List.of(
new ShareUpdateValue.StateBatch()
.setFirstOffset(1L)
.setLastOffset(10L)

View File

@ -30,7 +30,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -221,7 +221,7 @@ public class ShareCoordinatorRecordSerdeTest {
.setLeaderEpoch(2)
.setStateEpoch(1)
.setSnapshotEpoch(1)
.setStateBatches(Collections.singletonList(new ShareSnapshotValue.StateBatch()
.setStateBatches(List.of(new ShareSnapshotValue.StateBatch()
.setFirstOffset(1)
.setLastOffset(10)
.setDeliveryState((byte) 0)

View File

@ -132,14 +132,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(leaderEpoch)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -172,7 +172,7 @@ class ShareCoordinatorShardTest {
.setSnapshotEpoch(0)
.setStateEpoch(0)
.setLeaderEpoch(leaderEpoch)
.setStateBatches(Collections.singletonList(
.setStateBatches(List.of(
new ShareSnapshotValue.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
@ -192,7 +192,7 @@ class ShareCoordinatorShardTest {
.setSnapshotEpoch(1)
.setStateEpoch(1)
.setLeaderEpoch(leaderEpoch + 1)
.setStateBatches(Collections.singletonList(
.setStateBatches(List.of(
new ShareSnapshotValue.StateBatch()
.setFirstOffset(11)
.setLastOffset(12)
@ -225,14 +225,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -243,7 +243,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
List<CoordinatorRecord> expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0))
));
@ -265,14 +265,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request1 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -280,14 +280,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request2 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
.setLastOffset(20)
.setDeliveryCount((short) 1)
@ -298,7 +298,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
List<CoordinatorRecord> expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request1.topics().get(0).partitions().get(0))
));
@ -316,7 +316,7 @@ class ShareCoordinatorShardTest {
expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
// The snapshot epoch here will be 1 since this is a snapshot update record,
// and it refers to parent share snapshot.
expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request2.topics().get(0).partitions().get(0), 0)
));
@ -329,7 +329,7 @@ class ShareCoordinatorShardTest {
assertEquals(incrementalUpdate.leaderEpoch(), combinedState.leaderEpoch());
assertEquals(incrementalUpdate.startOffset(), combinedState.startOffset());
// The batches should have combined to 1 since same state.
assertEquals(Collections.singletonList(new PersisterStateBatch(0, 20, (byte) 0, (short) 1)),
assertEquals(List.of(new PersisterStateBatch(0, 20, (byte) 0, (short) 1)),
combinedState.stateBatches());
assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
}
@ -344,14 +344,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -361,7 +361,7 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
List<CoordinatorRecord> expectedRecords = Collections.emptyList();
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
@ -379,14 +379,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(0)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -396,7 +396,7 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
List<CoordinatorRecord> expectedRecords = Collections.emptyList();
List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
@ -413,14 +413,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request1 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(5)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -428,14 +428,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request2 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(3) // Lower leader epoch in the second request.
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
.setLastOffset(20)
.setDeliveryCount((short) 1)
@ -446,7 +446,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
List<CoordinatorRecord> expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request1.topics().get(0).partitions().get(0))
));
@ -462,7 +462,7 @@ class ShareCoordinatorShardTest {
// Since the leader epoch in the second request was lower than the one in the first request, FENCED_LEADER_EPOCH error is expected.
expectedData = WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, PARTITION, Errors.FENCED_LEADER_EPOCH, Errors.FENCED_LEADER_EPOCH.message());
expectedRecords = Collections.emptyList();
expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
@ -479,14 +479,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request1 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(5)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -494,14 +494,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request2 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0) // Lower state epoch in the second request.
.setLeaderEpoch(5)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
.setLastOffset(20)
.setDeliveryCount((short) 1)
@ -512,7 +512,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
List<CoordinatorRecord> expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request1.topics().get(0).partitions().get(0))
));
@ -528,7 +528,7 @@ class ShareCoordinatorShardTest {
// Since the leader epoch in the second request was lower than the one in the first request, FENCED_LEADER_EPOCH error is expected.
expectedData = WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, PARTITION, Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.message());
expectedRecords = Collections.emptyList();
expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
@ -547,9 +547,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(1)))));
@ -560,7 +560,7 @@ class ShareCoordinatorShardTest {
PARTITION,
0,
0,
Collections.singletonList(new ReadShareGroupStateResponseData.StateBatch()
List.of(new ReadShareGroupStateResponseData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@ -581,9 +581,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(1)))));
@ -611,9 +611,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(5)))));
@ -640,9 +640,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(5)))));
@ -669,9 +669,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(0)
.setLeaderEpoch(5)))));
@ -698,9 +698,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(3))))); // Lower leaderEpoch than the one stored in leaderMap.
@ -753,9 +753,9 @@ class ShareCoordinatorShardTest {
// Set initial state.
WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(100)
.setStateEpoch(0)
@ -784,7 +784,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
List<CoordinatorRecord> expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0))
));
@ -800,14 +800,14 @@ class ShareCoordinatorShardTest {
// Acknowledge b1.
WriteShareGroupStateRequestData requestUpdateB1 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(-1)
.setStateEpoch(0)
.setLeaderEpoch(0)
.setStateBatches(Collections.singletonList(
.setStateBatches(List.of(
new WriteShareGroupStateRequestData.StateBatch() //b1
.setFirstOffset(100)
.setLastOffset(109)
@ -822,14 +822,14 @@ class ShareCoordinatorShardTest {
// Ack batch 3 and move start offset.
WriteShareGroupStateRequestData requestUpdateStartOffsetAndB3 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(110) // 100 -> 110
.setStateEpoch(0)
.setLeaderEpoch(0)
.setStateBatches(Collections.singletonList(
.setStateBatches(List.of(
new WriteShareGroupStateRequestData.StateBatch() //b3
.setFirstOffset(120)
.setLastOffset(129)
@ -852,7 +852,7 @@ class ShareCoordinatorShardTest {
new PersisterStateBatch(120, 129, (byte) 2, (short) 1)
))
.build();
List<CoordinatorRecord> expectedRecordsFinal = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
List<CoordinatorRecord> expectedRecordsFinal = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, offsetFinal
));
@ -874,7 +874,7 @@ class ShareCoordinatorShardTest {
.build();
when(manager.lastRedundantOffset()).thenReturn(Optional.of(10L));
assertEquals(new CoordinatorResult<>(Collections.emptyList(), Optional.of(10L)), shard.lastRedundantOffset());
assertEquals(new CoordinatorResult<>(List.of(), Optional.of(10L)), shard.lastRedundantOffset());
}
@Test
@ -885,9 +885,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(2)
))));
@ -900,12 +900,12 @@ class ShareCoordinatorShardTest {
TOPIC_ID, PARTITION,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_STATE_EPOCH,
Collections.emptyList());
List<CoordinatorRecord> expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
List.of());
List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, new ShareGroupOffset.Builder()
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
.setLeaderEpoch(2)
.setStateBatches(Collections.emptyList())
.setStateBatches(List.of())
.setSnapshotEpoch(0)
.setStateEpoch(PartitionFactory.DEFAULT_STATE_EPOCH)
.build()
@ -925,9 +925,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request1 = new ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(2)
))));
@ -940,9 +940,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request2 = new ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(-1)
))));
@ -953,9 +953,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request3 = new ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(-1)
))));
@ -974,9 +974,9 @@ class ShareCoordinatorShardTest {
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)))));
CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
@ -1031,9 +1031,9 @@ class ShareCoordinatorShardTest {
DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData()
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)))));
CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);

View File

@ -29,7 +29,7 @@ import java.util.Map;
public class ShareCoordinatorTestConfig {
private static final List<ConfigDef> CONFIG_DEF_LIST = Collections.singletonList(
private static final List<ConfigDef> CONFIG_DEF_LIST = List.of(
ShareCoordinatorConfig.CONFIG_DEF
);