MINOR: Cleanup Storage Module (#19072)

Given that now we support Java 17 on our brokers, this PR replace the
use of the following in storage module:

- Collections.singletonList() and Collections.emptyList() with List.of()
- Collections.singletonMap() and Collections.emptyMap() with Map.of()
- Collections.singleton() and Collections.emptySet() with Set.of()
- Arrays.asList() with List.of()

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This commit is contained in:
Sanskar Jhajharia 2025-04-03 23:45:58 +05:30 committed by GitHub
parent db4e74b46e
commit 03b1b720e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
81 changed files with 320 additions and 407 deletions

View File

@ -85,11 +85,11 @@ class ConsumerTask implements Runnable, Closeable {
private final Object assignPartitionsLock = new Object();
// Remote log metadata topic partitions that consumer is assigned to.
private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet();
private volatile Set<Integer> assignedMetadataPartitions = Set.of();
// User topic partitions that this broker is a leader/follower for.
private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap();
private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Map.of();
private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Set.of();
private long uninitializedAt;
private boolean isAllUserTopicPartitionsInitialized;
@ -299,11 +299,11 @@ class ConsumerTask implements Runnable, Closeable {
}
void addAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
updateAssignments(Objects.requireNonNull(partitions), Collections.emptySet());
updateAssignments(Objects.requireNonNull(partitions), Set.of());
}
void removeAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
updateAssignments(Collections.emptySet(), Objects.requireNonNull(partitions));
updateAssignments(Set.of(), Objects.requireNonNull(partitions));
}
private void updateAssignments(final Set<TopicIdPartition> addedPartitions,

View File

@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
@ -154,8 +153,8 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
// Clear the entries by creating unmodifiable empty maps.
// Practically, we do not use the same instances that are closed.
idToPartitionDeleteMetadata = Collections.emptyMap();
idToRemoteLogMetadataCache = Collections.emptyMap();
idToPartitionDeleteMetadata = Map.of();
idToRemoteLogMetadataCache = Map.of();
}
@Override

View File

@ -58,7 +58,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
@ -471,7 +470,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
boolean doesTopicExist(Admin adminClient, String topic) {
try {
TopicDescription description = adminClient.describeTopics(Collections.singleton(topic))
TopicDescription description = adminClient.describeTopics(Set.of(topic))
.topicNameValues()
.get(topic)
.get();
@ -491,7 +490,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
private boolean isPartitionsCountSameAsConfigured(Admin adminClient,
String topicName) throws InterruptedException, ExecutionException {
log.debug("Getting topic details to check for partition count and replication factor.");
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topicName))
TopicDescription topicDescription = adminClient.describeTopics(Set.of(topicName))
.topicNameValues().get(topicName).get();
int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount();
int topicPartitionsSize = topicDescription.partitions().size();
@ -525,14 +524,14 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
try {
doesTopicExist = doesTopicExist(adminClient, topic);
if (!doesTopicExist) {
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
CreateTopicsResult result = adminClient.createTopics(Set.of(newTopic));
result.all().get();
List<String> overriddenConfigs = result.config(topic).get()
.entries()
.stream()
.filter(entry -> !entry.isDefault())
.map(entry -> entry.name() + "=" + entry.value())
.collect(Collectors.toList());
.toList();
log.info("Topic {} created. TopicId: {}, numPartitions: {}, replicationFactor: {}, config: {}",
topic, result.topicId(topic).get(), result.numPartitions(topic).get(),
result.replicationFactor(topic).get(), overriddenConfigs);

View File

@ -26,7 +26,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataSnapshot> {
@ -52,7 +51,7 @@ public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetad
.map(entry -> new RemoteLogSegmentMetadataSnapshotRecord.SegmentLeaderEpochEntry()
.setLeaderEpoch(entry.getKey())
.setOffset(entry.getValue()))
.collect(Collectors.toList());
.toList();
}
@Override

View File

@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {
@ -56,7 +55,7 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans
.map(entry -> new RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry()
.setLeaderEpoch(entry.getKey())
.setOffset(entry.getValue()))
.collect(Collectors.toList());
.toList();
}
private RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadata data) {

View File

@ -32,7 +32,6 @@ import org.apache.kafka.server.quota.SensorAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -102,7 +101,7 @@ public class RLMQuotaManager {
}
private MetricName metricName() {
return metrics.metricName("byte-rate", quotaType.toString(), description, Collections.emptyMap());
return metrics.metricName("byte-rate", quotaType.toString(), description, Map.of());
}
private Sensor sensor() {

View File

@ -454,7 +454,7 @@ public final class RemoteLogManagerConfig {
public Map<String, Object> getConfigProps(String configPrefixProp) {
String prefixProp = config.getString(configPrefixProp);
return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(config.originalsWithPrefix(prefixProp));
return prefixProp == null ? Map.of() : Collections.unmodifiableMap(config.originalsWithPrefix(prefixProp));
}
public int remoteLogManagerCopyNumQuotaSamples() {

View File

@ -26,7 +26,6 @@ import org.slf4j.Logger;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -406,7 +405,7 @@ public final class LeaderEpochFileCache {
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
return removeWhileMatching(epochs.descendingMap().entrySet().iterator(), x -> x.startOffset >= endOffset);
}
return Collections.emptyList();
return List.of();
}
public OptionalInt epochForOffset(long offset) {

View File

@ -36,7 +36,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@ -51,7 +50,6 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.require;
import static org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX;
@ -433,11 +431,11 @@ public class LocalLog {
config.preallocate);
segments.add(newSegment);
reason.logReason(singletonList(segmentToDelete));
reason.logReason(List.of(segmentToDelete));
if (newOffset != segmentToDelete.baseOffset()) {
segments.remove(segmentToDelete.baseOffset());
}
deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
deleteSegmentFiles(List.of(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
return newSegment;
}
@ -619,7 +617,7 @@ public class LocalLog {
File offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset);
File timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset);
File txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset);
for (File file : Arrays.asList(logFile, offsetIdxFile, timeIdxFile, txnIdxFile)) {
for (File file : List.of(logFile, offsetIdxFile, timeIdxFile, txnIdxFile)) {
if (file.exists()) {
logger.warn("Newly rolled segment file {} already exists; deleting it first", file.getAbsolutePath());
Files.delete(file.toPath());
@ -791,7 +789,7 @@ public class LocalLog {
private static FetchDataInfo emptyFetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, boolean includeAbortedTxns) {
Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions = includeAbortedTxns
? Optional.of(Collections.emptyList())
? Optional.of(List.of())
: Optional.empty();
return new FetchDataInfo(fetchOffsetMetadata, MemoryRecords.EMPTY, false, abortedTransactions);
}
@ -943,7 +941,7 @@ public class LocalLog {
}
// replace old segment with new ones
LOG.info("{}Replacing overflowed segment {} with split segments {}", logPrefix, segment, newSegments);
List<LogSegment> deletedSegments = replaceSegments(existingSegments, newSegments, singletonList(segment),
List<LogSegment> deletedSegments = replaceSegments(existingSegments, newSegments, List.of(segment),
dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix, false);
return new SplitSegmentResult(deletedSegments, newSegments);
} catch (Exception e) {
@ -1035,7 +1033,7 @@ public class LocalLog {
existingSegments.remove(segment.baseOffset());
}
deleteSegmentFiles(
singletonList(segment),
List.of(segment),
true,
dir,
topicPartition,

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RecordValidationStats;
import org.apache.kafka.common.requests.ProduceResponse.RecordError;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -74,7 +73,7 @@ public class LogAppendInfo {
int validBytes,
long lastOffsetOfFirstBatch) {
this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset,
recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(),
recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, List.of(),
LeaderHwChange.NONE);
}

View File

@ -48,7 +48,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
@ -105,7 +104,7 @@ public class LogConfig extends AbstractConfig {
@Override
public List<String> headers() {
return asList("Name", "Description", "Type", "Default", "Valid Values", SERVER_DEFAULT_HEADER_NAME, "Importance");
return List.of("Name", "Description", "Type", "Default", "Valid Values", SERVER_DEFAULT_HEADER_NAME, "Importance");
}
// Visible for testing
@ -300,7 +299,7 @@ public class LogConfig extends AbstractConfig {
private final Map<?, ?> props;
public LogConfig(Map<?, ?> props) {
this(props, Collections.emptySet());
this(props, Set.of());
}
@SuppressWarnings({"this-escape"})
@ -326,11 +325,11 @@ public class LogConfig extends AbstractConfig {
this.minCleanableRatio = getDouble(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG);
this.compact = getList(TopicConfig.CLEANUP_POLICY_CONFIG).stream()
.map(c -> c.toLowerCase(Locale.ROOT))
.collect(Collectors.toList())
.toList()
.contains(TopicConfig.CLEANUP_POLICY_COMPACT);
this.delete = getList(TopicConfig.CLEANUP_POLICY_CONFIG).stream()
.map(c -> c.toLowerCase(Locale.ROOT))
.collect(Collectors.toList())
.toList()
.contains(TopicConfig.CLEANUP_POLICY_DELETE);
this.uncleanLeaderElectionEnable = getBoolean(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG);
this.minInSyncReplicas = getInt(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
@ -447,7 +446,7 @@ public class LogConfig extends AbstractConfig {
}
public static List<String> configNames() {
return CONFIG.names().stream().sorted().collect(Collectors.toList());
return CONFIG.names().stream().sorted().toList();
}
public static Optional<String> serverConfigName(String configName) {
@ -610,7 +609,7 @@ public class LogConfig extends AbstractConfig {
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/
public static void validate(Properties props) {
validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
validate(Map.of(), props, Map.of(), false);
}
public static void validate(Map<String, String> existingConfigs,

View File

@ -43,6 +43,7 @@ import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.attribute.FileTime;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Callable;
@ -50,7 +51,6 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.util.Arrays.asList;
/**
* A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing
@ -772,7 +772,7 @@ public class LogSegment implements Closeable {
*/
public void deleteIfExists() throws IOException {
try {
Utils.tryAll(asList(
Utils.tryAll(List.of(
() -> deleteTypeIfExists(() -> log.deleteIfExists(), "log", log.file(), true),
() -> deleteTypeIfExists(() -> lazyOffsetIndex.deleteIfExists(), "offset index", offsetIndexFile(), true),
() -> deleteTypeIfExists(() -> lazyTimeIndex.deleteIfExists(), "time index", timeIndexFile(), true),

View File

@ -22,14 +22,13 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* This class encapsulates a thread-safe navigable map of LogSegment instances and provides the
@ -141,7 +140,7 @@ public class LogSegments implements Closeable {
* @return the base offsets of all segments
*/
public Collection<Long> baseOffsets() {
return values().stream().map(LogSegment::baseOffset).collect(Collectors.toList());
return values().stream().map(LogSegment::baseOffset).toList();
}
/**
@ -182,7 +181,7 @@ public class LogSegments implements Closeable {
public Collection<LogSegment> values(long from, long to) {
if (from == to) {
// Handle non-segment-aligned empty sets
return Collections.emptyList();
return List.of();
} else if (to < from) {
throw new IllegalArgumentException("Invalid log segment range: requested segments in " + topicPartition +
" from offset " + from + " which is greater than limit offset " + to);
@ -197,7 +196,7 @@ public class LogSegments implements Closeable {
public Collection<LogSegment> nonActiveLogSegmentsFrom(long from) {
LogSegment activeSegment = lastSegment().get();
if (from > activeSegment.baseOffset())
return Collections.emptyList();
return List.of();
else
return values(from, activeSegment.baseOffset());
}
@ -314,7 +313,7 @@ public class LogSegments implements Closeable {
Long higherOffset = segments.higherKey(baseOffset);
if (higherOffset != null)
return segments.tailMap(higherOffset, true).values();
return Collections.emptyList();
return List.of();
}
/**
@ -334,7 +333,7 @@ public class LogSegments implements Closeable {
* @param predicate the predicate to be used for filtering segments.
*/
public Collection<LogSegment> filter(Predicate<LogSegment> predicate) {
return values().stream().filter(predicate).collect(Collectors.toList());
return values().stream().filter(predicate).toList();
}
/**

View File

@ -45,7 +45,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
public class LogValidator {
@ -612,7 +611,7 @@ public class LogValidator {
private static void processRecordErrors(List<ApiRecordError> recordErrors) {
if (!recordErrors.isEmpty()) {
List<RecordError> errors = recordErrors.stream().map(e -> e.recordError).collect(Collectors.toList());
List<RecordError> errors = recordErrors.stream().map(e -> e.recordError).toList();
if (recordErrors.stream().anyMatch(e -> e.apiError == Errors.INVALID_TIMESTAMP)) {
throw new RecordValidationException(new InvalidTimestampException(
"One or more records have been rejected due to invalid timestamp"), errors);

View File

@ -49,7 +49,6 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@ -701,10 +700,10 @@ public class ProducerStateManager {
if (dir.exists() && dir.isDirectory()) {
try (Stream<Path> paths = Files.list(dir.toPath())) {
return paths.filter(ProducerStateManager::isSnapshotFile)
.map(path -> new SnapshotFile(path.toFile())).collect(Collectors.toList());
.map(path -> new SnapshotFile(path.toFile())).toList();
}
} else {
return Collections.emptyList();
return List.of();
}
}

View File

@ -44,7 +44,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@ -318,7 +317,7 @@ public class RemoteIndexCache implements Closeable {
internalCache.put(uuid, entry);
} else {
// Delete all of them if any one of those indexes is not available for a specific segment id
tryAll(Arrays.asList(
tryAll(List.of(
() -> {
Files.deleteIfExists(offsetIndexFile.toPath());
return null;
@ -598,7 +597,7 @@ public class RemoteIndexCache implements Closeable {
if (!cleanStarted) {
cleanStarted = true;
List<StorageAction<Void, Exception>> actions = Arrays.asList(() -> {
List<StorageAction<Void, Exception>> actions = List.of(() -> {
offsetIndex.deleteIfExists();
return null;
}, () -> {

View File

@ -20,9 +20,6 @@ import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigException;
import java.util.List;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
public class ThrottledReplicaListValidator implements Validator {
public static final Validator INSTANCE = new ThrottledReplicaListValidator();
@ -30,13 +27,13 @@ public class ThrottledReplicaListValidator implements Validator {
private ThrottledReplicaListValidator() { }
public static void ensureValidString(String name, String value) {
INSTANCE.ensureValid(name, asList(value.split(",")));
INSTANCE.ensureValid(name, List.of(value.split(",")));
}
@Override
public void ensureValid(String name, Object value) {
if (value instanceof java.util.List<?>) {
List<String> proposed = ((List<?>) value).stream().map(element -> element.toString().trim()).collect(Collectors.toList());
List<String> proposed = ((List<?>) value).stream().map(element -> element.toString().trim()).toList();
if (!(proposed.stream().allMatch(s -> s.matches("([0-9]+:[0-9]+)?"))
|| String.join("", proposed).equals("*")))
throw new ConfigException(name, value, name +

View File

@ -28,7 +28,6 @@ import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@ -229,7 +228,7 @@ public class TransactionIndex implements Closeable {
private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate) {
FileChannel channel = channelOrNull();
if (channel == null)
return Collections.emptyList();
return List.of();
PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);

View File

@ -21,7 +21,6 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import com.yammer.metrics.core.Meter;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -71,7 +70,7 @@ public final class BrokerTopicMetrics {
}
private BrokerTopicMetrics(Optional<String> name, boolean remoteStorageEnabled) {
this.tags = name.map(s -> Collections.singletonMap("topic", s)).orElse(Collections.emptyMap());
this.tags = name.map(s -> Map.of("topic", s)).orElse(Map.of());
metricTypeMap.put(MESSAGE_IN_PER_SEC, new MeterWrapper(MESSAGE_IN_PER_SEC, "messages"));
metricTypeMap.put(BYTES_IN_PER_SEC, new MeterWrapper(BYTES_IN_PER_SEC, "bytes"));

View File

@ -36,9 +36,9 @@ import org.apache.kafka.common.test.api.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@ -108,7 +108,7 @@ public class LogAppendTimeTest {
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(
Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name())
)) {
consumer.subscribe(Collections.singleton(TOPIC));
consumer.subscribe(Set.of(TOPIC));
ArrayList<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
TestUtils.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

View File

@ -40,7 +40,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -161,7 +160,7 @@ public class ConsumerTaskTest {
consumerTask.ingestRecords();
assertTrue(consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent());
final Set<TopicIdPartition> removePartitions = Collections.singleton(tpId);
final Set<TopicIdPartition> removePartitions = Set.of(tpId);
consumerTask.removeAssignmentsForPartitions(removePartitions);
consumerTask.ingestRecords();
for (final TopicIdPartition idPartition : allPartitions) {
@ -196,7 +195,7 @@ public class ConsumerTaskTest {
fail(e.getMessage());
}
}
consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
consumerTask.addAssignmentsForPartitions(Set.of(partition));
partitionsAssigned++;
}
isAllPartitionsAssigned.set(true);
@ -230,8 +229,8 @@ public class ConsumerTaskTest {
assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId2));
final int metadataPartition = partitioner.metadataPartition(tpId0);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L));
final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
consumer.updateEndOffsets(Map.of(toRemoteLogPartition(metadataPartition), 0L));
final Set<TopicIdPartition> assignments = Set.of(tpId0);
consumerTask.addAssignmentsForPartitions(assignments);
consumerTask.ingestRecords();
assertTrue(consumerTask.isUserPartitionAssigned(tpId0), "Partition " + tpId0 + " has not been assigned");
@ -243,7 +242,7 @@ public class ConsumerTaskTest {
assertEquals(2, handler.metadataCounter);
// should only read the tpId1 records
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
consumerTask.addAssignmentsForPartitions(Set.of(tpId1));
consumerTask.ingestRecords();
assertTrue(consumerTask.isUserPartitionAssigned(tpId1), "Partition " + tpId1 + " has not been assigned");
@ -271,9 +270,9 @@ public class ConsumerTaskTest {
final int metadataPartition = partitioner.metadataPartition(tpId0);
final int anotherMetadataPartition = partitioner.metadataPartition(tpId3);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L));
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(anotherMetadataPartition), 0L));
final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
consumer.updateEndOffsets(Map.of(toRemoteLogPartition(metadataPartition), 0L));
consumer.updateEndOffsets(Map.of(toRemoteLogPartition(anotherMetadataPartition), 0L));
final Set<TopicIdPartition> assignments = Set.of(tpId0);
consumerTask.addAssignmentsForPartitions(assignments);
consumerTask.ingestRecords();
assertTrue(consumerTask.isUserPartitionAssigned(tpId0), "Partition " + tpId0 + " has not been assigned");
@ -288,7 +287,7 @@ public class ConsumerTaskTest {
assertEquals(1, handler.metadataCounter);
// Adding assignment for tpId1 after related metadata records have already been read
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
consumerTask.addAssignmentsForPartitions(Set.of(tpId1));
consumerTask.ingestRecords();
assertTrue(consumerTask.isUserPartitionAssigned(tpId1), "Partition " + tpId1 + " has not been assigned");
@ -312,8 +311,8 @@ public class ConsumerTaskTest {
public void testMaybeMarkUserPartitionsAsReady() {
final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 2L));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
consumer.updateEndOffsets(Map.of(toRemoteLogPartition(metadataPartition), 2L));
consumerTask.addAssignmentsForPartitions(Set.of(tpId));
consumerTask.ingestRecords();
assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned");
@ -330,9 +329,9 @@ public class ConsumerTaskTest {
public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset, long endOffset) {
final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), beginOffset));
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), endOffset));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
consumer.updateBeginningOffsets(Map.of(toRemoteLogPartition(metadataPartition), beginOffset));
consumer.updateEndOffsets(Map.of(toRemoteLogPartition(metadataPartition), endOffset));
consumerTask.addAssignmentsForPartitions(Set.of(tpId));
consumerTask.ingestRecords();
assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned");
@ -350,11 +349,11 @@ public class ConsumerTaskTest {
final CountDownLatch latch = new CountDownLatch(1);
final TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(partitioner.metadataPartition(tpId)), 0L));
consumer.updateEndOffsets(Map.of(toRemoteLogPartition(partitioner.metadataPartition(tpId)), 0L));
final Thread assignmentThread = new Thread(() -> {
try {
latch.await();
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
consumerTask.addAssignmentsForPartitions(Set.of(tpId));
} catch (final InterruptedException e) {
fail("Shouldn't have thrown an exception");
}
@ -382,8 +381,8 @@ public class ConsumerTaskTest {
public void testConsumerShouldNotCloseOnRetriableError() {
final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 1L));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
consumer.updateEndOffsets(Map.of(toRemoteLogPartition(metadataPartition), 1L));
consumerTask.addAssignmentsForPartitions(Set.of(tpId));
consumerTask.ingestRecords();
assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned");
@ -406,8 +405,8 @@ public class ConsumerTaskTest {
public void testConsumerShouldCloseOnNonRetriableError() {
final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 1L));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
consumer.updateEndOffsets(Map.of(toRemoteLogPartition(metadataPartition), 1L));
consumerTask.addAssignmentsForPartitions(Set.of(tpId));
consumerTask.ingestRecords();
assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned");
@ -425,7 +424,7 @@ public class ConsumerTaskTest {
final TopicIdPartition idPartition,
final long recordOffset) {
final RemoteLogSegmentId segmentId = new RemoteLogSegmentId(idPartition, Uuid.randomUuid());
final RemoteLogMetadata metadata = new RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, Collections.singletonMap(0, 0L));
final RemoteLogMetadata metadata = new RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, Map.of(0, 0L));
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME, metadataPartition, recordOffset, null, serde.serialize(metadata));
consumer.addRecord(record);
}

View File

@ -26,7 +26,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundExceptio
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -47,7 +46,7 @@ class RemoteLogLeaderEpochStateTest {
@Test
void testListAllRemoteLogSegmentsOnEmpty() throws RemoteResourceNotFoundException {
assertFalse(epochState.listAllRemoteLogSegments(Collections.emptyMap()).hasNext());
assertFalse(epochState.listAllRemoteLogSegments(Map.of()).hasNext());
}
@Test
@ -84,7 +83,7 @@ class RemoteLogLeaderEpochStateTest {
segmentIdToMetadataMap.put(segmentId4, createRemoteLogSegmentMetadata(segmentId4, 9L));
// segments should be sorted by start-offset
List<RemoteLogSegmentId> expectedList = Arrays.asList(segmentId1, segmentId2, segmentId4, segmentId3);
List<RemoteLogSegmentId> expectedList = List.of(segmentId1, segmentId2, segmentId4, segmentId3);
List<RemoteLogSegmentId> actualList = new ArrayList<>();
epochState.listAllRemoteLogSegments(segmentIdToMetadataMap)
.forEachRemaining(metadata -> actualList.add(metadata.remoteLogSegmentId()));
@ -142,7 +141,7 @@ class RemoteLogLeaderEpochStateTest {
assertEquals(1, epochState.referencedSegmentIds().size());
assertEquals(segmentId4, epochState.floorEntry(11L));
assertEquals(3, epochState.unreferencedSegmentIds().size());
assertTrue(epochState.unreferencedSegmentIds().containsAll(Arrays.asList(segmentId1, segmentId2, segmentId3)));
assertTrue(epochState.unreferencedSegmentIds().containsAll(List.of(segmentId1, segmentId2, segmentId3)));
assertEquals(200L, epochState.highestLogOffset());
}
@ -184,7 +183,7 @@ class RemoteLogLeaderEpochStateTest {
epochState.handleSegmentWithDeleteSegmentStartedState(101L, segmentId2);
assertTrue(epochState.referencedSegmentIds().isEmpty());
assertEquals(2, epochState.unreferencedSegmentIds().size());
assertTrue(epochState.unreferencedSegmentIds().containsAll(Arrays.asList(segmentId1, segmentId2)));
assertTrue(epochState.unreferencedSegmentIds().containsAll(List.of(segmentId1, segmentId2)));
}
@Test

View File

@ -32,8 +32,8 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -60,7 +60,7 @@ public class RemoteLogMetadataCacheTest {
if (state != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0, 100L,
-1L, brokerId0, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
-1L, brokerId0, time.milliseconds(), segmentSize, Map.of(0, 0L));
RemoteLogSegmentMetadata updatedMetadata = segmentMetadata.createWithUpdates(
new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), Optional.empty(),
state, brokerId1));
@ -102,7 +102,7 @@ public class RemoteLogMetadataCacheTest {
long offset = 10L;
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, offset, 100L,
-1L, brokerId0, time.milliseconds(), segmentSize, Collections.singletonMap(leaderEpoch, offset));
-1L, brokerId0, time.milliseconds(), segmentSize, Map.of(leaderEpoch, offset));
cache.addCopyInProgressSegment(segmentMetadata);
// invalid-transition-1. COPY_SEGMENT_STARTED -> DELETE_SEGMENT_FINISHED

View File

@ -19,7 +19,6 @@ package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.test.TestUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@ -46,7 +45,7 @@ public class RemoteLogMetadataManagerTestUtils {
public static class Builder {
private String bootstrapServers;
private boolean startConsumerThread;
private Map<String, Object> overrideRemoteLogMetadataManagerProps = Collections.emptyMap();
private Map<String, Object> overrideRemoteLogMetadataManagerProps = Map.of();
private Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStore = RemotePartitionMetadataStore::new;
private Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new;

View File

@ -32,7 +32,7 @@ import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
@ -68,7 +68,7 @@ public class RemoteLogMetadataTransformTest {
private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
time.milliseconds(), 1024, Map.of(0, 0L));
}
@Test

View File

@ -32,13 +32,12 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.test.TestUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
@ -78,7 +77,7 @@ public class RemoteLogSegmentLifecycleTest {
@ClusterTest
public void testRemoteLogSegmentLifeCycle() throws Exception {
try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
metadataManager.onPartitionLeadershipChanges(Set.of(topicIdPartition), Set.of());
// segment 0
// offsets: [0-100]
@ -113,7 +112,7 @@ public class RemoteLogSegmentLifecycleTest {
// offsets: [101 - 200]
// no changes in leadership with in this segment
// leader epochs (2, 101)
Map<Integer, Long> leaderEpochSegment1 = Collections.singletonMap(2, 101L);
Map<Integer, Long> leaderEpochSegment1 = Map.of(2, 101L);
RemoteLogSegmentMetadata metadataSegment1 = upsertSegmentState(metadataManager, leaderEpochSegment1,
101L, 200L, COPY_SEGMENT_FINISHED);
@ -248,12 +247,12 @@ public class RemoteLogSegmentLifecycleTest {
@ClusterTest
public void testCacheSegmentWithCopySegmentStartedState() throws Exception {
try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
metadataManager.onPartitionLeadershipChanges(Set.of(topicIdPartition), Set.of());
// Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the
// segments.
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L,
-1L, brokerId0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
-1L, brokerId0, time.milliseconds(), segSize, Map.of(0, 0L));
metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
@ -270,11 +269,11 @@ public class RemoteLogSegmentLifecycleTest {
@ClusterTest
public void testCacheSegmentWithCopySegmentFinishedState() throws Exception {
try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
metadataManager.onPartitionLeadershipChanges(Set.of(topicIdPartition), Set.of());
// Create a segment and move it to state COPY_SEGMENT_FINISHED. and check for searching that segment and
// listing the segments.
RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState(
metadataManager, Collections.singletonMap(0, 101L), 101L, 200L, COPY_SEGMENT_FINISHED);
metadataManager, Map.of(0, 101L), 101L, 200L, COPY_SEGMENT_FINISHED);
// Search should return the above segment.
Optional<RemoteLogSegmentMetadata> segMetadataForOffset150 =
@ -289,11 +288,11 @@ public class RemoteLogSegmentLifecycleTest {
@ClusterTest
public void testCacheSegmentWithDeleteSegmentStartedState() throws Exception {
try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
metadataManager.onPartitionLeadershipChanges(Set.of(topicIdPartition), Set.of());
// Create a segment and move it to state DELETE_SEGMENT_STARTED, and check for searching that segment and
// listing the segments.
RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState(
metadataManager, Collections.singletonMap(0, 201L), 201L, 300L, DELETE_SEGMENT_STARTED);
metadataManager, Map.of(0, 201L), 201L, 300L, DELETE_SEGMENT_STARTED);
// Search should not return the above segment as their leader epoch state is cleared.
Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset250Epoch0 =
@ -306,11 +305,11 @@ public class RemoteLogSegmentLifecycleTest {
@ClusterTest
public void testCacheSegmentsWithDeleteSegmentFinishedState() throws Exception {
try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
metadataManager.onPartitionLeadershipChanges(Set.of(topicIdPartition), Set.of());
// Create a segment and move it to state DELETE_SEGMENT_FINISHED, and check for searching that segment and
// listing the segments.
RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState(
metadataManager, Collections.singletonMap(0, 301L), 301L, 400L, DELETE_SEGMENT_STARTED);
metadataManager, Map.of(0, 301L), 301L, 400L, DELETE_SEGMENT_STARTED);
// Search should not return the above segment as their leader epoch state is cleared.
assertFalse(metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 350).isPresent());
@ -330,11 +329,11 @@ public class RemoteLogSegmentLifecycleTest {
@ClusterTest
public void testCacheListSegments() throws Exception {
try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
metadataManager.onPartitionLeadershipChanges(Set.of(topicIdPartition), Set.of());
// Create a few segments and add them to the cache.
RemoteLogSegmentMetadata segment0 = upsertSegmentState(metadataManager, Collections.singletonMap(0, 0L),
RemoteLogSegmentMetadata segment0 = upsertSegmentState(metadataManager, Map.of(0, 0L),
0, 100, COPY_SEGMENT_FINISHED);
RemoteLogSegmentMetadata segment1 = upsertSegmentState(metadataManager, Collections.singletonMap(0, 101L),
RemoteLogSegmentMetadata segment1 = upsertSegmentState(metadataManager, Map.of(0, 101L),
101, 200, COPY_SEGMENT_FINISHED);
Map<Integer, Long> leaderEpochSegment2 = new HashMap<>();
leaderEpochSegment2.put(0, 201L);
@ -343,14 +342,14 @@ public class RemoteLogSegmentLifecycleTest {
201, 400, COPY_SEGMENT_FINISHED);
// listRemoteLogSegments(0) and listAllRemoteLogSegments() should contain all the above segments.
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch0 = Arrays.asList(segment0, segment1, segment2);
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch0 = List.of(segment0, segment1, segment2);
assertTrue(TestUtils.sameElementsWithOrder(
expectedSegmentsForEpoch0.iterator(), metadataManager.listRemoteLogSegments(topicIdPartition, 0)));
assertTrue(TestUtils.sameElementsWithoutOrder(
expectedSegmentsForEpoch0.iterator(), metadataManager.listRemoteLogSegments(topicIdPartition)));
// listRemoteLogSegments(1) should contain only segment2.
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch1 = Collections.singletonList(segment2);
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch1 = List.of(segment2);
assertTrue(TestUtils.sameElementsWithOrder(
expectedSegmentsForEpoch1.iterator(), metadataManager.listRemoteLogSegments(topicIdPartition, 1)));
}

View File

@ -31,10 +31,9 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
@ -63,15 +62,15 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
// Create topics.
String leaderTopic = "leader";
// Set broker id 0 as the first entry which is taken as the leader.
createTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2)));
createTopic(leaderTopic, Map.of(0, List.of(0, 1, 2)));
String followerTopic = "follower";
// Set broker id 1 as the first entry which is taken as the leader.
createTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));
createTopic(followerTopic, Map.of(0, List.of(1, 2, 0)));
String topicWithNoMessages = "no-messages-topic";
// Set broker id 1 as the first entry which is taken as the leader.
createTopic(topicWithNoMessages, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));
createTopic(topicWithNoMessages, Map.of(0, List.of(1, 2, 0)));
final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
@ -122,7 +121,7 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
int segSize = 1048576;
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
time.milliseconds(), segSize, Map.of(0, 0L));
ExecutionException exception = assertThrows(ExecutionException.class,
() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
@ -130,7 +129,7 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
time.milliseconds(), segSize, Map.of(0, 0L));
exception = assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
exception.getMessage());
@ -139,8 +138,8 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition));
assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
Collections.emptySet());
remoteLogMetadataManager.onPartitionLeadershipChanges(Set.of(leaderTopicIdPartition),
Set.of());
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS); // similar to CountdownLatch::await
@ -158,8 +157,8 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
// Phaser::bulkRegister and Phaser::register provide the "countUp" feature
initializationPhaser.bulkRegister(2); // 1 for emptyTopicIdPartition and 1 for followerTopicIdPartition
handleRemoteLogSegmentMetadataPhaser.register(); // 1 for followerTopicIdPartition, emptyTopicIdPartition doesn't have a RemoteLogSegmentMetadata event
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
Collections.singleton(followerTopicIdPartition));
remoteLogMetadataManager.onPartitionLeadershipChanges(Set.of(emptyTopicIdPartition),
Set.of(followerTopicIdPartition));
initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
@ -173,8 +172,8 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
}
private void createTopic(String topic, Map<Integer, List<Integer>> replicasAssignments) {
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
admin.createTopics(Collections.singletonList(new NewTopic(topic, replicasAssignments)));
try (Admin admin = Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
admin.createTopics(List.of(new NewTopic(topic, replicasAssignments)));
assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, replicasAssignments.size()));
}
}

View File

@ -28,8 +28,9 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.test.TestUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -49,7 +50,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
.bootstrapServers(clusterInstance.bootstrapServers())
.startConsumerThread(true)
.remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new)
.overrideRemoteLogMetadataManagerProps(Collections.singletonMap(LOG_DIR, logDir))
.overrideRemoteLogMetadataManagerProps(Map.of(LOG_DIR, logDir))
.build();
}
@ -60,10 +61,10 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
String followerTopic = "new-follower";
try (Admin admin = clusterInstance.admin()) {
// Set broker id 0 as the first entry which is taken as the leader.
NewTopic newLeaderTopic = new NewTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2)));
NewTopic newLeaderTopic = new NewTopic(leaderTopic, Map.of(0, List.of(0, 1, 2)));
// Set broker id 1 as the first entry which is taken as the leader.
NewTopic newFollowerTopic = new NewTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));
admin.createTopics(Arrays.asList(newLeaderTopic, newFollowerTopic)).all().get();
NewTopic newFollowerTopic = new NewTopic(followerTopic, Map.of(0, List.of(1, 2, 0)));
admin.createTopics(List.of(newLeaderTopic, newFollowerTopic)).all().get();
}
clusterInstance.waitForTopic(leaderTopic, 1);
clusterInstance.waitForTopic(followerTopic, 1);
@ -74,16 +75,16 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(
new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
time.milliseconds(), segSize, Map.of(0, 0L));
RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(
new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
time.milliseconds(), segSize, Map.of(0, 0L));
try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) {
// Register these partitions to RemoteLogMetadataManager.
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(
Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition));
Set.of(leaderTopicIdPartition), Set.of(followerTopicIdPartition));
// Add segments for these partitions, but they are not available as they have not yet been subscribed.
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get();
@ -93,27 +94,27 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) {
// Register these partitions to RemoteLogMetadataManager, which loads the respective metadata snapshots.
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(
Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition));
Set.of(leaderTopicIdPartition), Set.of(followerTopicIdPartition));
// Check for the stored entries from the earlier run.
TestUtils.waitForCondition(() ->
TestUtils.sameElementsWithoutOrder(Collections.singleton(leaderSegmentMetadata).iterator(),
TestUtils.sameElementsWithoutOrder(Set.of(leaderSegmentMetadata).iterator(),
topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)),
"Remote log segment metadata not available");
TestUtils.waitForCondition(() ->
TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(),
TestUtils.sameElementsWithoutOrder(Set.of(followerSegmentMetadata).iterator(),
topicBasedRemoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)),
"Remote log segment metadata not available");
// Add one more segment
RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata(
new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
101, 200, -1L, 0,
time.milliseconds(), segSize, Collections.singletonMap(0, 101L));
time.milliseconds(), segSize, Map.of(0, 101L));
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();
// Check that both the stored segment and recently added segment are available.
assertTrue(TestUtils.sameElementsWithoutOrder(
Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(),
List.of(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(),
topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
);
}

View File

@ -34,9 +34,9 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -81,7 +81,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
public void testDoesTopicExist() throws ExecutionException, InterruptedException {
try (Admin admin = clusterInstance.admin()) {
String topic = "test-topic-exist";
admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get();
admin.createTopics(List.of(new NewTopic(topic, 1, (short) 1))).all().get();
clusterInstance.waitForTopic(topic, 1);
boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
assertTrue(doesTopicExist);
@ -111,9 +111,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
String followerTopic = "new-follower";
try (Admin admin = clusterInstance.admin()) {
// Set broker id 0 as the first entry which is taken as the leader.
admin.createTopics(Collections.singletonList(new NewTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2))))).all().get();
admin.createTopics(List.of(new NewTopic(leaderTopic, Map.of(0, List.of(0, 1, 2))))).all().get();
clusterInstance.waitForTopic(leaderTopic, 1);
admin.createTopics(Collections.singletonList(new NewTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0))))).all().get();
admin.createTopics(List.of(new NewTopic(followerTopic, Map.of(0, List.of(1, 2, 0))))).all().get();
clusterInstance.waitForTopic(followerTopic, 1);
}
@ -139,12 +139,12 @@ public class TopicBasedRemoteLogMetadataManagerTest {
// has not yet been subscribing as they are not yet registered.
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
time.milliseconds(), SEG_SIZE, Map.of(0, 0L));
assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
time.milliseconds(), SEG_SIZE, Map.of(0, 0L));
assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
// `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered.
@ -154,8 +154,8 @@ public class TopicBasedRemoteLogMetadataManagerTest {
assertFalse(topicBasedRlmm().isReady(newLeaderTopicIdPartition));
assertFalse(topicBasedRlmm().isReady(newFollowerTopicIdPartition));
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition),
Collections.singleton(newFollowerTopicIdPartition));
topicBasedRlmm().onPartitionLeadershipChanges(Set.of(newLeaderTopicIdPartition),
Set.of(newFollowerTopicIdPartition));
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
@ -199,17 +199,17 @@ public class TopicBasedRemoteLogMetadataManagerTest {
}).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Map.of(0, 0L));
RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(0, 0L));
100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Map.of(0, 0L));
RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(0, 0L));
200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Map.of(0, 0L));
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(thirdSegmentMetadata);
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Set.of(topicIdPartition), Set.of());
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
@ -244,17 +244,17 @@ public class TopicBasedRemoteLogMetadataManagerTest {
}).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Map.of(0, 0L));
RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));
100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Map.of(1, 100L));
RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(2, 200L));
200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Map.of(2, 200L));
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(thirdSegmentMetadata);
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Set.of(topicIdPartition), Set.of());
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
@ -289,14 +289,14 @@ public class TopicBasedRemoteLogMetadataManagerTest {
}).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Map.of(0, 0L));
RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));
100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Map.of(1, 100L));
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Set.of(topicIdPartition), Set.of());
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.

View File

@ -26,7 +26,7 @@ import org.apache.kafka.server.quota.QuotaType;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -37,7 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class RLMQuotaManagerTest {
private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time);
private final Metrics metrics = new Metrics(new MetricConfig(), List.of(), time);
private static final QuotaType QUOTA_TYPE = QuotaType.RLM_FETCH;
private static final String DESCRIPTION = "Tracking byte rate";

View File

@ -18,7 +18,7 @@ package org.apache.kafka.server.log.remote.storage;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -38,7 +38,7 @@ public class ClassLoaderAwareRemoteStorageManagerTest {
}).when(delegate).configure(any());
assertNotEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader());
rsm.configure(Collections.emptyMap());
rsm.configure(Map.of());
assertNotEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader());
}

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static java.util.Arrays.stream;
import static java.util.Collections.unmodifiableMap;
@ -69,7 +68,7 @@ public final class LocalTieredStorageHistory {
matchingTypeEvents = new ArrayList<>(matchingTypeEvents);
}
return matchingTypeEvents.stream().filter(matches(topicPartition)).collect(Collectors.toList());
return matchingTypeEvents.stream().filter(matches(topicPartition)).toList();
}
/**

View File

@ -22,12 +22,10 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.Remote
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static java.lang.String.format;
@ -43,16 +41,15 @@ public final class LocalTieredStorageSnapshot {
}
public List<TopicPartition> getTopicPartitions() {
final List<TopicPartition> topicPartitions = snapshot.topicIdPartitions.stream()
return snapshot.topicIdPartitions.stream()
.map(TopicIdPartition::topicPartition)
.collect(Collectors.toList());
return Collections.unmodifiableList(topicPartitions);
.toList();
}
public List<RemoteLogSegmentFileset> getFilesets(final TopicPartition topicPartition) {
return snapshot.records.values().stream()
.filter(fileset -> fileset.getRemoteLogSegmentId().topicIdPartition().topicPartition().equals(topicPartition))
.collect(Collectors.toList());
.toList();
}
public int size() {

View File

@ -47,19 +47,15 @@ import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static java.nio.ByteBuffer.wrap;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot.takeSnapshot;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
@ -103,7 +99,7 @@ public final class LocalTieredStorageTest {
@BeforeEach
public void before(TestInfo testInfo) {
init(Collections.emptyMap(), testInfo.getDisplayName());
init(Map.of(), testInfo.getDisplayName());
}
@AfterEach
@ -249,7 +245,7 @@ public final class LocalTieredStorageTest {
@Test
public void segmentsAreNotDeletedIfDeleteApiIsDisabled(TestInfo testInfo) throws RemoteStorageException {
init(Collections.singletonMap(LocalTieredStorage.ENABLE_DELETE_API_CONFIG, "false"), testInfo.getDisplayName());
init(Map.of(LocalTieredStorage.ENABLE_DELETE_API_CONFIG, "false"), testInfo.getDisplayName());
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final LogSegmentData segment = localLogSegments.nextSegment();
@ -303,8 +299,8 @@ public final class LocalTieredStorageTest {
final LocalTieredStorageSnapshot snapshot = takeSnapshot(tieredStorage);
assertEquals(Collections.singletonList(topicPartition), snapshot.getTopicPartitions());
assertEquals(asList(wrap(record1), wrap(record2)), extractRecordsValue(snapshot, id));
assertEquals(List.of(topicPartition), snapshot.getTopicPartitions());
assertEquals(List.of(wrap(record1), wrap(record2)), extractRecordsValue(snapshot, id));
}
@Test
@ -323,14 +319,14 @@ public final class LocalTieredStorageTest {
final LocalTieredStorageSnapshot snapshot = takeSnapshot(tieredStorage);
final Map<RemoteLogSegmentId, List<ByteBuffer>> expected = new HashMap<>();
expected.put(idA, asList(wrap(record1a), wrap(record2a)));
expected.put(idB, asList(wrap(record1b), wrap(record2b)));
expected.put(idA, List.of(wrap(record1a), wrap(record2a)));
expected.put(idB, List.of(wrap(record1b), wrap(record2b)));
final Map<RemoteLogSegmentId, List<ByteBuffer>> actual = new HashMap<>();
actual.put(idA, extractRecordsValue(snapshot, idA));
actual.put(idB, extractRecordsValue(snapshot, idB));
assertEquals(Collections.singletonList(topicPartition), snapshot.getTopicPartitions());
assertEquals(List.of(topicPartition), snapshot.getTopicPartitions());
assertEquals(expected, actual);
}
@ -358,7 +354,7 @@ public final class LocalTieredStorageTest {
private RemoteLogSegmentMetadata newRemoteLogSegmentMetadata(final RemoteLogSegmentId id) {
return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000L,
1024, Collections.singletonMap(0, 0L));
1024, Map.of(0, 0L));
}
private RemoteLogSegmentId newRemoteLogSegmentId() {
@ -403,7 +399,7 @@ public final class LocalTieredStorageTest {
final String uuid = metadata.remoteLogSegmentId().id().toString();
final String startOffset = LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());
return Arrays.asList(
return List.of(
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX),
@ -537,7 +533,7 @@ public final class LocalTieredStorageTest {
private RemoteLogSegmentMetadata newMetadata(final RemoteLogSegmentId id) {
return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000,
1024, Collections.singletonMap(0, 0L));
1024, Map.of(0, 0L));
}
private String getStorageRootDirectory() {
@ -649,7 +645,7 @@ public final class LocalTieredStorageTest {
}
void deleteAll() throws IOException {
List<Path> paths = Files.list(segmentPath).collect(Collectors.toList());
List<Path> paths = Files.list(segmentPath).toList();
for (final Path path : paths) {
Files.delete(path);
}

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -33,8 +32,8 @@ public class RemoteLogManagerConfigTest {
public void testValidConfigs() {
String rsmPrefix = "__custom.rsm.";
String rlmmPrefix = "__custom.rlmm.";
Map<String, Object> rsmProps = Collections.singletonMap("rsm.prop", "val");
Map<String, Object> rlmmProps = Collections.singletonMap("rlmm.prop", "val");
Map<String, Object> rsmProps = Map.of("rsm.prop", "val");
Map<String, Object> rlmmProps = Map.of("rlmm.prop", "val");
Map<String, Object> props = getRLMProps(rsmPrefix, rlmmPrefix);
rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));
@ -68,7 +67,7 @@ public class RemoteLogManagerConfigTest {
@Test
public void testValidateEmptyStringConfig() {
// Test with a empty string props should throw ConfigException
Map<String, Object> emptyStringProps = Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "");
Map<String, Object> emptyStringProps = Map.of(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "");
assertThrows(ConfigException.class, () ->
new RLMTestConfig(emptyStringProps).remoteLogManagerConfig());
}

View File

@ -27,10 +27,10 @@ import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataMana
import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_FINISHED;
@ -65,10 +65,10 @@ public class RemoteLogMetadataManagerTest {
@ClusterTest
public void testFetchSegments() throws Exception {
try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = topicBasedRlmm()) {
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet());
remoteLogMetadataManager.onPartitionLeadershipChanges(Set.of(TP0), Set.of());
// 1.Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available.
Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L);
Map<Integer, Long> segmentLeaderEpochs = Map.of(0, 101L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(
segmentId, 101L, 200L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
@ -95,8 +95,8 @@ public class RemoteLogMetadataManagerTest {
@ClusterTest
public void testRemotePartitionDeletion() throws Exception {
try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = topicBasedRlmm()) {
remoteLogMetadataManager.configure(Collections.emptyMap());
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet());
remoteLogMetadataManager.configure(Map.of());
remoteLogMetadataManager.onPartitionLeadershipChanges(Set.of(TP0), Set.of());
// Create remote log segment metadata and add them to RLMM.

View File

@ -219,7 +219,7 @@ public final class RemoteLogSegmentFileset {
public List<Record> getRecords() throws IOException {
return StreamSupport
.stream(FileRecords.open(files.get(SEGMENT)).records().spliterator(), false)
.collect(Collectors.toList());
.toList();
}
public void copy(final Transferer transferer, final LogSegmentData data) throws IOException {

View File

@ -29,10 +29,8 @@ import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.regex.Pattern.compile;
import static java.util.stream.Collectors.toSet;
@ -91,7 +89,7 @@ public final class RemoteTopicPartitionDirectory {
}
boolean delete() {
return deleteFilesOnly(asList(directory.listFiles())) && deleteQuietly(directory);
return deleteFilesOnly(List.of(directory.listFiles())) && deleteQuietly(directory);
}
void traverse(final LocalTieredStorageTraverser traverser) {
@ -106,7 +104,7 @@ public final class RemoteTopicPartitionDirectory {
return uuids.stream()
.map(uuid -> RemoteLogSegmentFileset.openExistingFileset(this, uuid))
.collect(Collectors.toList());
.toList();
}
/**

View File

@ -20,7 +20,6 @@ import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.GROUP_PARTITION;
@ -33,7 +32,7 @@ public class RemoteTopicPartitionDirectoryTest {
@Test
public void testSubStr() {
List<String> topics = Arrays.asList("abcd", "-abcd-10-", "abcd-0-xyz", "abcd.ef-gh-0", "abcd_10_xyz_0");
List<String> topics = List.of("abcd", "-abcd-10-", "abcd-0-xyz", "abcd.ef-gh-0", "abcd_10_xyz_0");
for (String topic : topics) {
for (int i = 0; i < 100; i++) {
Uuid uuid = Uuid.randomUuid();

View File

@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -38,7 +37,7 @@ class LeaderEpochCheckpointFileWithFailureHandlerTest {
LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(file, new LogDirFailureChannel(1));
//Given
List<EpochEntry> epochs = Arrays.asList(
List<EpochEntry> epochs = List.of(
new EpochEntry(0, 1L),
new EpochEntry(1, 2L),
new EpochEntry(2, 3L));
@ -50,7 +49,7 @@ class LeaderEpochCheckpointFileWithFailureHandlerTest {
assertEquals(epochs, checkpoint.read());
//Given overwrite
List<EpochEntry> epochs2 = Arrays.asList(
List<EpochEntry> epochs2 = List.of(
new EpochEntry(3, 4L),
new EpochEntry(4, 5L));
@ -67,7 +66,7 @@ class LeaderEpochCheckpointFileWithFailureHandlerTest {
//Given a file with data in
LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(file, new LogDirFailureChannel(1));
List<EpochEntry> epochs = Arrays.asList(
List<EpochEntry> epochs = List.of(
new EpochEntry(0, 1L),
new EpochEntry(1, 2L),
new EpochEntry(2, 3L));

View File

@ -27,8 +27,8 @@ import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -98,13 +98,13 @@ public class OffsetCheckpointFileWithFailureHandlerTest {
OffsetCheckpointFile checkpoint = new OffsetCheckpointFile(TestUtils.tempFile(), null);
//Then
assertEquals(Collections.emptyMap(), checkpoint.read());
assertEquals(Map.of(), checkpoint.read());
//When
checkpoint.write(Collections.emptyMap());
checkpoint.write(Map.of());
//Then
assertEquals(Collections.emptyMap(), checkpoint.read());
assertEquals(Map.of(), checkpoint.read());
}
@Test
@ -113,7 +113,7 @@ public class OffsetCheckpointFileWithFailureHandlerTest {
LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
CheckpointFileWithFailureHandler<OffsetCheckpointFile.TopicPartitionOffset> checkpointFile = new CheckpointFileWithFailureHandler<>(file, OffsetCheckpointFile.CURRENT_VERSION + 1,
new OffsetCheckpointFile.Formatter(), logDirFailureChannel, file.getParent());
checkpointFile.write(Collections.singletonList(new OffsetCheckpointFile.TopicPartitionOffset(new TopicPartition("foo", 5), 10L)));
checkpointFile.write(List.of(new OffsetCheckpointFile.TopicPartitionOffset(new TopicPartition("foo", 5), 10L)));
assertThrows(KafkaStorageException.class, () -> new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read());
}
@ -122,7 +122,7 @@ public class OffsetCheckpointFileWithFailureHandlerTest {
String logDir = "/tmp/kafka-logs";
OffsetCheckpointFile mockCheckpointFile = Mockito.mock(OffsetCheckpointFile.class);
LazyOffsetCheckpoints lazyCheckpoints = new LazyOffsetCheckpoints(Collections.singletonMap(logDir, mockCheckpointFile));
LazyOffsetCheckpoints lazyCheckpoints = new LazyOffsetCheckpoints(Map.of(logDir, mockCheckpointFile));
Mockito.verify(mockCheckpointFile, Mockito.never()).read();
TopicPartition partition0 = new TopicPartition("foo", 0);
@ -147,7 +147,7 @@ public class OffsetCheckpointFileWithFailureHandlerTest {
public void testLazyOffsetCheckpointFileInvalidLogDir() {
String logDir = "/tmp/kafka-logs";
OffsetCheckpointFile mockCheckpointFile = Mockito.mock(OffsetCheckpointFile.class);
LazyOffsetCheckpoints lazyCheckpoints = new LazyOffsetCheckpoints(Collections.singletonMap(logDir, mockCheckpointFile));
LazyOffsetCheckpoints lazyCheckpoints = new LazyOffsetCheckpoints(Map.of(logDir, mockCheckpointFile));
assertThrows(IllegalArgumentException.class, () -> lazyCheckpoints.fetch("/invalid/kafka-logs", new TopicPartition("foo", 0)));
}
@ -160,6 +160,6 @@ public class OffsetCheckpointFileWithFailureHandlerTest {
LeaderEpochCheckpointFile.FORMATTER, logDirFailureChannel, file.getParent());
assertTrue(dir.renameTo(new File(dir.getAbsolutePath() + "-renamed")));
checkpointFile.writeIfDirExists(Collections.singletonList(new EpochEntry(1, 42)));
checkpointFile.writeIfDirExists(List.of(new EpochEntry(1, 42)));
}
}

View File

@ -42,7 +42,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@ -101,7 +100,7 @@ class LocalLogTest {
}
private List<SimpleRecord> kvsToRecords(List<KeyValue> keyValues) {
return keyValues.stream().map(KeyValue::toRecord).collect(Collectors.toList());
return keyValues.stream().map(KeyValue::toRecord).toList();
}
private List<KeyValue> recordsToKvs(Iterable<Record> records) {
@ -497,7 +496,7 @@ class LocalLogTest {
private List<Long> nonActiveBaseOffsetsFrom(long startOffset) {
return log.segments().nonActiveLogSegmentsFrom(startOffset).stream()
.map(LogSegment::baseOffset)
.collect(Collectors.toList());
.toList();
}
private String topicPartitionName(String topic, String partition) {
@ -653,12 +652,12 @@ class LocalLogTest {
List<KeyValue> keyValues2 = List.of(new KeyValue("k2", "v2"));
appendRecords(keyValues2.stream()
.map(kv -> kv.toRecord(MOCK_TIME.milliseconds() + 10))
.collect(Collectors.toList()),
.toList(),
1L);
assertEquals(2, log.logEndOffset(), "Expect two records in the log");
FetchDataInfo readResult = readRecords(0L);
assertEquals(2L, Utils.toList(readResult.records.records()).size());
assertEquals(Stream.concat(keyValues1.stream(), keyValues2.stream()).collect(Collectors.toList()), recordsToKvs(readResult.records.records()));
assertEquals(Stream.concat(keyValues1.stream(), keyValues2.stream()).toList(), recordsToKvs(readResult.records.records()));
// roll so that active segment is empty
log.roll(0L);

View File

@ -51,7 +51,6 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -263,7 +262,7 @@ public class LogSegmentTest {
// check that we can read back both messages
FetchDataInfo read = seg.read(offset, 10000);
assertIterableEquals(Arrays.asList(ms1.records().iterator().next(), ms2.records().iterator().next()), read.records.records());
assertIterableEquals(List.of(ms1.records().iterator().next(), ms2.records().iterator().next()), read.records.records());
// Now truncate off the last message
seg.truncateTo(offset + 1);
@ -540,7 +539,7 @@ public class LogSegmentTest {
new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
seg.recover(newProducerStateManager(), cache);
assertEquals(Arrays.asList(
assertEquals(List.of(
new EpochEntry(0, 104L),
new EpochEntry(1, 106L),
new EpochEntry(2, 110L)), cache.epochEntries());

View File

@ -28,8 +28,6 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -132,7 +130,7 @@ public class LogSegmentsTest {
LogSegment seg4 = createSegment(offset4);
// Test firstEntry, lastEntry
List<LogSegment> segmentList = Arrays.asList(seg1, seg2, seg3, seg4);
List<LogSegment> segmentList = List.of(seg1, seg2, seg3, seg4);
for (LogSegment seg : segmentList) {
segments.add(seg);
assertEntry(seg1, segments.firstEntry().get());
@ -143,28 +141,28 @@ public class LogSegmentsTest {
}
// Test baseOffsets
assertEquals(Arrays.asList(offset1, offset2, offset3, offset4), segments.baseOffsets());
assertEquals(List.of(offset1, offset2, offset3, offset4), segments.baseOffsets());
// Test values
assertEquals(Arrays.asList(seg1, seg2, seg3, seg4), new ArrayList<>(segments.values()));
assertEquals(List.of(seg1, seg2, seg3, seg4), new ArrayList<>(segments.values()));
// Test values(to, from)
assertThrows(IllegalArgumentException.class, () -> segments.values(2, 1));
assertEquals(Collections.emptyList(), segments.values(1, 1));
assertEquals(Collections.singletonList(seg1), new ArrayList<>(segments.values(1, 2)));
assertEquals(Arrays.asList(seg1, seg2), new ArrayList<>(segments.values(1, 3)));
assertEquals(Arrays.asList(seg1, seg2, seg3), new ArrayList<>(segments.values(1, 4)));
assertEquals(Arrays.asList(seg2, seg3), new ArrayList<>(segments.values(2, 4)));
assertEquals(Collections.singletonList(seg3), new ArrayList<>(segments.values(3, 4)));
assertEquals(Collections.emptyList(), new ArrayList<>(segments.values(4, 4)));
assertEquals(Collections.singletonList(seg4), new ArrayList<>(segments.values(4, 5)));
assertEquals(List.of(), segments.values(1, 1));
assertEquals(List.of(seg1), new ArrayList<>(segments.values(1, 2)));
assertEquals(List.of(seg1, seg2), new ArrayList<>(segments.values(1, 3)));
assertEquals(List.of(seg1, seg2, seg3), new ArrayList<>(segments.values(1, 4)));
assertEquals(List.of(seg2, seg3), new ArrayList<>(segments.values(2, 4)));
assertEquals(List.of(seg3), new ArrayList<>(segments.values(3, 4)));
assertEquals(List.of(), new ArrayList<>(segments.values(4, 4)));
assertEquals(List.of(seg4), new ArrayList<>(segments.values(4, 5)));
// Test activeSegment
assertEquals(seg4, segments.activeSegment());
// Test nonActiveLogSegmentsFrom
assertEquals(Arrays.asList(seg2, seg3), new ArrayList<>(segments.nonActiveLogSegmentsFrom(2)));
assertEquals(Collections.emptyList(), new ArrayList<>(segments.nonActiveLogSegmentsFrom(4)));
assertEquals(List.of(seg2, seg3), new ArrayList<>(segments.nonActiveLogSegmentsFrom(2)));
assertEquals(List.of(), new ArrayList<>(segments.nonActiveLogSegmentsFrom(4)));
}
}
@ -176,7 +174,7 @@ public class LogSegmentsTest {
LogSegment seg3 = createSegment(5L);
LogSegment seg4 = createSegment(7L);
Arrays.asList(seg1, seg2, seg3, seg4).forEach(segments::add);
List.of(seg1, seg2, seg3, seg4).forEach(segments::add);
// Test floorSegment
assertEquals(Optional.of(seg1), segments.floorSegment(2));
@ -203,12 +201,12 @@ public class LogSegmentsTest {
LogSegment seg4 = createSegment(7L);
LogSegment seg5 = createSegment(9L);
Arrays.asList(seg1, seg2, seg3, seg4, seg5).forEach(segments::add);
List.of(seg1, seg2, seg3, seg4, seg5).forEach(segments::add);
// higherSegments(0) should return all segments in order
{
final Iterator<LogSegment> iterator = segments.higherSegments(0).iterator();
Arrays.asList(seg1, seg2, seg3, seg4, seg5).forEach(segment -> {
List.of(seg1, seg2, seg3, seg4, seg5).forEach(segment -> {
assertTrue(iterator.hasNext());
assertEquals(segment, iterator.next());
});
@ -218,7 +216,7 @@ public class LogSegmentsTest {
// higherSegments(1) should return all segments in order except seg1
{
final Iterator<LogSegment> iterator = segments.higherSegments(1).iterator();
Arrays.asList(seg2, seg3, seg4, seg5).forEach(segment -> {
List.of(seg2, seg3, seg4, seg5).forEach(segment -> {
assertTrue(iterator.hasNext());
assertEquals(segment, iterator.next());
});
@ -248,9 +246,9 @@ public class LogSegmentsTest {
when(logSegment.size()).thenReturn(Integer.MAX_VALUE);
assertEquals(Integer.MAX_VALUE, LogSegments.sizeInBytes(Collections.singletonList(logSegment)));
assertEquals(largeSize, LogSegments.sizeInBytes(Arrays.asList(logSegment, logSegment)));
assertTrue(LogSegments.sizeInBytes(Arrays.asList(logSegment, logSegment)) > Integer.MAX_VALUE);
assertEquals(Integer.MAX_VALUE, LogSegments.sizeInBytes(List.of(logSegment)));
assertEquals(largeSize, LogSegments.sizeInBytes(List.of(logSegment, logSegment)));
assertTrue(LogSegments.sizeInBytes(List.of(logSegment, logSegment)) > Integer.MAX_VALUE);
try (LogSegments logSegments = new LogSegments(topicPartition)) {
logSegments.add(logSegment);

View File

@ -59,7 +59,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@ -123,7 +122,7 @@ public class LogValidatorTest {
ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression);
List<Long> recordsResult = new ArrayList<>();
result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset()));
assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult);
assertEquals(LongStream.range(0, numRecords).boxed().toList(), recordsResult);
}
});
}
@ -296,7 +295,7 @@ public class LogValidatorTest {
public void checkRecompression(byte magic) {
long now = System.currentTimeMillis();
// Set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now);
List<Long> timestampSeq = List.of(now - 1, now + 1, now);
long producerId;
short producerEpoch;
@ -416,7 +415,7 @@ public class LogValidatorTest {
private MemoryRecords recordsWithNonSequentialInnerOffsets(Byte magicValue, Compression compression, int numRecords) {
List<SimpleRecord> records = IntStream.range(0, numRecords)
.mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
.collect(Collectors.toList());
.toList();
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magicValue, compression, TimestampType.CREATE_TIME, 0L);
@ -475,7 +474,7 @@ public class LogValidatorTest {
private MemoryRecords createRecords(byte magicValue,
long timestamp,
Compression codec) {
List<byte[]> records = Arrays.asList("hello".getBytes(), "there".getBytes(), "beautiful".getBytes());
List<byte[]> records = List.of("hello".getBytes(), "there".getBytes(), "beautiful".getBytes());
return createRecords(records, magicValue, timestamp, codec);
}
@ -484,7 +483,7 @@ public class LogValidatorTest {
public void checkCompressed(byte magic) {
long now = System.currentTimeMillis();
// set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now);
List<Long> timestampSeq = List.of(now - 1, now + 1, now);
long producerId;
short producerEpoch;
@ -506,7 +505,7 @@ public class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH;
}
List<SimpleRecord> recordList = Arrays.asList(
List<SimpleRecord> recordList = List.of(
new SimpleRecord(timestampSeq.get(0), "hello".getBytes()),
new SimpleRecord(timestampSeq.get(1), "there".getBytes()),
new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes())
@ -1668,7 +1667,7 @@ public class LogValidatorTest {
@Test
public void testDifferentLevelDoesNotCauseRecompression() {
List<byte[]> records = Arrays.asList(
List<byte[]> records = List.of(
String.join("", Collections.nCopies(256, "some")).getBytes(),
String.join("", Collections.nCopies(256, "data")).getBytes()
);
@ -1709,7 +1708,7 @@ public class LogValidatorTest {
@Test
public void testDifferentCodecCausesRecompression() {
List<byte[]> records = Arrays.asList(
List<byte[]> records = List.of(
"somedata".getBytes(),
"moredata".getBytes()
);

View File

@ -55,10 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.storage.internals.log.ProducerStateManager.LATE_TRANSACTION_BUFFER_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -606,20 +602,20 @@ public class ProducerStateManagerTest {
appendClientEntry(stateManager, producerId, epoch, 1, 1L, false);
stateManager.takeSnapshot();
assertEquals(1, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(singleton(2L), currentSnapshotOffsets());
assertEquals(Set.of(2L), currentSnapshotOffsets());
appendClientEntry(stateManager, producerId, epoch, 2, 2L, false);
stateManager.takeSnapshot();
assertEquals(2, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(new HashSet<>(asList(2L, 3L)), currentSnapshotOffsets());
assertEquals(new HashSet<>(List.of(2L, 3L)), currentSnapshotOffsets());
stateManager.deleteSnapshotsBefore(3L);
assertEquals(1, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(singleton(3L), currentSnapshotOffsets());
assertEquals(Set.of(3L), currentSnapshotOffsets());
stateManager.deleteSnapshotsBefore(4L);
assertEquals(0, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(emptySet(), currentSnapshotOffsets());
assertEquals(Set.of(), currentSnapshotOffsets());
}
@Test
@ -628,22 +624,22 @@ public class ProducerStateManagerTest {
appendClientEntry(stateManager, producerId, epoch, 1, 1L, false);
stateManager.takeSnapshot();
assertEquals(1, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(singleton(2L), currentSnapshotOffsets());
assertEquals(Set.of(2L), currentSnapshotOffsets());
appendClientEntry(stateManager, producerId, epoch, 2, 2L, false);
stateManager.takeSnapshot();
assertEquals(2, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(new HashSet<>(asList(2L, 3L)), currentSnapshotOffsets());
assertEquals(new HashSet<>(List.of(2L, 3L)), currentSnapshotOffsets());
stateManager.truncateFullyAndStartAt(0L);
assertEquals(0, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(emptySet(), currentSnapshotOffsets());
assertEquals(Set.of(), currentSnapshotOffsets());
appendClientEntry(stateManager, producerId, epoch, 0, 0L, false);
stateManager.takeSnapshot();
assertEquals(1, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(singleton(1L), currentSnapshotOffsets());
assertEquals(Set.of(1L), currentSnapshotOffsets());
}
@Test
@ -659,12 +655,12 @@ public class ProducerStateManagerTest {
appendClientEntry(stateManager, producerId, epoch, 4, 4L, false);
stateManager.takeSnapshot();
assertEquals(2, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(new HashSet<>(asList(3L, 5L)), currentSnapshotOffsets());
assertEquals(new HashSet<>(List.of(3L, 5L)), currentSnapshotOffsets());
// Truncate to the range (3, 5), this will delete the earlier snapshot until offset 3.
stateManager.truncateAndReload(3, 5, time.milliseconds());
assertEquals(1, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(singleton(5L), currentSnapshotOffsets());
assertEquals(Set.of(5L), currentSnapshotOffsets());
// Add the snapshot files until offset 3 to the log dir.
pathAndDataList.forEach((path, data) -> assertDoesNotThrow(() -> Files.write(path, data)));
@ -673,7 +669,7 @@ public class ProducerStateManagerTest {
stateManager.truncateFullyAndReloadSnapshots();
assertEquals(OptionalLong.of(3), stateManager.latestSnapshotOffset());
assertEquals(singleton(3L), currentSnapshotOffsets());
assertEquals(Set.of(3L), currentSnapshotOffsets());
}
@Test
@ -727,12 +723,12 @@ public class ProducerStateManagerTest {
stateManager.takeSnapshot();
assertEquals(1, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(singleton(1L), currentSnapshotOffsets());
assertEquals(Set.of(1L), currentSnapshotOffsets());
// nothing changed so there should be no new snapshot
stateManager.takeSnapshot();
assertEquals(1, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(singleton(1L), currentSnapshotOffsets());
assertEquals(Set.of(1L), currentSnapshotOffsets());
}
@Test
@ -920,17 +916,17 @@ public class ProducerStateManagerTest {
Files.createFile(LogFileUtils.producerSnapshotFile(logDir, 42).toPath()); // not stray
// claim that we only have one segment with a base offset of 5
stateManager.removeStraySnapshots(singletonList(5L));
stateManager.removeStraySnapshots(List.of(5L));
// The snapshot file at offset 2 should be considered a stray, but the snapshot at 42 should be kept
// around because it is the largest snapshot.
assertEquals(OptionalLong.of(42), stateManager.latestSnapshotOffset());
assertEquals(OptionalLong.of(5), stateManager.oldestSnapshotOffset());
assertEquals(asList(5L, 42L), ProducerStateManager.listSnapshotFiles(logDir)
assertEquals(List.of(5L, 42L), ProducerStateManager.listSnapshotFiles(logDir)
.stream()
.map(file -> file.offset)
.sorted()
.collect(Collectors.toList()));
.toList());
}
@Test
@ -943,12 +939,12 @@ public class ProducerStateManagerTest {
Files.createFile(LogFileUtils.producerSnapshotFile(logDir, 2).toPath()); // stray
Files.createFile(LogFileUtils.producerSnapshotFile(logDir, 42).toPath()); // not stray
stateManager.removeStraySnapshots(singletonList(42L));
assertEquals(singletonList(42L), ProducerStateManager.listSnapshotFiles(logDir)
stateManager.removeStraySnapshots(List.of(42L));
assertEquals(List.of(42L), ProducerStateManager.listSnapshotFiles(logDir)
.stream()
.map(file -> file.offset)
.sorted()
.collect(Collectors.toList()));
.toList());
}
@ -1237,7 +1233,7 @@ public class ProducerStateManagerTest {
assertEquals(expectedFirstDataOffset, lastEntry.firstDataOffset());
assertEquals(expectedLastDataOffset, lastEntry.lastDataOffset());
assertEquals(expectedCurrentTxnFirstOffset, lastEntry.currentTxnFirstOffset());
assertTxnMetadataEquals(singletonList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions());
assertTxnMetadataEquals(List.of(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions());
}
private void assertTxnMetadataEquals(List<TxnMetadata> expected, List<TxnMetadata> actual) {

View File

@ -24,8 +24,6 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@ -46,7 +44,7 @@ public class TransactionIndexTest {
@Test
public void testPositionSetCorrectlyWhenOpened() throws IOException {
List<AbortedTxn> abortedTxns = new ArrayList<>(Arrays.asList(
List<AbortedTxn> abortedTxns = new ArrayList<>(List.of(
new AbortedTxn(0L, 0, 10, 11),
new AbortedTxn(1L, 5, 15, 13),
new AbortedTxn(2L, 18, 35, 25),
@ -63,7 +61,7 @@ public class TransactionIndexTest {
@Test
public void testSanityCheck() throws IOException {
List<AbortedTxn> abortedTxns = Arrays.asList(
List<AbortedTxn> abortedTxns = List.of(
new AbortedTxn(0L, 0, 10, 11),
new AbortedTxn(1L, 5, 15, 13),
new AbortedTxn(2L, 18, 35, 25),
@ -93,7 +91,7 @@ public class TransactionIndexTest {
@Test
public void testCollectAbortedTransactions() {
List<AbortedTxn> abortedTransactions = Arrays.asList(
List<AbortedTxn> abortedTransactions = List.of(
new AbortedTxn(0L, 0, 10, 11),
new AbortedTxn(1L, 5, 15, 13),
new AbortedTxn(2L, 18, 35, 25),
@ -128,7 +126,7 @@ public class TransactionIndexTest {
@Test
public void testTruncate() throws IOException {
List<AbortedTxn> abortedTransactions = Arrays.asList(
List<AbortedTxn> abortedTransactions = List.of(
new AbortedTxn(0L, 0, 10, 2),
new AbortedTxn(1L, 5, 15, 16),
new AbortedTxn(2L, 18, 35, 25),
@ -143,7 +141,7 @@ public class TransactionIndexTest {
assertEquals(abortedTransactions.subList(0, 3), index.collectAbortedTxns(0L, 100L).abortedTransactions);
index.reset();
assertEquals(Collections.emptyList(), index.collectAbortedTxns(0L, 100L).abortedTransactions);
assertEquals(List.of(), index.collectAbortedTxns(0L, 100L).abortedTransactions);
}
@Test

View File

@ -62,7 +62,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -366,7 +365,7 @@ public final class TieredStorageTestBuilder {
.map(spec ->
new OffloadedSegmentSpec(spec.getSourceBrokerId(), topicPartition, spec.getBaseOffset(),
spec.getRecords()))
.collect(Collectors.toList());
.toList();
ProduceAction action = new ProduceAction(topicPartition, offloadedSegmentSpecs, recordsToProduce,
producableSpec.getBatchSize(), producableSpec.getEarliestLocalLogOffset());
actions.add(action);
@ -416,7 +415,7 @@ public final class TieredStorageTestBuilder {
.map(spec -> new RemoteDeleteSegmentSpec(spec.getSourceBrokerId(), partition,
spec.getEventType(), spec.getEventCount()));
})
.collect(Collectors.toList());
.toList();
deleteSegmentSpecList.forEach(spec -> deletables.remove(spec.getTopicPartition()));
return deleteSegmentSpecList;
}

View File

@ -54,7 +54,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -107,7 +106,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
producer = harness.createProducer(ser, ser, producerOverrideProps);
consumer = harness.createConsumer(de, de, commonOverrideProps,
CollectionConverters.asScala(Collections.<String>emptyList()).toList());
CollectionConverters.asScala(List.<String>of()).toList());
admin = harness.createAdminClient(listenerName, commonOverrideProps);
}
@ -125,7 +124,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
newTopic = new NewTopic(spec.getTopicName(), replicasAssignments);
}
newTopic.configs(spec.getProperties());
admin.createTopics(Collections.singletonList(newTopic)).all().get();
admin.createTopics(List.of(newTopic)).all().get();
TestUtils.waitForAllPartitionsMetadata(harness.brokers(), spec.getTopicName(), spec.getPartitionCount());
synchronized (this) {
topicSpecs.put(spec.getTopicName(), spec);
@ -141,10 +140,10 @@ public final class TieredStorageTestContext implements AutoCloseable {
List<List<Integer>> newAssignments = assignment.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toList());
.toList();
newPartitions = NewPartitions.increaseTo(spec.getPartitionCount(), newAssignments);
}
Map<String, NewPartitions> partitionsMap = Collections.singletonMap(spec.getTopicName(), newPartitions);
Map<String, NewPartitions> partitionsMap = Map.of(spec.getTopicName(), newPartitions);
admin.createPartitions(partitionsMap).all().get();
TestUtils.waitForAllPartitionsMetadata(harness.brokers(), spec.getTopicName(), spec.getPartitionCount());
}
@ -176,7 +175,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
alterEntries.add(new AlterConfigOp(new ConfigEntry(k, v), AlterConfigOp.OpType.SET)));
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000);
Map<ConfigResource, Collection<AlterConfigOp>> configsMap =
Collections.singletonMap(configResource, alterEntries);
Map.of(configResource, alterEntries);
admin.incrementalAlterConfigs(configsMap, alterOptions).all().get(30, TimeUnit.SECONDS);
}
@ -205,7 +204,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
public List<ConsumerRecord<String, String>> consume(TopicPartition topicPartition,
Integer expectedTotalCount,
Long fetchOffset) {
consumer.assign(Collections.singletonList(topicPartition));
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, fetchOffset);
long timeoutMs = 60_000L;
@ -225,14 +224,14 @@ public final class TieredStorageTestContext implements AutoCloseable {
}
public Long nextOffset(TopicPartition topicPartition) {
List<TopicPartition> partitions = Collections.singletonList(topicPartition);
List<TopicPartition> partitions = List.of(topicPartition);
consumer.assign(partitions);
consumer.seekToEnd(partitions);
return consumer.position(topicPartition);
}
public Long beginOffset(TopicPartition topicPartition) {
List<TopicPartition> partitions = Collections.singletonList(topicPartition);
List<TopicPartition> partitions = List.of(topicPartition);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
return consumer.position(topicPartition);
@ -329,7 +328,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
throws ExecutionException, InterruptedException {
String topic = topicPartition.topic();
int partition = topicPartition.partition();
TopicDescription description = admin.describeTopics(Collections.singletonList(topicPartition.topic()))
TopicDescription description = admin.describeTopics(List.of(topicPartition.topic()))
.allTopicNames().get().get(topic);
TopicPartitionInfo partitionInfo = description.partitions().get(partition);
return partitionInfo.replicas().stream().anyMatch(node -> node.id() == replicaId);

View File

@ -38,7 +38,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
@ -71,7 +70,7 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
@Override
public Seq<Properties> kraftControllerConfigs(TestInfo testInfo) {
return CollectionConverters.asScala(Collections.singletonList(overridingProps())).toSeq();
return CollectionConverters.asScala(List.of(overridingProps())).toSeq();
}
protected int numRemoteLogMetadataPartitions() {

View File

@ -26,7 +26,6 @@ import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import java.io.File;
import java.io.PrintStream;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@ -62,7 +61,7 @@ public final class AlterLogDirAction implements TieredStorageTestAction {
// build alterReplicaLogDirs request content to move from sourceDir to targetDir
TopicPartitionReplica topicPartitionReplica = new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), brokerId);
Map<TopicPartitionReplica, String> logDirs = Collections.singletonMap(topicPartitionReplica, targetDir.get().getAbsolutePath());
Map<TopicPartitionReplica, String> logDirs = Map.of(topicPartitionReplica, targetDir.get().getAbsolutePath());
AlterReplicaLogDirsResult results = context.admin().alterReplicaLogDirs(logDirs);
results.values().get(topicPartitionReplica).get(30, TimeUnit.SECONDS);

View File

@ -29,11 +29,9 @@ import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
@ -128,7 +126,7 @@ public final class ConsumeAction implements TieredStorageTestAction {
assertThat(storedRecords, correspondTo(readRecords, topicPartition, serde, serde));
// (B) Assessment of the interactions between the source broker and the second-tier storage.
for (LocalTieredStorageEvent.EventType eventType : Arrays.asList(FETCH_SEGMENT, FETCH_OFFSET_INDEX, FETCH_TIME_INDEX, FETCH_TRANSACTION_INDEX)) {
for (LocalTieredStorageEvent.EventType eventType : List.of(FETCH_SEGMENT, FETCH_OFFSET_INDEX, FETCH_TIME_INDEX, FETCH_TRANSACTION_INDEX)) {
Optional<LocalTieredStorageEvent> latestEvent;
switch (eventType) {
case FETCH_SEGMENT:
@ -149,7 +147,7 @@ public final class ConsumeAction implements TieredStorageTestAction {
List<LocalTieredStorageEvent> events = history.getEvents(eventType, topicPartition);
List<LocalTieredStorageEvent> eventsInScope = latestEvent
.map(e -> events.stream().filter(event -> event.isAfter(e)).collect(Collectors.toList()))
.map(e -> events.stream().filter(event -> event.isAfter(e)).toList())
.orElse(events);
RemoteFetchCount remoteFetchCount = remoteFetchSpec.getRemoteFetchCount();

View File

@ -25,13 +25,11 @@ import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
import java.io.PrintStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition.expectEvent;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
@ -64,10 +62,10 @@ public final class DeleteRecordsAction implements TieredStorageTestAction {
spec.getTopicPartition(),
false,
spec.getEventCount()))
.collect(Collectors.toList());
.toList();
Map<TopicPartition, RecordsToDelete> recordsToDeleteMap =
Collections.singletonMap(partition, RecordsToDelete.beforeOffset(beforeOffset));
Map.of(partition, RecordsToDelete.beforeOffset(beforeOffset));
context.admin().deleteRecords(recordsToDeleteMap).all().get();
if (!tieredStorageConditions.isEmpty()) {

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition.expectEvent;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
@ -61,7 +60,7 @@ public final class DeleteTopicAction implements TieredStorageTestAction {
spec.getTopicPartition(),
false,
spec.getEventCount()))
.collect(Collectors.toList());
.toList();
if (shouldDelete) {
context.deleteTopic(topic);
}

View File

@ -30,10 +30,10 @@ import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -67,7 +67,7 @@ public final class ExpectLeaderAction implements TieredStorageTestAction {
reassignPartition(context);
if (electLeader) {
context.admin().electLeaders(ElectionType.PREFERRED, Collections.singleton(topicPartition));
context.admin().electLeaders(ElectionType.PREFERRED, Set.of(topicPartition));
}
AtomicInteger actualLeader = new AtomicInteger(-1);
TestUtils.waitForCondition(() -> {
@ -106,7 +106,7 @@ public final class ExpectLeaderAction implements TieredStorageTestAction {
});
Map<TopicPartition, Optional<NewPartitionReassignment>> proposed =
Collections.singletonMap(topicPartition, Optional.of(new NewPartitionReassignment(targetReplicas)));
Map.of(topicPartition, Optional.of(new NewPartitionReassignment(targetReplicas)));
AlterPartitionReassignmentsResult result = context.admin().alterPartitionReassignments(proposed);
result.all().get(30, TimeUnit.MINUTES);
}

View File

@ -24,7 +24,7 @@ import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import java.io.PrintStream;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -48,7 +48,7 @@ public final class ExpectListOffsetsAction implements TieredStorageTestAction {
@Override
public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException {
ListOffsetsResult.ListOffsetsResultInfo listOffsetsResult = context.admin()
.listOffsets(Collections.singletonMap(partition, spec))
.listOffsets(Map.of(partition, spec))
.all()
.get()
.get(partition);

View File

@ -25,7 +25,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -46,7 +45,7 @@ public final class ExpectTopicIdToMatchInRemoteStorageAction implements TieredSt
List<TopicPartition> partitions = snapshot.getTopicPartitions()
.stream()
.filter(tp -> tp.topic().equals(topic))
.collect(Collectors.toList());
.toList();
partitions.forEach(partition ->
snapshot.getFilesets(partition)
.forEach(fileset -> assertEquals(topicId, fileset.getRemoteLogSegmentId().id()))

View File

@ -26,7 +26,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -50,7 +49,7 @@ public final class ExpectUserTopicMappedToMetadataPartitionsAction implements Ti
@Override
public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException {
String metadataTopic = TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
Map<String, TopicDescription> descriptions = describeTopics(context, Arrays.asList(topic, metadataTopic));
Map<String, TopicDescription> descriptions = describeTopics(context, List.of(topic, metadataTopic));
int metadataTopicPartitionCount = descriptions.get(metadataTopic).partitions().size();
RemoteLogMetadataTopicPartitioner partitioner =
new RemoteLogMetadataTopicPartitioner(metadataTopicPartitionCount);

View File

@ -35,7 +35,6 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition.expectEvent;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.COPY_SEGMENT;
@ -85,7 +84,7 @@ public final class ProduceAction implements TieredStorageTestAction {
spec.getTopicPartition(),
spec.getBaseOffset(),
false))
.collect(Collectors.toList());
.toList();
// Retrieve the offset of the next record which would be consumed from the topic-partition
// before records are produced. This allows consuming only the newly produced records afterwards.
@ -140,7 +139,7 @@ public final class ProduceAction implements TieredStorageTestAction {
List<ProducerRecord<String, String>> producerRecords = offloadedSegmentSpecs.stream()
.flatMap(spec -> spec.getRecords().stream())
.collect(Collectors.toList());
.toList();
compareRecords(discoveredRecords, producerRecords, topicPartition);
}

View File

@ -26,12 +26,10 @@ import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import java.io.PrintStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
@ -51,7 +49,7 @@ public final class ReassignReplicaAction implements TieredStorageTestAction {
String topic = topicPartition.topic();
int partition = topicPartition.partition();
Map<TopicPartition, Optional<NewPartitionReassignment>> proposed =
Collections.singletonMap(topicPartition, Optional.of(new NewPartitionReassignment(replicaIds)));
Map.of(topicPartition, Optional.of(new NewPartitionReassignment(replicaIds)));
context.admin().alterPartitionReassignments(proposed);
TestUtils.waitForCondition(() -> {
@ -60,7 +58,7 @@ public final class ReassignReplicaAction implements TieredStorageTestAction {
List<Integer> actualReplicaIds = description.partitions().get(partition).replicas()
.stream()
.map(Node::id)
.collect(Collectors.toList());
.toList();
return replicaIds.equals(actualReplicaIds);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {

View File

@ -25,7 +25,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -60,7 +59,7 @@ public final class ShrinkReplicaAction implements TieredStorageTestAction {
}
Map<TopicPartition, Optional<NewPartitionReassignment>> proposed =
Collections.singletonMap(topicPartition, Optional.of(new NewPartitionReassignment(targetReplicaIds)));
Map.of(topicPartition, Optional.of(new NewPartitionReassignment(targetReplicaIds)));
context.admin().alterPartitionReassignments(proposed).all().get();
TestUtils.waitForCondition(() -> {

View File

@ -20,8 +20,7 @@ import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@ -47,7 +46,7 @@ public final class AlterLogDirTest extends TieredStorageTestHarness {
builder
// create topicB with 1 partition and 1 RF
.createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
mkMap(mkEntry(p0, Arrays.asList(broker1, broker0))), enableRemoteLogStorage)
mkMap(mkEntry(p0, List.of(broker1, broker0))), enableRemoteLogStorage)
// send records to partition 0
.expectSegmentToBeOffloaded(broker1, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker1, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
@ -55,7 +54,7 @@ public final class AlterLogDirTest extends TieredStorageTestHarness {
.produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// alter dir within the replica, we only expect one replicaId
.alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
.alterLogDir(topicB, p0, List.of(broker0).get(0))
// make sure the altered replica can still be elected as the leader
.expectLeader(topicB, p0, broker0, true)
// produce some more events and verify the earliest local offset

View File

@ -20,7 +20,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -60,7 +59,7 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
// segment to be rolled and deleted. We use a future timestamp to prevent that from happening.
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)))
// update the topic config such that it triggers the deletion of segments
.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
.updateTopicConfig(topicA, configsToBeAdded(), List.of())
// expect that the three offloaded remote log segments are deleted
.expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 3)
.waitForRemoteLogSegmentDeletion(topicA)

View File

@ -21,7 +21,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -75,7 +74,7 @@ public abstract class BaseReassignReplicaTest extends TieredStorageTestHarness {
.expectUserTopicMappedToMetadataPartitions(topicA, metadataPartitions)
// create topicB with 1 partition and 1 RF
.createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
mkMap(mkEntry(p0, List.of(broker0))), enableRemoteLogStorage)
// send records to partition 0
.expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))

View File

@ -18,13 +18,12 @@ package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.Map;
public final class DeleteSegmentsByRetentionSizeTest extends BaseDeleteSegmentsTest {
@Override
protected Map<String, String> configsToBeAdded() {
return Collections.singletonMap(TopicConfig.RETENTION_BYTES_CONFIG, "1");
return Map.of(TopicConfig.RETENTION_BYTES_CONFIG, "1");
}
}

View File

@ -19,7 +19,6 @@ package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.test.api.Flaky;
import java.util.Collections;
import java.util.Map;
@Flaky("KAFKA-18606")
@ -27,6 +26,6 @@ public final class DeleteSegmentsByRetentionTimeTest extends BaseDeleteSegmentsT
@Override
protected Map<String, String> configsToBeAdded() {
return Collections.singletonMap(TopicConfig.RETENTION_MS_CONFIG, "1");
return Map.of(TopicConfig.RETENTION_MS_CONFIG, "1");
}
}

View File

@ -20,7 +20,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -44,7 +43,7 @@ public final class DeleteSegmentsDueToLogStartOffsetBreachTest extends TieredSto
final Integer partitionCount = 1;
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 2;
final Map<Integer, List<Integer>> replicaAssignment = mkMap(mkEntry(p0, Arrays.asList(broker0, broker1)));
final Map<Integer, List<Integer>> replicaAssignment = mkMap(mkEntry(p0, List.of(broker0, broker1)));
final boolean enableRemoteLogStorage = true;
final int beginEpoch = 0;
final long startOffset = 3;

View File

@ -20,8 +20,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -48,8 +46,8 @@ public final class DeleteTopicTest extends TieredStorageTestHarness {
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = true;
final Map<Integer, List<Integer>> assignment = mkMap(
mkEntry(p0, Arrays.asList(broker0, broker1)),
mkEntry(p1, Arrays.asList(broker1, broker0))
mkEntry(p0, List.of(broker0, broker1)),
mkEntry(p1, List.of(broker1, broker0))
);
builder
@ -70,7 +68,7 @@ public final class DeleteTopicTest extends TieredStorageTestHarness {
// delete the topic
.expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 2)
.expectDeletionInRemoteStorage(broker1, topicA, p1, DELETE_SEGMENT, 2)
.deleteTopic(Collections.singletonList(topicA))
.deleteTopic(List.of(topicA))
.expectEmptyRemoteStorage(topicA, p0)
.expectEmptyRemoteStorage(topicA, p1);
}

View File

@ -24,8 +24,6 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -59,7 +57,7 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = true;
final Map<Integer, List<Integer>> assignment = mkMap(
mkEntry(p0, Arrays.asList(broker0, broker1))
mkEntry(p0, List.of(broker0, broker1))
);
// local.retention.ms/bytes need to set to the same value as retention.ms/bytes when disabling remote log copy
final Map<String, String> disableRemoteCopy = new HashMap<>();
@ -88,7 +86,7 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
// disable remote log copy
.updateTopicConfig(topicA,
disableRemoteCopy,
Collections.emptyList())
List.of())
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker0, topicA, p0, 2)
@ -97,7 +95,7 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
// re-enable remote log copy
.updateTopicConfig(topicA,
enableRemoteCopy,
Collections.emptyList())
List.of())
// make sure the logs can be offloaded
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
@ -106,7 +104,7 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
// disable remote log copy again
.updateTopicConfig(topicA,
disableRemoteCopy,
Collections.emptyList())
List.of())
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker0, topicA, p0, 3)
.consume(topicA, p0, 0L, 4, 3)
@ -120,7 +118,7 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
// disabling remote log on topicA and enabling deleteOnDisable
.updateTopicConfig(topicA,
deleteOnDisable,
Collections.emptyList())
List.of())
// make sure all remote data is deleted
.expectEmptyRemoteStorage(topicA, p0)
// verify the local log is still consumable

View File

@ -21,8 +21,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -48,8 +46,8 @@ public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness {
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = false;
final Map<Integer, List<Integer>> assignment = mkMap(
mkEntry(p0, Arrays.asList(broker0, broker1)),
mkEntry(p1, Arrays.asList(broker1, broker0))
mkEntry(p0, List.of(broker0, broker1)),
mkEntry(p1, List.of(broker1, broker0))
);
builder
@ -65,8 +63,8 @@ public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness {
new KeyValueSpec("k2", "v2"))
// enable remote log storage
.updateTopicConfig(topicA,
Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
Collections.emptyList())
Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
List.of())
// produce some more records to partition 0
// Note that the segment 0-2 gets offloaded for p0, but we cannot expect those events deterministically
// because the rlm-task-thread runs in background and this framework doesn't support it.

View File

@ -24,7 +24,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -48,8 +47,8 @@ public class FetchFromLeaderWithCorruptedCheckpointTest extends TieredStorageTes
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = true;
final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0, Arrays.asList(broker0, broker1)));
final List<String> checkpointFiles = Arrays.asList(
final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0, List.of(broker0, broker1)));
final List<String> checkpointFiles = List.of(
ReplicaManager.HighWatermarkFilename(),
LogManager.RecoveryPointCheckpointFile(),
CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME);

View File

@ -27,7 +27,6 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -61,7 +60,7 @@ public class ListOffsetsTest extends TieredStorageTestHarness {
final int p0 = 0;
final Time time = new MockTime();
final long timestamp = time.milliseconds();
final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0, Arrays.asList(broker0, broker1)));
final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0, List.of(broker0, broker1)));
builder
.createTopic(topicA, 1, 2, 2, assignment, true)
@ -76,7 +75,7 @@ public class ListOffsetsTest extends TieredStorageTestHarness {
new KeyValueSpec("k2", "v2", timestamp + 2))
// switch leader and send more records to partition 0 and expect the second segment to be offloaded.
.reassignReplica(topicA, p0, Arrays.asList(broker1, broker0))
.reassignReplica(topicA, p0, List.of(broker1, broker0))
// After leader election, the partition's leader-epoch gets bumped from 0 -> 1
.expectLeader(topicA, p0, broker1, true)
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)

View File

@ -20,9 +20,8 @@ import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@ -46,13 +45,13 @@ public final class PartitionsExpandTest extends TieredStorageTestHarness {
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = true;
final List<Integer> p0Assignment = Arrays.asList(broker0, broker1);
final List<Integer> p1Assignment = Arrays.asList(broker0, broker1);
final List<Integer> p2Assignment = Arrays.asList(broker1, broker0);
final List<Integer> p0Assignment = List.of(broker0, broker1);
final List<Integer> p1Assignment = List.of(broker0, broker1);
final List<Integer> p2Assignment = List.of(broker1, broker0);
builder
.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
Collections.singletonMap(p0, p0Assignment), enableRemoteLogStorage)
Map.of(p0, p0Assignment), enableRemoteLogStorage)
// produce events to partition 0
.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.tiered.storage.integration;
import java.util.Arrays;
import java.util.List;
public final class ReassignReplicaExpandTest extends BaseReassignReplicaTest {
@ -27,6 +26,6 @@ public final class ReassignReplicaExpandTest extends BaseReassignReplicaTest {
*/
@Override
protected List<Integer> replicaIds() {
return Arrays.asList(broker0, broker1);
return List.of(broker0, broker1);
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.tiered.storage.integration;
import java.util.Collections;
import java.util.List;
public final class ReassignReplicaMoveTest extends BaseReassignReplicaTest {
@ -27,6 +26,6 @@ public final class ReassignReplicaMoveTest extends BaseReassignReplicaTest {
*/
@Override
protected List<Integer> replicaIds() {
return Collections.singletonList(broker1);
return List.of(broker1);
}
}

View File

@ -20,8 +20,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -60,8 +58,8 @@ public final class ReassignReplicaShrinkTest extends TieredStorageTestHarness {
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = true;
final Map<Integer, List<Integer>> replicaAssignment = mkMap(
mkEntry(p0, Arrays.asList(broker0, broker1)),
mkEntry(p1, Arrays.asList(broker1, broker0))
mkEntry(p0, List.of(broker0, broker1)),
mkEntry(p1, List.of(broker1, broker0))
);
builder
@ -81,8 +79,8 @@ public final class ReassignReplicaShrinkTest extends TieredStorageTestHarness {
.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// shrink the replication factor to 1
.shrinkReplica(topicA, p0, Collections.singletonList(broker1))
.shrinkReplica(topicA, p1, Collections.singletonList(broker0))
.shrinkReplica(topicA, p0, List.of(broker1))
.shrinkReplica(topicA, p1, List.of(broker0))
.expectLeader(topicA, p0, broker1, false)
.expectLeader(topicA, p1, broker0, false)
// produce some more events to partition 0

View File

@ -21,7 +21,6 @@ import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -52,7 +51,7 @@ public class RollAndOffloadActiveSegmentTest extends TieredStorageTestHarness {
builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment,
enableRemoteLogStorage)
// update the topic config such that it triggers the rolling of the active segment
.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
.updateTopicConfig(topicA, configsToBeAdded(), List.of())
// produce events to partition 0 and expect all the 4 segments to be offloaded
.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))

View File

@ -33,7 +33,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import scala.jdk.javaapi.CollectionConverters;
@ -77,7 +76,7 @@ public class TransactionsWithTieredStoreTest extends TransactionsTest {
CollectionConverters.asJava(topicPartitions).forEach(topicPartition -> {
List<BrokerLocalStorage> localStorages = CollectionConverters.asJava(brokers()).stream()
.map(b -> new BrokerLocalStorage(b.config().brokerId(), CollectionConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC))
.collect(Collectors.toList());
.toList();
localStorages
.stream()
// Select brokers which are assigned a replica of the topic-partition

View File

@ -28,7 +28,6 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -114,7 +113,7 @@ public final class BrokerLocalStorage {
Long offset,
Function<OffsetHolder, Optional<String>> relativePosFunc) {
Timer timer = time.timer(TimeUnit.SECONDS.toMillis(storageWaitTimeoutSec));
OffsetHolder offsetHolder = new OffsetHolder(0L, Collections.emptyList());
OffsetHolder offsetHolder = new OffsetHolder(0L, List.of());
while (timer.notExpired() && offsetHolder.firstLogFileBaseOffset < offset) {
timer.sleep(TimeUnit.SECONDS.toMillis(storagePollPeriodSec));
offsetHolder = getEarliestLocalOffset(topicPartition);
@ -189,7 +188,7 @@ public final class BrokerLocalStorage {
}
public boolean dirContainsTopicPartition(TopicPartition topicPartition, File logDir) {
File[] files = getTopicPartitionFiles(topicPartition, Collections.singleton(logDir));
File[] files = getTopicPartitionFiles(topicPartition, Set.of(logDir));
return files != null && files.length > 0;
}
@ -218,7 +217,7 @@ public final class BrokerLocalStorage {
File topicPartitionDir = files[0];
return Arrays.stream(Objects.requireNonNull(topicPartitionDir.listFiles()))
.map(File::getName)
.collect(Collectors.toList());
.toList();
}
private static final class OffsetHolder {

View File

@ -25,7 +25,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
@ -72,7 +71,7 @@ public final class LocalTieredStorageOutput<K, V> implements LocalTieredStorageT
.stream()
.map(record -> new Tuple2<>(record.offset(),
"(" + des(keyDe, record.key()) + ", " + des(valueDe, record.value()) + ")"))
.collect(Collectors.toList());
.toList();
output += row(segFilename, offsetKeyValues.get(0).t1, offsetKeyValues.get(0).t2);
if (offsetKeyValues.size() > 1) {
offsetKeyValues.subList(1, records.size()).forEach(offsetKeyValue ->

View File

@ -33,7 +33,6 @@ import org.junit.jupiter.api.Assertions;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -41,7 +40,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG;
@ -69,7 +67,7 @@ public class TieredStorageTestUtils {
public static TopicDescription describeTopic(TieredStorageTestContext context, String topic)
throws ExecutionException, InterruptedException {
return describeTopics(context, Collections.singletonList(topic)).get(topic);
return describeTopics(context, List.of(topic)).get(topic);
}
public static Map<String, TopicDescription> describeTopics(TieredStorageTestContext context,
@ -102,7 +100,7 @@ public class TieredStorageTestUtils {
})
.sorted(Comparator.comparingLong(records -> records.get(0).offset()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
.toList();
}
public static Properties createPropsForRemoteStorage(String testClassName,