MINOR: Cleanups in storage module (#20270)

Cleanups including:

- Rewrite `FetchCountAndOp`  as a record class
- Replace `Tuple` by `Map.Entry`

Reviewers: TengYao Chi <frankvicky@apache.org>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Lan Ding 2025-07-31 20:55:49 +08:00 committed by GitHub
parent c7caf912aa
commit d0a9a04a02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 102 additions and 101 deletions

View File

@ -43,6 +43,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -66,14 +67,11 @@ import static org.mockito.Mockito.when;
@Timeout(60) @Timeout(60)
class CoordinatorLoaderImplTest { class CoordinatorLoaderImplTest {
private record Tuple<K, V>(K key, V value) { private static class StringKeyValueDeserializer implements Deserializer<Map.Entry<String, String>> {
}
private static class StringKeyValueDeserializer implements Deserializer<Tuple<String, String>> {
@Override @Override
public Tuple<String, String> deserialize(ByteBuffer key, ByteBuffer value) throws RuntimeException { public Map.Entry<String, String> deserialize(ByteBuffer key, ByteBuffer value) throws RuntimeException {
return new Tuple<>( return Map.entry(
StandardCharsets.UTF_8.decode(key).toString(), StandardCharsets.UTF_8.decode(key).toString(),
StandardCharsets.UTF_8.decode(value).toString() StandardCharsets.UTF_8.decode(value).toString()
); );
@ -85,10 +83,10 @@ class CoordinatorLoaderImplTest {
TopicPartition tp = new TopicPartition("foo", 0); TopicPartition tp = new TopicPartition("foo", 0);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.empty(); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.empty();
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.empty(); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.empty();
Deserializer<Tuple<String, String>> serde = mock(Deserializer.class); Deserializer<Map.Entry<String, String>> serde = mock(Deserializer.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -104,10 +102,10 @@ class CoordinatorLoaderImplTest {
TopicPartition tp = new TopicPartition("foo", 0); TopicPartition tp = new TopicPartition("foo", 0);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(mock(UnifiedLog.class)); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(mock(UnifiedLog.class));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.empty(); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.empty();
Deserializer<Tuple<String, String>> serde = mock(Deserializer.class); Deserializer<Map.Entry<String, String>> serde = mock(Deserializer.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -125,10 +123,10 @@ class CoordinatorLoaderImplTest {
UnifiedLog log = mock(UnifiedLog.class); UnifiedLog log = mock(UnifiedLog.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(9L); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(9L);
Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer(); Deserializer<Map.Entry<String, String>> serde = new StringKeyValueDeserializer();
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -188,13 +186,13 @@ class CoordinatorLoaderImplTest {
// Includes 7 normal + 2 control (COMMIT, ABORT) // Includes 7 normal + 2 control (COMMIT, ABORT)
assertEquals(9, summary.numRecords()); assertEquals(9, summary.numRecords());
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1")); verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2")); verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3")); verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4")); verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5")); verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
verify(coordinator).replay(5L, 100L, (short) 5, new Tuple<>("k6", "v6")); verify(coordinator).replay(5L, 100L, (short) 5, Map.entry("k6", "v6"));
verify(coordinator).replay(6L, 100L, (short) 5, new Tuple<>("k7", "v7")); verify(coordinator).replay(6L, 100L, (short) 5, Map.entry("k7", "v7"));
verify(coordinator).replayEndTransactionMarker(100L, (short) 5, TransactionResult.COMMIT); verify(coordinator).replayEndTransactionMarker(100L, (short) 5, TransactionResult.COMMIT);
verify(coordinator).replayEndTransactionMarker(500L, (short) 10, TransactionResult.ABORT); verify(coordinator).replayEndTransactionMarker(500L, (short) 10, TransactionResult.ABORT);
verify(coordinator).updateLastWrittenOffset(2L); verify(coordinator).updateLastWrittenOffset(2L);
@ -211,10 +209,10 @@ class CoordinatorLoaderImplTest {
UnifiedLog log = mock(UnifiedLog.class); UnifiedLog log = mock(UnifiedLog.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(100L); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(100L);
Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer(); Deserializer<Map.Entry<String, String>> serde = new StringKeyValueDeserializer();
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -257,9 +255,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(2L); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(2L);
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -278,11 +276,11 @@ class CoordinatorLoaderImplTest {
when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class))) when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
.thenThrow(new Deserializer.UnknownRecordTypeException((short) 1)) .thenThrow(new Deserializer.UnknownRecordTypeException((short) 1))
.thenReturn(new Tuple<>("k2", "v2")); .thenReturn(Map.entry("k2", "v2"));
loader.load(tp, coordinator).get(10, TimeUnit.SECONDS); loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2")); verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
} }
} }
@ -293,9 +291,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(2L); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(2L);
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -332,9 +330,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(10L); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(10L);
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -359,10 +357,10 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(5L); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(5L);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
MockTime time = new MockTime(); MockTime time = new MockTime();
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
time, time,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -407,9 +405,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(7L); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -446,13 +444,13 @@ class CoordinatorLoaderImplTest {
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)); assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1")); verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2")); verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3")); verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4")); verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5")); verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6")); verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7")); verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
verify(coordinator, times(0)).updateLastWrittenOffset(0L); verify(coordinator, times(0)).updateLastWrittenOffset(0L);
verify(coordinator, times(1)).updateLastWrittenOffset(2L); verify(coordinator, times(1)).updateLastWrittenOffset(2L);
verify(coordinator, times(1)).updateLastWrittenOffset(5L); verify(coordinator, times(1)).updateLastWrittenOffset(5L);
@ -470,9 +468,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(0L); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(0L);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -496,9 +494,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(7L); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,
@ -535,13 +533,13 @@ class CoordinatorLoaderImplTest {
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)); assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1")); verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2")); verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3")); verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4")); verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5")); verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6")); verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7")); verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
verify(coordinator, times(0)).updateLastWrittenOffset(0L); verify(coordinator, times(0)).updateLastWrittenOffset(0L);
verify(coordinator, times(0)).updateLastWrittenOffset(2L); verify(coordinator, times(0)).updateLastWrittenOffset(2L);
verify(coordinator, times(0)).updateLastWrittenOffset(5L); verify(coordinator, times(0)).updateLastWrittenOffset(5L);
@ -560,9 +558,9 @@ class CoordinatorLoaderImplTest {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log); Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = mock(Function.class); Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = mock(Function.class);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class); CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>( try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM, Time.SYSTEM,
partitionLogSupplier, partitionLogSupplier,
partitionLogEndOffsetSupplier, partitionLogEndOffsetSupplier,

View File

@ -206,7 +206,7 @@ public final class TieredStorageTestBuilder {
RemoteFetchCount remoteFetchRequestCount) { RemoteFetchCount remoteFetchRequestCount) {
TopicPartition topicPartition = new TopicPartition(topic, partition); TopicPartition topicPartition = new TopicPartition(topic, partition);
assertTrue(partition >= 0, "Partition must be >= 0"); assertTrue(partition >= 0, "Partition must be >= 0");
assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().getCount() >= 0, "Expected fetch count from tiered storage must be >= 0"); assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().count() >= 0, "Expected fetch count from tiered storage must be >= 0");
assertFalse(fetchables.containsKey(topicPartition), "Consume already in progress for " + topicPartition); assertFalse(fetchables.containsKey(topicPartition), "Consume already in progress for " + topicPartition);
fetchables.put(topicPartition, new FetchableSpec(fromBroker, remoteFetchRequestCount)); fetchables.put(topicPartition, new FetchableSpec(fromBroker, remoteFetchRequestCount));
return this; return this;

View File

@ -25,6 +25,7 @@ import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.tiered.storage.TieredStorageTestAction; import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext; import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.FetchCountAndOp;
import org.apache.kafka.tiered.storage.specs.RemoteFetchCount; import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec; import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
@ -141,16 +142,16 @@ public final class ConsumeAction implements TieredStorageTestAction {
.orElse(events); .orElse(events);
RemoteFetchCount remoteFetchCount = remoteFetchSpec.remoteFetchCount(); RemoteFetchCount remoteFetchCount = remoteFetchSpec.remoteFetchCount();
RemoteFetchCount.FetchCountAndOp expectedCountAndOp = switch (eventType) { FetchCountAndOp expectedCountAndOp = switch (eventType) {
case FETCH_SEGMENT -> remoteFetchCount.getSegmentFetchCountAndOp(); case FETCH_SEGMENT -> remoteFetchCount.getSegmentFetchCountAndOp();
case FETCH_OFFSET_INDEX -> remoteFetchCount.getOffsetIdxFetchCountAndOp(); case FETCH_OFFSET_INDEX -> remoteFetchCount.getOffsetIdxFetchCountAndOp();
case FETCH_TIME_INDEX -> remoteFetchCount.getTimeIdxFetchCountAndOp(); case FETCH_TIME_INDEX -> remoteFetchCount.getTimeIdxFetchCountAndOp();
case FETCH_TRANSACTION_INDEX -> remoteFetchCount.getTxnIdxFetchCountAndOp(); case FETCH_TRANSACTION_INDEX -> remoteFetchCount.getTxnIdxFetchCountAndOp();
default -> new RemoteFetchCount.FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO); default -> new FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO);
}; };
RemoteFetchCount.OperationType exceptedOperationType = expectedCountAndOp.getOperationType(); RemoteFetchCount.OperationType exceptedOperationType = expectedCountAndOp.operationType();
int exceptedCount = expectedCountAndOp.getCount(); int exceptedCount = expectedCountAndOp.count();
int actualCount = eventsInScope.size(); int actualCount = eventsInScope.size();
String message = errorMessage(eventType, actualCount, exceptedOperationType, exceptedCount); String message = errorMessage(eventType, actualCount, exceptedOperationType, exceptedCount);
if (exceptedCount != -1) { if (exceptedCount != -1) {

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness; import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.FetchCountAndOp;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec; import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import org.apache.kafka.tiered.storage.specs.RemoteFetchCount; import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
@ -28,7 +29,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.FetchCountAndOp;
import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO; import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO;
/** /**

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.specs;
public record FetchCountAndOp(
int count,
RemoteFetchCount.OperationType operationType
) {
public FetchCountAndOp(int count) {
this(count, RemoteFetchCount.OperationType.EQUALS_TO);
}
@Override
public String toString() {
return "FetchCountAndOp{" +
"count=" + count +
", operationType=" + operationType +
'}';
}
}

View File

@ -85,35 +85,4 @@ public class RemoteFetchCount {
GREATER_THAN_OR_EQUALS_TO, GREATER_THAN_OR_EQUALS_TO,
LESS_THAN_OR_EQUALS_TO LESS_THAN_OR_EQUALS_TO
} }
public static class FetchCountAndOp {
private final int count;
private final OperationType operationType;
public FetchCountAndOp(int count) {
this.count = count;
this.operationType = OperationType.EQUALS_TO;
}
public FetchCountAndOp(int count, OperationType operationType) {
this.count = count;
this.operationType = operationType;
}
public int getCount() {
return count;
}
public OperationType getOperationType() {
return operationType;
}
@Override
public String toString() {
return "FetchCountAndOp{" +
"count=" + count +
", operationType=" + operationType +
'}';
}
}
} }

View File

@ -25,6 +25,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Map;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT; import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
@ -67,15 +68,15 @@ public final class LocalTieredStorageOutput<K, V> implements LocalTieredStorageT
if (records.isEmpty()) { if (records.isEmpty()) {
output += row(segFilename, -1, ""); output += row(segFilename, -1, "");
} else { } else {
List<Tuple2<Long, String>> offsetKeyValues = records List<Map.Entry<Long, String>> offsetKeyValues = records
.stream() .stream()
.map(record -> new Tuple2<>(record.offset(), .map(record -> Map.entry(record.offset(),
"(" + des(keyDe, record.key()) + ", " + des(valueDe, record.value()) + ")")) "(" + des(keyDe, record.key()) + ", " + des(valueDe, record.value()) + ")"))
.toList(); .toList();
output += row(segFilename, offsetKeyValues.get(0).t1, offsetKeyValues.get(0).t2); output += row(segFilename, offsetKeyValues.get(0).getKey(), offsetKeyValues.get(0).getValue());
if (offsetKeyValues.size() > 1) { if (offsetKeyValues.size() > 1) {
offsetKeyValues.subList(1, records.size()).forEach(offsetKeyValue -> offsetKeyValues.subList(1, records.size()).forEach(offsetKeyValue ->
output += row("", offsetKeyValue.t1, offsetKeyValue.t2)); output += row("", offsetKeyValue.getKey(), offsetKeyValue.getValue()));
} }
} }
output += row(); output += row();
@ -91,7 +92,4 @@ public final class LocalTieredStorageOutput<K, V> implements LocalTieredStorageT
private String des(Deserializer<?> de, ByteBuffer bytes) { private String des(Deserializer<?> de, ByteBuffer bytes) {
return de.deserialize(currentTopic, Utils.toNullableArray(bytes)).toString(); return de.deserialize(currentTopic, Utils.toNullableArray(bytes)).toString();
} }
private record Tuple2<T1, T2>(T1 t1, T2 t2) {
}
} }