MINOR: Cleanup Share Coordinator (#19770)
CI / build (push) Waiting to run Details

Now that Kafka Brokers support Java 17, this PR updates the share
coordinator module to get rid of older code. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sanskar Jhajharia 2025-05-20 17:03:20 +05:30 committed by GitHub
parent d3f8979486
commit 9f293866ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 8 additions and 10 deletions

View File

@@ -190,7 +190,7 @@ public class ShareGroupOffset {
} }
public Builder setStateBatches(List<PersisterStateBatch> stateBatches) { public Builder setStateBatches(List<PersisterStateBatch> stateBatches) {
this.stateBatches = stateBatches == null ? Collections.emptyList() : stateBatches.stream().toList(); this.stateBatches = stateBatches == null ? List.of() : stateBatches.stream().toList();
return this; return this;
} }

View File

@@ -31,8 +31,8 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.core.MetricsRegistry;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -91,7 +91,7 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC
@Override @Override
public void close() throws Exception { public void close() throws Exception {
Arrays.asList( List.of(
SHARE_COORDINATOR_WRITE_SENSOR_NAME, SHARE_COORDINATOR_WRITE_SENSOR_NAME,
SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME
).forEach(metrics::removeSensor); ).forEach(metrics::removeSensor);

View File

@@ -59,8 +59,6 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -760,7 +758,7 @@ class ShareCoordinatorShardTest {
// -Share coordinator writes the snapshot with startOffset 110 and batch 3. // -Share coordinator writes the snapshot with startOffset 110 and batch 3.
// -batch2 should NOT be lost // -batch2 should NOT be lost
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder() ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder()
.setConfigOverrides(Collections.singletonMap(ShareCoordinatorConfig.SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, "0")) .setConfigOverrides(Map.of(ShareCoordinatorConfig.SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, "0"))
.build(); .build();
SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
@@ -775,7 +773,7 @@ class ShareCoordinatorShardTest {
.setStartOffset(100) .setStartOffset(100)
.setStateEpoch(0) .setStateEpoch(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setStateBatches(Arrays.asList( .setStateBatches(List.of(
new WriteShareGroupStateRequestData.StateBatch() //b1 new WriteShareGroupStateRequestData.StateBatch() //b1
.setFirstOffset(100) .setFirstOffset(100)
.setLastOffset(109) .setLastOffset(109)
@@ -862,7 +860,7 @@ class ShareCoordinatorShardTest {
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setStateEpoch(0) .setStateEpoch(0)
.setSnapshotEpoch(2) // since 2nd share snapshot .setSnapshotEpoch(2) // since 2nd share snapshot
.setStateBatches(Arrays.asList( .setStateBatches(List.of(
new PersisterStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost new PersisterStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost
new PersisterStateBatch(120, 129, (byte) 2, (short) 1) new PersisterStateBatch(120, 129, (byte) 2, (short) 1)
)) ))

View File

@@ -26,8 +26,8 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME; import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME;
@@ -42,7 +42,7 @@ public class ShareCoordinatorMetricsTest {
public void testMetricNames() { public void testMetricNames() {
Metrics metrics = new Metrics(); Metrics metrics = new Metrics();
HashSet<MetricName> expectedMetrics = new HashSet<>(Arrays.asList( HashSet<MetricName> expectedMetrics = new HashSet<>(List.of(
metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP), metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP), metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP), metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),