MINOR: Clean up coordinator-common and server modules (#19009)

Given that now we support Java 17 on our brokers, this PR replace the
use of :

Collections.singletonList() and Collections.emptyList() with List.of()
Collections.singletonMap() and Collections.emptyMap() with Map.of()
Collections.singleton() and Collections.emptySet() with Set.of()

Affected modules: server and coordinator-common

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Sanskar Jhajharia 2025-02-27 14:39:21 +05:30 committed by GitHub
parent 269e2d898b
commit a39fcac95c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 114 additions and 123 deletions

View File

@ -52,7 +52,6 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -2534,7 +2533,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
public List<TopicPartition> activeTopicPartitions() {
if (coordinators == null || coordinators.isEmpty()) {
return Collections.emptyList();
return List.of();
}
return coordinators.entrySet().stream()

View File

@ -23,7 +23,7 @@ import org.apache.kafka.server.util.FutureUtils;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@ -71,7 +71,7 @@ public class CoordinatorExecutorImplTest {
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String> op =
args.getArgument(3);
assertEquals(
new CoordinatorResult<>(Collections.singletonList("record"), null),
new CoordinatorResult<>(List.of("record"), null),
op.generateRecordsAndResult(coordinatorShard)
);
return CompletableFuture.completedFuture(null);
@ -95,7 +95,7 @@ public class CoordinatorExecutorImplTest {
operationCalled.set(true);
assertEquals("Hello!", result);
assertNull(exception);
return new CoordinatorResult<>(Collections.singletonList("record"), null);
return new CoordinatorResult<>(List.of("record"), null);
};
executor.schedule(
@ -130,7 +130,7 @@ public class CoordinatorExecutorImplTest {
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String> op =
args.getArgument(3);
assertEquals(
new CoordinatorResult<>(Collections.emptyList(), null),
new CoordinatorResult<>(List.of(), null),
op.generateRecordsAndResult(coordinatorShard)
);
return CompletableFuture.completedFuture(null);
@ -154,7 +154,7 @@ public class CoordinatorExecutorImplTest {
assertNull(result);
assertNotNull(exception);
assertEquals("Oh no!", exception.getMessage());
return new CoordinatorResult<>(Collections.emptyList(), null);
return new CoordinatorResult<>(List.of(), null);
};
executor.schedule(
@ -301,7 +301,7 @@ public class CoordinatorExecutorImplTest {
AtomicBoolean operationCalled = new AtomicBoolean(false);
CoordinatorExecutor.TaskOperation<String, String> taskOperation = (result, exception) -> {
operationCalled.set(true);
return new CoordinatorResult<>(Collections.emptyList(), null);
return new CoordinatorResult<>(List.of(), null);
};
executor.schedule(

View File

@ -18,7 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -26,8 +26,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class CoordinatorResultTest {
@Test
public void testAttributes() {
CoordinatorResult<String, CoordinatorRecord> result = new CoordinatorResult<>(Collections.emptyList(), "response");
assertEquals(Collections.emptyList(), result.records());
CoordinatorResult<String, CoordinatorRecord> result = new CoordinatorResult<>(List.of(), "response");
assertEquals(List.of(), result.records());
assertEquals("response", result.response());
}
@ -38,8 +38,8 @@ public class CoordinatorResultTest {
@Test
public void testEquals() {
CoordinatorResult<String, CoordinatorRecord> result1 = new CoordinatorResult<>(Collections.emptyList(), "response");
CoordinatorResult<String, CoordinatorRecord> result2 = new CoordinatorResult<>(Collections.emptyList(), "response");
CoordinatorResult<String, CoordinatorRecord> result1 = new CoordinatorResult<>(List.of(), "response");
CoordinatorResult<String, CoordinatorRecord> result2 = new CoordinatorResult<>(List.of(), "response");
assertEquals(result1, result2);
}
}

View File

@ -60,11 +60,11 @@ import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
@ -237,7 +237,7 @@ public class CoordinatorRuntimeTest {
}
public MockCoordinatorLoader() {
this(null, Collections.emptyList(), Collections.emptyList());
this(null, List.of(), List.of());
}
@Override
@ -448,7 +448,7 @@ public class CoordinatorRuntimeTest {
Set<String> pendingRecords(long producerId) {
TimelineHashSet<RecordAndMetadata> pending = pendingRecords.get(producerId);
if (pending == null) return Collections.emptySet();
if (pending == null) return Set.of();
return pending.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet());
}
@ -1342,7 +1342,7 @@ public class CoordinatorRuntimeTest {
// Write #3 but without any records.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.emptyList(), "response3"));
state -> new CoordinatorResult<>(List.of(), "response3"));
// Verify that the write is not committed yet.
assertFalse(write3.isDone());
@ -1385,7 +1385,7 @@ public class CoordinatorRuntimeTest {
// Write #4 but without records.
CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.emptyList(), "response4"));
state -> new CoordinatorResult<>(List.of(), "response4"));
// It is completed immediately because the state is fully committed.
assertTrue(write4.isDone());
@ -1414,7 +1414,7 @@ public class CoordinatorRuntimeTest {
// Scheduling a write fails with a NotCoordinatorException because the coordinator
// does not exist.
CompletableFuture<String> write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.emptyList(), "response1"));
state -> new CoordinatorResult<>(List.of(), "response1"));
assertFutureThrows(NotCoordinatorException.class, write);
}
@ -1696,7 +1696,7 @@ public class CoordinatorRuntimeTest {
verify(writer, times(1)).registerListener(eq(TP), any());
// Prepare the log config.
when(writer.config(TP)).thenReturn(new LogConfig(Collections.emptyMap()));
when(writer.config(TP)).thenReturn(new LogConfig(Map.of()));
// Prepare the transaction verification.
VerificationGuard guard = new VerificationGuard();
@ -1910,7 +1910,7 @@ public class CoordinatorRuntimeTest {
expectedType = ControlRecordType.COMMIT;
} else {
// Or they are gone if aborted.
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Set.of(), ctx.coordinator.coordinator().records());
expectedType = ControlRecordType.ABORT;
}
@ -2039,7 +2039,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Set.of(), ctx.coordinator.coordinator().records());
// Complete transaction #1. It should fail.
CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
@ -2058,7 +2058,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Set.of(), ctx.coordinator.coordinator().records());
}
@Test
@ -2125,7 +2125,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Set.of(), ctx.coordinator.coordinator().records());
assertEquals(List.of(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
@ -2147,7 +2147,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Set.of(), ctx.coordinator.coordinator().records());
assertEquals(List.of(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
@ -2680,7 +2680,7 @@ public class CoordinatorRuntimeTest {
assertTrue(processor.poll());
// Verify that no operation was executed.
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Set.of(), ctx.coordinator.coordinator().records());
assertEquals(0, ctx.timer.size());
}
@ -3010,8 +3010,8 @@ public class CoordinatorRuntimeTest {
startTimeMs + 500,
30,
3000),
Collections.emptyList(),
Collections.emptyList()))
List.of(),
List.of()))
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
@ -3127,8 +3127,8 @@ public class CoordinatorRuntimeTest {
1500,
30,
3000),
Collections.emptyList(),
Collections.emptyList()))
List.of(),
List.of()))
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
@ -3569,7 +3569,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), writer.entries(TP));
// Write #2 with one record.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
@ -3588,7 +3588,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), writer.entries(TP));
// Write #3 with one record. This one cannot go into the existing batch
// so the existing batch should be flushed and a new one should be created.
@ -3758,7 +3758,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), writer.entries(TP));
// Write #4. This write cannot make it in the current batch. So the current batch
// is flushed. It will fail. So we expect all writes to fail.
@ -3776,8 +3776,8 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), ctx.coordinator.coordinator().fullRecords());
assertEquals(List.of(), writer.entries(TP));
}
@Test
@ -3860,7 +3860,7 @@ public class CoordinatorRuntimeTest {
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), writer.entries(TP));
// Write #2. It should fail.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
@ -3874,8 +3874,8 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), ctx.coordinator.coordinator().fullRecords());
assertEquals(List.of(), writer.entries(TP));
}
@Test
@ -3921,9 +3921,9 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of(), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of("record#1"), ctx.coordinator.coordinator().records());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), writer.entries(TP));
// Transactional write #2 with one record. This will flush the current batch.
CompletableFuture<String> write2 = runtime.scheduleTransactionalWriteOperation(
@ -3989,7 +3989,7 @@ public class CoordinatorRuntimeTest {
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of(), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of("record#1", "record#2", "record#3"), ctx.coordinator.coordinator().records());
assertEquals(List.of(
records(timer.time().milliseconds(), "record#1"),
@ -4168,7 +4168,7 @@ public class CoordinatorRuntimeTest {
// Schedule a write operation that does not generate any records.
CompletableFuture<String> write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.emptyList(), "response1"));
state -> new CoordinatorResult<>(List.of(), "response1"));
// The write operation should not be done.
assertFalse(write.isDone());
@ -4356,7 +4356,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), writer.entries(TP));
// Let's write the 4th record which is too large. This will flush the current
// pending batch, allocate a new batch, and put the record into it.
@ -4454,7 +4454,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), writer.entries(TP));
// Write #4. This write cannot make it in the current batch. So the current batch
// is flushed. It will fail. So we expect all writes to fail.
@ -4472,8 +4472,8 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), ctx.coordinator.coordinator().fullRecords());
assertEquals(List.of(), writer.entries(TP));
}
@Test
@ -4516,7 +4516,7 @@ public class CoordinatorRuntimeTest {
// Write #2, with no records.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.emptyList(), "response2"));
state -> new CoordinatorResult<>(List.of(), "response2"));
// Write #2 should not be attached to the empty batch.
assertTrue(write2.isDone());
@ -4601,7 +4601,7 @@ public class CoordinatorRuntimeTest {
);
// Verify the state. Records are replayed but no batch written.
assertEquals(Collections.emptyList(), writer.entries(TP));
assertEquals(List.of(), writer.entries(TP));
verify(runtimeMetrics, times(0)).recordFlushTime(10);
// Write #3 with one record. This one cannot go into the existing batch
@ -4779,7 +4779,7 @@ public class CoordinatorRuntimeTest {
// Records have been written to the log.
long writeTimestamp = timer.time().milliseconds();
assertEquals(Collections.singletonList(
assertEquals(List.of(
records(writeTimestamp, "record1")
), writer.entries(TP));
@ -4923,10 +4923,10 @@ public class CoordinatorRuntimeTest {
(result, exception) -> {
assertEquals("task result", result);
assertNull(exception);
return new CoordinatorResult<>(Collections.singletonList("record2"), null);
return new CoordinatorResult<>(List.of("record2"), null);
}
);
return new CoordinatorResult<>(Collections.singletonList("record1"), "response1");
return new CoordinatorResult<>(List.of("record1"), "response1");
}
);

View File

@ -89,7 +89,7 @@ public class InMemoryPartitionWriter implements PartitionWriter {
@Override
public LogConfig config(TopicPartition tp) {
return new LogConfig(Collections.emptyMap());
return new LogConfig(Map.of());
}
@Override

View File

@ -22,7 +22,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -42,11 +42,11 @@ public class KafkaMetricHistogramTest {
);
Set<MetricName> expected = Set.of(
new MetricName("test-metric-max", "test-group", "test description", Collections.emptyMap()),
new MetricName("test-metric-p999", "test-group", "test description", Collections.emptyMap()),
new MetricName("test-metric-p99", "test-group", "test description", Collections.emptyMap()),
new MetricName("test-metric-p95", "test-group", "test description", Collections.emptyMap()),
new MetricName("test-metric-p50", "test-group", "test description", Collections.emptyMap())
new MetricName("test-metric-max", "test-group", "test description", Map.of()),
new MetricName("test-metric-p999", "test-group", "test description", Map.of()),
new MetricName("test-metric-p99", "test-group", "test description", Map.of()),
new MetricName("test-metric-p95", "test-group", "test description", Map.of()),
new MetricName("test-metric-p50", "test-group", "test description", Map.of())
);
Set<MetricName> actual = histogram.stats().stream().map(CompoundStat.NamedMeasurable::name).collect(Collectors.toSet());
assertEquals(expected, actual);

View File

@ -17,7 +17,6 @@
package org.apache.kafka.network;
import java.net.InetAddress;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -37,21 +36,21 @@ public class ConnectionQuotaEntity {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" + listenerName,
CONNECTION_RATE_METRIC_NAME,
Long.MAX_VALUE,
Collections.singletonMap("listener", listenerName));
Map.of("listener", listenerName));
}
public static ConnectionQuotaEntity brokerQuotaEntity() {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME,
"broker-" + ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME,
Long.MAX_VALUE,
Collections.emptyMap());
Map.of());
}
public static ConnectionQuotaEntity ipQuotaEntity(InetAddress ip) {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" + ip.getHostAddress(),
CONNECTION_RATE_METRIC_NAME,
TimeUnit.HOURS.toSeconds(1),
Collections.singletonMap(IP_METRIC_TAG, ip.getHostAddress()));
Map.of(IP_METRIC_TAG, ip.getHostAddress()));
}
private final String sensorName;

View File

@ -24,7 +24,6 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@ -87,7 +86,7 @@ public class RequestMetrics {
public RequestMetrics(String name) {
this.name = name;
tags = Collections.singletonMap("request", name);
tags = Map.of("request", name);
// time a request spent in a request queue
requestQueueTimeHist = metricsGroup.newHistogram(REQUEST_QUEUE_TIME_MS, true, tags);
// time a request takes to be processed at the local broker

View File

@ -23,7 +23,6 @@ import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
@ -62,7 +61,7 @@ public class AclEntry {
case TRANSACTIONAL_ID:
return new HashSet<>(Arrays.asList(DESCRIBE, WRITE));
case DELEGATION_TOKEN:
return Collections.singleton(DESCRIBE);
return Set.of(DESCRIBE);
case USER:
return new HashSet<>(Arrays.asList(CREATE_TOKENS, DESCRIBE_TOKENS));
default:

View File

@ -46,7 +46,6 @@ import com.yammer.metrics.core.MetricsRegistry;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@ -181,7 +180,7 @@ public final class AssignmentsManager {
this.directoryIdToDescription = directoryIdToDescription;
this.metadataImageSupplier = metadataImageSupplier;
this.ready = new ConcurrentHashMap<>();
this.inflight = Collections.emptyMap();
this.inflight = Map.of();
this.metricsRegistry = metricsRegistry;
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
@Override
@ -363,7 +362,7 @@ public final class AssignmentsManager {
Map<TopicIdPartition, Assignment> sent,
Optional<ClientResponse> assignmentResponse
) {
inflight = Collections.emptyMap();
inflight = Map.of();
Optional<String> globalResponseError = globalResponseError(assignmentResponse);
if (globalResponseError.isPresent()) {
previousGlobalFailures++;

View File

@ -596,7 +596,7 @@ public class ClientMetricsManager implements AutoCloseable {
Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST);
unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(),
ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, Collections.emptyMap()));
ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, Map.of()));
sensorsName.add(unknownSubscriptionRequestCountSensor.name());
}
@ -607,7 +607,7 @@ public class ClientMetricsManager implements AutoCloseable {
return;
}
Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
Map<String, String> tags = Map.of(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId);
throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags));

View File

@ -23,7 +23,7 @@ import org.apache.kafka.image.MetadataProvenance;
import com.yammer.metrics.core.Histogram;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -36,7 +36,7 @@ public final class BrokerServerMetrics implements AutoCloseable {
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "BrokerMetadataListener");
private final com.yammer.metrics.core.MetricName batchProcessingTimeHistName =
metricsGroup.metricName("MetadataBatchProcessingTimeUs", Collections.emptyMap());
metricsGroup.metricName("MetadataBatchProcessingTimeUs", Map.of());
/**
* A histogram tracking the time in microseconds it took to process batches of events.
@ -45,7 +45,7 @@ public final class BrokerServerMetrics implements AutoCloseable {
.newHistogram(batchProcessingTimeHistName, true);
private final com.yammer.metrics.core.MetricName batchSizeHistName =
metricsGroup.metricName("MetadataBatchSizes", Collections.emptyMap());
metricsGroup.metricName("MetadataBatchSizes", Map.of());
/**
* A histogram tracking the sizes of batches that we have processed.

View File

@ -24,7 +24,6 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -99,9 +98,9 @@ public class ClientMetricsConfigs extends AbstractConfig {
));
private static final ConfigDef CONFIG = new ConfigDef()
.define(SUBSCRIPTION_METRICS, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Subscription metrics list")
.define(SUBSCRIPTION_METRICS, Type.LIST, List.of(), Importance.MEDIUM, "Subscription metrics list")
.define(PUSH_INTERVAL_MS, Type.INT, DEFAULT_INTERVAL_MS, Importance.MEDIUM, "Push interval in milliseconds")
.define(CLIENT_MATCH_PATTERN, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Client match pattern list");
.define(CLIENT_MATCH_PATTERN, Type.LIST, List.of(), Importance.MEDIUM, "Client match pattern list");
public ClientMetricsConfigs(Properties props) {
super(CONFIG, props);
@ -165,7 +164,7 @@ public class ClientMetricsConfigs extends AbstractConfig {
*/
public static Map<String, Pattern> parseMatchingPatterns(List<String> patterns) {
if (patterns == null || patterns.isEmpty()) {
return Collections.emptyMap();
return Map.of();
}
Map<String, Pattern> patternsMap = new HashMap<>();

View File

@ -27,8 +27,8 @@ import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
/**
* The share fetch context for a final share fetch request.
@ -55,7 +55,7 @@ public class FinalContext extends ShareFetchContext {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
log.debug("Final context returning {}", partitionsToLogString(updates.keySet()));
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0,
updates.entrySet().iterator(), Collections.emptyList()));
updates.entrySet().iterator(), List.of()));
}
@Override

View File

@ -27,6 +27,7 @@ import org.apache.kafka.server.share.session.ShareSession;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
/**
* The context for every share fetch request. The context is responsible for tracking the topic partitions present in
@ -50,7 +51,7 @@ public abstract class ShareFetchContext {
*/
public ShareFetchResponse throttleResponse(int throttleTimeMs) {
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
Collections.emptyIterator(), Collections.emptyList()));
Collections.emptyIterator(), List.of()));
}
/**

View File

@ -38,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
@ -103,7 +104,7 @@ public class ShareSessionContext extends ShareFetchContext {
public ShareFetchResponse throttleResponse(int throttleTimeMs) {
if (!isSubsequent) {
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
Collections.emptyIterator(), Collections.emptyList()));
Collections.emptyIterator(), List.of()));
}
int expectedEpoch = ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
@ -114,10 +115,10 @@ public class ShareSessionContext extends ShareFetchContext {
log.debug("Subsequent share session {} expected epoch {}, but got {}. " +
"Possible duplicate request.", session.key(), expectedEpoch, sessionEpoch);
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
throttleTimeMs, Collections.emptyIterator(), Collections.emptyList()));
throttleTimeMs, Collections.emptyIterator(), List.of()));
}
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
Collections.emptyIterator(), Collections.emptyList()));
Collections.emptyIterator(), List.of()));
}
/**
@ -196,7 +197,7 @@ public class ShareSessionContext extends ShareFetchContext {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
if (!isSubsequent) {
return new ShareFetchResponse(ShareFetchResponse.toMessage(
Errors.NONE, 0, updates.entrySet().iterator(), Collections.emptyList()));
Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
} else {
int expectedEpoch = ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
@ -207,7 +208,7 @@ public class ShareSessionContext extends ShareFetchContext {
log.debug("Subsequent share session {} expected epoch {}, but got {}. Possible duplicate request.",
session.key(), expectedEpoch, sessionEpoch);
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
0, Collections.emptyIterator(), Collections.emptyList()));
0, Collections.emptyIterator(), List.of()));
}
// Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partitionIterator = new PartitionIterator(
@ -218,7 +219,7 @@ public class ShareSessionContext extends ShareFetchContext {
log.debug("Subsequent share session context with session key {} returning {}", session.key(),
partitionsToLogString(updates.keySet()));
return new ShareFetchResponse(ShareFetchResponse.toMessage(
Errors.NONE, 0, updates.entrySet().iterator(), Collections.emptyList()));
Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
}
}

View File

@ -36,7 +36,6 @@ import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -70,7 +69,7 @@ public class RequestConvertToJsonTest {
}
}
}
assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled request keys");
assertEquals(List.of(), unhandledKeys, "Unhandled request keys");
}
@Test
@ -116,7 +115,7 @@ public class RequestConvertToJsonTest {
}
}
}
assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled response keys");
assertEquals(List.of(), unhandledKeys, "Unhandled response keys");
}
@Test

View File

@ -58,7 +58,6 @@ import org.slf4j.LoggerFactory;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@ -490,25 +489,25 @@ public class AssignmentsManagerTest {
setTopics(Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData().
setTopicId(TOPIC_1).
setPartitions(Collections.singletonList(
setPartitions(List.of(
new AssignReplicasToDirsRequestData.PartitionData().
setPartitionIndex(2))),
new AssignReplicasToDirsRequestData.TopicData().
setTopicId(TOPIC_2).
setPartitions(Collections.singletonList(
setPartitions(List.of(
new AssignReplicasToDirsRequestData.PartitionData().
setPartitionIndex(5))))),
new AssignReplicasToDirsRequestData.DirectoryData().
setId(DIR_3).
setTopics(Collections.singletonList(
setTopics(List.of(
new AssignReplicasToDirsRequestData.TopicData().
setTopicId(TOPIC_1).
setPartitions(Collections.singletonList(
setPartitions(List.of(
new AssignReplicasToDirsRequestData.PartitionData().
setPartitionIndex(3))))),
new AssignReplicasToDirsRequestData.DirectoryData().
setId(DIR_1).
setTopics(Collections.singletonList(
setTopics(List.of(
new AssignReplicasToDirsRequestData.TopicData().
setTopicId(TOPIC_1).
setPartitions(Arrays.asList(

View File

@ -25,7 +25,6 @@ import org.apache.kafka.image.MetadataProvenance;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
@ -43,12 +42,12 @@ public final class BrokerServerMetricsTest {
// Metric description is not use for metric name equality
Set<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
new MetricName("last-applied-record-offset", expectedGroup, "", Collections.emptyMap()),
new MetricName("last-applied-record-timestamp", expectedGroup, "", Collections.emptyMap()),
new MetricName("last-applied-record-lag-ms", expectedGroup, "", Collections.emptyMap()),
new MetricName("metadata-load-error-count", expectedGroup, "", Collections.emptyMap()),
new MetricName("metadata-apply-error-count", expectedGroup, "", Collections.emptyMap()),
new MetricName("ignored-static-voters", expectedGroup, "", Collections.emptyMap())
new MetricName("last-applied-record-offset", expectedGroup, "", Map.of()),
new MetricName("last-applied-record-timestamp", expectedGroup, "", Map.of()),
new MetricName("last-applied-record-lag-ms", expectedGroup, "", Map.of()),
new MetricName("metadata-load-error-count", expectedGroup, "", Map.of()),
new MetricName("metadata-apply-error-count", expectedGroup, "", Map.of()),
new MetricName("ignored-static-voters", expectedGroup, "", Map.of())
));
try (BrokerServerMetrics ignored = new BrokerServerMetrics(metrics)) {

View File

@ -22,7 +22,6 @@ import org.junit.jupiter.api.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
@ -37,25 +36,25 @@ public class ClientMetricsInstanceMetadataTest {
Uuid uuid = Uuid.randomUuid();
ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, ClientMetricsTestUtils.requestContext());
// We consider empty/missing client matching patterns as valid
assertTrue(instanceMetadata.isMatch(Collections.emptyMap()));
assertTrue(instanceMetadata.isMatch(Map.of()));
assertTrue(instanceMetadata.isMatch(
Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID, Pattern.compile(".*"))));
Map.of(ClientMetricsConfigs.CLIENT_ID, Pattern.compile(".*"))));
assertTrue(instanceMetadata.isMatch(
Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer-1"))));
Map.of(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer-1"))));
assertTrue(instanceMetadata.isMatch(
Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer.*"))));
Map.of(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer.*"))));
assertTrue(instanceMetadata.isMatch(
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, Pattern.compile(uuid.toString()))));
Map.of(ClientMetricsConfigs.CLIENT_INSTANCE_ID, Pattern.compile(uuid.toString()))));
assertTrue(instanceMetadata.isMatch(
Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, Pattern.compile("apache-kafka-java"))));
Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, Pattern.compile("apache-kafka-java"))));
assertTrue(instanceMetadata.isMatch(
Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, Pattern.compile("3.5.2"))));
Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, Pattern.compile("3.5.2"))));
assertTrue(instanceMetadata.isMatch(
Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, Pattern.compile(
Map.of(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, Pattern.compile(
InetAddress.getLocalHost().getHostAddress()))));
assertTrue(instanceMetadata.isMatch(
Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOURCE_PORT, Pattern.compile(
Map.of(ClientMetricsConfigs.CLIENT_SOURCE_PORT, Pattern.compile(
String.valueOf(ClientMetricsTestUtils.CLIENT_PORT)))));
}
@ -125,9 +124,9 @@ public class ClientMetricsInstanceMetadataTest {
ClientMetricsTestUtils.requestContext());
// Unknown key in pattern map
assertFalse(instanceMetadata.isMatch(Collections.singletonMap("unknown", Pattern.compile(".*"))));
assertFalse(instanceMetadata.isMatch(Map.of("unknown", Pattern.compile(".*"))));
// '*' key is considered as invalid regex pattern
assertFalse(instanceMetadata.isMatch(Collections.singletonMap("*", Pattern.compile(".*"))));
assertFalse(instanceMetadata.isMatch(Map.of("*", Pattern.compile(".*"))));
}
@Test
@ -136,9 +135,9 @@ public class ClientMetricsInstanceMetadataTest {
ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
ClientMetricsTestUtils.requestContextWithNullClientInfo());
assertFalse(instanceMetadata.isMatch(Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME,
assertFalse(instanceMetadata.isMatch(Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME,
Pattern.compile(".*"))));
assertFalse(instanceMetadata.isMatch(Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION,
assertFalse(instanceMetadata.isMatch(Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION,
Pattern.compile(".*"))));
}
}

View File

@ -24,8 +24,8 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -61,7 +61,7 @@ public class ShareSessionCacheTest {
assertEquals(0, cache.totalPartitions());
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(2));
assertNotNull(key1);
assertShareCacheContains(cache, new ArrayList<>(Collections.singletonList(key1)));
assertShareCacheContains(cache, new ArrayList<>(List.of(key1)));
ShareSession session1 = cache.get(key1);
assertEquals(2, session1.size());
assertEquals(2, cache.totalPartitions());
@ -82,7 +82,7 @@ public class ShareSessionCacheTest {
assertEquals(6, cache.totalPartitions());
assertEquals(2, cache.size());
cache.remove(key1);
assertShareCacheContains(cache, new ArrayList<>(Collections.singletonList(key2)));
assertShareCacheContains(cache, new ArrayList<>(List.of(key2)));
assertEquals(1, cache.size());
assertEquals(4, cache.totalPartitions());

View File

@ -22,7 +22,6 @@ import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -46,10 +45,10 @@ public class ShareSessionTest {
@Test
public void testPartitionsToLogStringEmpty() {
String response = ShareSession.partitionsToLogString(Collections.emptyList(), false);
String response = ShareSession.partitionsToLogString(List.of(), false);
assertEquals("0 partition(s)", response);
response = ShareSession.partitionsToLogString(Collections.emptyList(), true);
response = ShareSession.partitionsToLogString(List.of(), true);
assertEquals("( [] )", response);
}
}