From d0a9a04a0202f2ce6354fd72488aab665d046ee7 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Thu, 31 Jul 2025 20:55:49 +0800 Subject: [PATCH] MINOR: Cleanups in storage module (#20270) Cleanups including: - Rewrite `FetchCountAndOp` as a record class - Replace `Tuple` by `Map.Entry` Reviewers: TengYao Chi , Chia-Ping Tsai --- .../runtime/CoordinatorLoaderImplTest.java | 112 +++++++++--------- .../storage/TieredStorageTestBuilder.java | 2 +- .../tiered/storage/actions/ConsumeAction.java | 9 +- .../OffloadAndTxnConsumeFromLeaderTest.java | 2 +- .../tiered/storage/specs/FetchCountAndOp.java | 35 ++++++ .../storage/specs/RemoteFetchCount.java | 31 ----- .../utils/LocalTieredStorageOutput.java | 12 +- 7 files changed, 102 insertions(+), 101 deletions(-) create mode 100644 storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java index 0113fbd657d..8760e9347a1 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java @@ -43,6 +43,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -66,14 +67,11 @@ import static org.mockito.Mockito.when; @Timeout(60) class CoordinatorLoaderImplTest { - private record Tuple(K key, V value) { - } - - private static class StringKeyValueDeserializer implements Deserializer> { + private static class StringKeyValueDeserializer implements Deserializer> { @Override - public Tuple deserialize(ByteBuffer key, ByteBuffer value) throws RuntimeException { - return new Tuple<>( + public Map.Entry deserialize(ByteBuffer key, ByteBuffer value) throws RuntimeException { + return Map.entry( StandardCharsets.UTF_8.decode(key).toString(), StandardCharsets.UTF_8.decode(value).toString() ); @@ -85,10 +83,10 @@ class CoordinatorLoaderImplTest { TopicPartition tp = new TopicPartition("foo", 0); Function> partitionLogSupplier = partition -> Optional.empty(); Function> partitionLogEndOffsetSupplier = partition -> Optional.empty(); - Deserializer> serde = mock(Deserializer.class); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + Deserializer> serde = mock(Deserializer.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -104,10 +102,10 @@ class CoordinatorLoaderImplTest { TopicPartition tp = new TopicPartition("foo", 0); Function> partitionLogSupplier = partition -> Optional.of(mock(UnifiedLog.class)); Function> partitionLogEndOffsetSupplier = partition -> Optional.empty(); - Deserializer> serde = mock(Deserializer.class); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + Deserializer> serde = mock(Deserializer.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -125,10 +123,10 @@ class CoordinatorLoaderImplTest { UnifiedLog log = mock(UnifiedLog.class); Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = partition -> Optional.of(9L); - Deserializer> serde = new StringKeyValueDeserializer(); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + Deserializer> serde = new StringKeyValueDeserializer(); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -188,13 +186,13 @@ class CoordinatorLoaderImplTest { // Includes 7 normal + 2 control (COMMIT, ABORT) assertEquals(9, summary.numRecords()); - verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1")); - verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2")); - verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3")); - verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4")); - verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5")); - verify(coordinator).replay(5L, 100L, (short) 5, new Tuple<>("k6", "v6")); - verify(coordinator).replay(6L, 100L, (short) 5, new Tuple<>("k7", "v7")); + verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1")); + verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2")); + verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3")); + verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4")); + verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5")); + verify(coordinator).replay(5L, 100L, (short) 5, Map.entry("k6", "v6")); + verify(coordinator).replay(6L, 100L, (short) 5, Map.entry("k7", "v7")); verify(coordinator).replayEndTransactionMarker(100L, (short) 5, TransactionResult.COMMIT); verify(coordinator).replayEndTransactionMarker(500L, (short) 10, TransactionResult.ABORT); verify(coordinator).updateLastWrittenOffset(2L); @@ -211,10 +209,10 @@ class CoordinatorLoaderImplTest { UnifiedLog log = mock(UnifiedLog.class); Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = partition -> Optional.of(100L); - Deserializer> serde = new StringKeyValueDeserializer(); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + Deserializer> serde = new StringKeyValueDeserializer(); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -257,9 +255,9 @@ class CoordinatorLoaderImplTest { Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = partition -> Optional.of(2L); StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -278,11 +276,11 @@ class CoordinatorLoaderImplTest { when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class))) .thenThrow(new Deserializer.UnknownRecordTypeException((short) 1)) - .thenReturn(new Tuple<>("k2", "v2")); + .thenReturn(Map.entry("k2", "v2")); loader.load(tp, coordinator).get(10, TimeUnit.SECONDS); - verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2")); + verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2")); } } @@ -293,9 +291,9 @@ class CoordinatorLoaderImplTest { Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = partition -> Optional.of(2L); StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -332,9 +330,9 @@ class CoordinatorLoaderImplTest { Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = partition -> Optional.of(10L); StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -359,10 +357,10 @@ class CoordinatorLoaderImplTest { Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = partition -> Optional.of(5L); StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); MockTime time = new MockTime(); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( time, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -407,9 +405,9 @@ class CoordinatorLoaderImplTest { Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = partition -> Optional.of(7L); StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -446,13 +444,13 @@ class CoordinatorLoaderImplTest { assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)); - verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1")); - verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2")); - verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3")); - verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4")); - verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5")); - verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6")); - verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7")); + verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1")); + verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2")); + verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3")); + verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4")); + verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5")); + verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6")); + verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7")); verify(coordinator, times(0)).updateLastWrittenOffset(0L); verify(coordinator, times(1)).updateLastWrittenOffset(2L); verify(coordinator, times(1)).updateLastWrittenOffset(5L); @@ -470,9 +468,9 @@ class CoordinatorLoaderImplTest { Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = partition -> Optional.of(0L); StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -496,9 +494,9 @@ class CoordinatorLoaderImplTest { Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = partition -> Optional.of(7L); StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, @@ -535,13 +533,13 @@ class CoordinatorLoaderImplTest { assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)); - verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1")); - verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2")); - verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3")); - verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4")); - verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5")); - verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6")); - verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7")); + verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1")); + verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2")); + verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3")); + verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4")); + verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5")); + verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6")); + verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7")); verify(coordinator, times(0)).updateLastWrittenOffset(0L); verify(coordinator, times(0)).updateLastWrittenOffset(2L); verify(coordinator, times(0)).updateLastWrittenOffset(5L); @@ -560,9 +558,9 @@ class CoordinatorLoaderImplTest { Function> partitionLogSupplier = partition -> Optional.of(log); Function> partitionLogEndOffsetSupplier = mock(Function.class); StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); - CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); + CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); - try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( + try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( Time.SYSTEM, partitionLogSupplier, partitionLogEndOffsetSupplier, diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java index b8a75102b9c..db9fd4f9b9b 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java @@ -206,7 +206,7 @@ public final class TieredStorageTestBuilder { RemoteFetchCount remoteFetchRequestCount) { TopicPartition topicPartition = new TopicPartition(topic, partition); assertTrue(partition >= 0, "Partition must be >= 0"); - assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().getCount() >= 0, "Expected fetch count from tiered storage must be >= 0"); + assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().count() >= 0, "Expected fetch count from tiered storage must be >= 0"); assertFalse(fetchables.containsKey(topicPartition), "Consume already in progress for " + topicPartition); fetchables.put(topicPartition, new FetchableSpec(fromBroker, remoteFetchRequestCount)); return this; diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java index f7a83bfe1de..0288718b930 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java @@ -25,6 +25,7 @@ import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent; import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; import org.apache.kafka.tiered.storage.TieredStorageTestAction; import org.apache.kafka.tiered.storage.TieredStorageTestContext; +import org.apache.kafka.tiered.storage.specs.FetchCountAndOp; import org.apache.kafka.tiered.storage.specs.RemoteFetchCount; import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec; @@ -141,16 +142,16 @@ public final class ConsumeAction implements TieredStorageTestAction { .orElse(events); RemoteFetchCount remoteFetchCount = remoteFetchSpec.remoteFetchCount(); - RemoteFetchCount.FetchCountAndOp expectedCountAndOp = switch (eventType) { + FetchCountAndOp expectedCountAndOp = switch (eventType) { case FETCH_SEGMENT -> remoteFetchCount.getSegmentFetchCountAndOp(); case FETCH_OFFSET_INDEX -> remoteFetchCount.getOffsetIdxFetchCountAndOp(); case FETCH_TIME_INDEX -> remoteFetchCount.getTimeIdxFetchCountAndOp(); case FETCH_TRANSACTION_INDEX -> remoteFetchCount.getTxnIdxFetchCountAndOp(); - default -> new RemoteFetchCount.FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO); + default -> new FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO); }; - RemoteFetchCount.OperationType exceptedOperationType = expectedCountAndOp.getOperationType(); - int exceptedCount = expectedCountAndOp.getCount(); + RemoteFetchCount.OperationType exceptedOperationType = expectedCountAndOp.operationType(); + int exceptedCount = expectedCountAndOp.count(); int actualCount = eventsInScope.size(); String message = errorMessage(eventType, actualCount, exceptedOperationType, exceptedCount); if (exceptedCount != -1) { diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java index ba210722c9c..9f3cd9e3637 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.FetchCountAndOp; import org.apache.kafka.tiered.storage.specs.KeyValueSpec; import org.apache.kafka.tiered.storage.specs.RemoteFetchCount; @@ -28,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.FetchCountAndOp; import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO; /** diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java new file mode 100644 index 00000000000..5d0b8ff8323 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchCountAndOp.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tiered.storage.specs; + +public record FetchCountAndOp( + int count, + RemoteFetchCount.OperationType operationType +) { + public FetchCountAndOp(int count) { + this(count, RemoteFetchCount.OperationType.EQUALS_TO); + } + + @Override + public String toString() { + return "FetchCountAndOp{" + + "count=" + count + + ", operationType=" + operationType + + '}'; + } +} diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java index 67a1d8e5b49..7f3b10c84b1 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java @@ -85,35 +85,4 @@ public class RemoteFetchCount { GREATER_THAN_OR_EQUALS_TO, LESS_THAN_OR_EQUALS_TO } - - public static class FetchCountAndOp { - private final int count; - private final OperationType operationType; - - public FetchCountAndOp(int count) { - this.count = count; - this.operationType = OperationType.EQUALS_TO; - } - - public FetchCountAndOp(int count, OperationType operationType) { - this.count = count; - this.operationType = operationType; - } - - public int getCount() { - return count; - } - - public OperationType getOperationType() { - return operationType; - } - - @Override - public String toString() { - return "FetchCountAndOp{" + - "count=" + count + - ", operationType=" + operationType + - '}'; - } - } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java index 505f3c4dd7c..68cf4178eea 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/LocalTieredStorageOutput.java @@ -25,6 +25,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT; @@ -67,15 +68,15 @@ public final class LocalTieredStorageOutput implements LocalTieredStorageT if (records.isEmpty()) { output += row(segFilename, -1, ""); } else { - List> offsetKeyValues = records + List> offsetKeyValues = records .stream() - .map(record -> new Tuple2<>(record.offset(), + .map(record -> Map.entry(record.offset(), "(" + des(keyDe, record.key()) + ", " + des(valueDe, record.value()) + ")")) .toList(); - output += row(segFilename, offsetKeyValues.get(0).t1, offsetKeyValues.get(0).t2); + output += row(segFilename, offsetKeyValues.get(0).getKey(), offsetKeyValues.get(0).getValue()); if (offsetKeyValues.size() > 1) { offsetKeyValues.subList(1, records.size()).forEach(offsetKeyValue -> - output += row("", offsetKeyValue.t1, offsetKeyValue.t2)); + output += row("", offsetKeyValue.getKey(), offsetKeyValue.getValue())); } } output += row(); @@ -91,7 +92,4 @@ public final class LocalTieredStorageOutput implements LocalTieredStorageT private String des(Deserializer de, ByteBuffer bytes) { return de.deserialize(currentTopic, Utils.toNullableArray(bytes)).toString(); } - - private record Tuple2(T1 t1, T2 t2) { - } }