MINOR: Cleanup the storage module unit tests (#16202)

- Use SystemTime instead of MockTime when time is not mocked
- Use static assertions to reduce the line length
- Fold the lines if it exceeds the limit
- rename tp0 to tpId0 when it refers to TopicIdPartition

Reviewers: Kuan-Po (Cooper) Tseng <brandboat@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kamal Chandraprakash 2024-06-06 21:26:08 +05:30 committed by GitHub
parent f36a873642
commit 0ed104c3dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 169 additions and 185 deletions

View File

@ -24,7 +24,6 @@ import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteL
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -35,6 +34,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED;
public class RemoteLogMetadataFormatterTest { public class RemoteLogMetadataFormatterTest {
private static final Uuid TOPIC_ID = Uuid.randomUuid(); private static final Uuid TOPIC_ID = Uuid.randomUuid();
@ -51,12 +51,12 @@ public class RemoteLogMetadataFormatterTest {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID); RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID);
Optional<CustomMetadata> customMetadata = Optional.of(new CustomMetadata(new byte[10])); Optional<CustomMetadata> customMetadata = Optional.of(new CustomMetadata(new byte[10]));
RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata( RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata(
remoteLogSegmentId, 0L, 100L, -1L, 1, remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED,
123L, 1024, customMetadata, segLeaderEpochs);
RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata); byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata);
ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes); ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(
"__remote_log_metadata", 0, 0, null, metadataBytes);
String expected = String.format( String expected = String.format(
"partition: 0, offset: 0, value: " + "partition: 0, offset: 0, value: " +
@ -68,9 +68,11 @@ public class RemoteLogMetadataFormatterTest {
TOPIC_ID, SEGMENT_ID); TOPIC_ID, SEGMENT_ID);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos)) { PrintStream ps = new PrintStream(baos)) {
RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = new RemoteLogMetadataSerde.RemoteLogMetadataFormatter(); try (RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter =
new RemoteLogMetadataSerde.RemoteLogMetadataFormatter()) {
formatter.writeTo(metadataRecord, ps); formatter.writeTo(metadataRecord, ps);
assertEquals(expected, baos.toString()); assertEquals(expected, baos.toString());
} }
} }
} }
}

View File

@ -19,48 +19,48 @@ package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class RemoteLogMetadataSerdeTest { public class RemoteLogMetadataSerdeTest {
public static final String TOPIC = "foo"; public static final String TOPIC = "foo";
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0)); private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
private final Time time = new MockTime(1); private final Time time = new SystemTime();
@Test @Test
public void testRemoteLogSegmentMetadataSerde() { public void testRemoteLogSegmentMetadataSerde() {
RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata(); RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata();
doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata); doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata);
} }
@Test @Test
public void testRemoteLogSegmentMetadataUpdateSerde() { public void testRemoteLogSegmentMetadataUpdateSerde() {
RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate(); RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate();
doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate); doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate);
} }
@Test @Test
public void testRemotePartitionDeleteMetadataSerde() { public void testRemotePartitionDeleteMetadataSerde() {
RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata(); RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata();
doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata); doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata);
} }
@ -71,23 +71,18 @@ public class RemoteLogMetadataSerdeTest {
segLeaderEpochs.put(2, 80L); segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024, time.milliseconds(), 1024, Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), COPY_SEGMENT_STARTED, segLeaderEpochs);
RemoteLogSegmentState.COPY_SEGMENT_STARTED,
segLeaderEpochs
);
} }
private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() { private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(), return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(),
Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 2);
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
} }
private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() { private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() {
return new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_MARKED, return new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_MARKED, time.milliseconds(), 0);
time.milliseconds(), 0);
} }
private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) { private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) {
@ -96,16 +91,17 @@ public class RemoteLogMetadataSerdeTest {
byte[] metadataBytes = serializer.serialize(remoteLogMetadata); byte[] metadataBytes = serializer.serialize(remoteLogMetadata);
// Deserialize the bytes and check the RemoteLogMetadata object is as expected. // Deserialize the bytes and check the RemoteLogMetadata object is as expected.
// Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and deserializer having their own instances. // Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and
// deserializer having their own instances.
RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde(); RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde();
RemoteLogMetadata deserializedRemoteLogMetadata = deserializer.deserialize(metadataBytes); RemoteLogMetadata deserializedRemoteLogMetadata = deserializer.deserialize(metadataBytes);
Assertions.assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata); assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata);
} }
@Test @Test
public void testInvalidRemoteStorageMetadata() { public void testInvalidRemoteStorageMetadata() {
// Serializing receives an exception as it does not have the expected RemoteLogMetadata registered in serdes. // Serializing receives an exception as it does not have the expected RemoteLogMetadata registered in serdes.
Assertions.assertThrows(IllegalArgumentException.class, assertThrows(IllegalArgumentException.class,
() -> new RemoteLogMetadataSerde().serialize(new InvalidRemoteLogMetadata(1, time.milliseconds()))); () -> new RemoteLogMetadataSerde().serialize(new InvalidRemoteLogMetadata(1, time.milliseconds())));
} }

View File

@ -19,7 +19,7 @@ package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform;
@ -29,43 +29,40 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Collections; import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RemoteLogMetadataTransformTest { public class RemoteLogMetadataTransformTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
private final Time time = new MockTime(1); private final Time time = new SystemTime();
@Test @Test
public void testRemoteLogSegmentMetadataTransform() { public void testRemoteLogSegmentMetadataTransform() {
RemoteLogSegmentMetadataTransform metadataTransform = new RemoteLogSegmentMetadataTransform(); RemoteLogSegmentMetadataTransform metadataTransform = new RemoteLogSegmentMetadataTransform();
RemoteLogSegmentMetadata metadata = createRemoteLogSegmentMetadata(); RemoteLogSegmentMetadata metadata = createRemoteLogSegmentMetadata();
ApiMessageAndVersion apiMessageAndVersion = metadataTransform.toApiMessageAndVersion(metadata); ApiMessageAndVersion apiMessageAndVersion = metadataTransform.toApiMessageAndVersion(metadata);
RemoteLogSegmentMetadata remoteLogSegmentMetadataFromRecord = metadataTransform RemoteLogSegmentMetadata remoteLogSegmentMetadataFromRecord = metadataTransform
.fromApiMessageAndVersion(apiMessageAndVersion); .fromApiMessageAndVersion(apiMessageAndVersion);
assertEquals(metadata, remoteLogSegmentMetadataFromRecord);
Assertions.assertEquals(metadata, remoteLogSegmentMetadataFromRecord);
} }
@Test @Test
public void testRemoteLogSegmentMetadataUpdateTransform() { public void testRemoteLogSegmentMetadataUpdateTransform() {
RemoteLogSegmentMetadataUpdateTransform metadataUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform(); RemoteLogSegmentMetadataUpdateTransform metadataUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform();
RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentMetadataUpdate(
RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(),
new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(), Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 1);
Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate); ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion); RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord =
metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
Assertions.assertEquals(metadataUpdate, metadataUpdateFromRecord); assertEquals(metadataUpdate, metadataUpdateFromRecord);
} }
private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
@ -77,12 +74,11 @@ public class RemoteLogMetadataTransformTest {
@Test @Test
public void testRemoteLogPartitionMetadataTransform() { public void testRemoteLogPartitionMetadataTransform() {
RemotePartitionDeleteMetadataTransform transform = new RemotePartitionDeleteMetadataTransform(); RemotePartitionDeleteMetadataTransform transform = new RemotePartitionDeleteMetadataTransform();
RemotePartitionDeleteMetadata partitionDeleteMetadata RemotePartitionDeleteMetadata partitionDeleteMetadata
= new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_STARTED, time.milliseconds(), 1); = new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_STARTED, time.milliseconds(), 1);
ApiMessageAndVersion apiMessageAndVersion = transform.toApiMessageAndVersion(partitionDeleteMetadata); ApiMessageAndVersion apiMessageAndVersion = transform.toApiMessageAndVersion(partitionDeleteMetadata);
RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = transform.fromApiMessageAndVersion(apiMessageAndVersion); RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord =
transform.fromApiMessageAndVersion(apiMessageAndVersion);
Assertions.assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord); assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord);
} }
} }

View File

@ -20,10 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.HashMap; import java.util.HashMap;
@ -38,10 +35,10 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
public class TopicBasedRemoteLogMetadataManagerConfigTest { import static org.junit.jupiter.api.Assertions.assertEquals;
private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class);
private static final String BOOTSTRAP_SERVERS = "localhost:9091"; public class TopicBasedRemoteLogMetadataManagerConfigTest {
private static final String BOOTSTRAP_SERVERS = "localhost:2222";
@Test @Test
public void testValidConfig() { public void testValidConfig() {
@ -60,41 +57,32 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
// Check for topic properties // Check for topic properties
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props); TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props);
Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), rlmmConfig.metadataTopicPartitionsCount()); assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), rlmmConfig.metadataTopicPartitionsCount());
// Check for common client configs. // Check for common client configs.
Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) { for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
log.info("Checking config: " + entry.getKey()); assertEquals(entry.getValue(), rlmmConfig.commonProperties().get(entry.getKey()));
Assertions.assertEquals(entry.getValue(), assertEquals(entry.getValue(), rlmmConfig.producerProperties().get(entry.getKey()));
rlmmConfig.commonProperties().get(entry.getKey())); assertEquals(entry.getValue(), rlmmConfig.consumerProperties().get(entry.getKey()));
Assertions.assertEquals(entry.getValue(),
rlmmConfig.producerProperties().get(entry.getKey()));
Assertions.assertEquals(entry.getValue(),
rlmmConfig.consumerProperties().get(entry.getKey()));
} }
// Check for producer configs. // Check for producer configs.
for (Map.Entry<String, Object> entry : producerConfig.entrySet()) { for (Map.Entry<String, Object> entry : producerConfig.entrySet()) {
log.info("Checking config: " + entry.getKey()); assertEquals(entry.getValue(), rlmmConfig.producerProperties().get(entry.getKey()));
Assertions.assertEquals(entry.getValue(),
rlmmConfig.producerProperties().get(entry.getKey()));
} }
// Check for consumer configs. // Check for consumer configs.
for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) { for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
log.info("Checking config: " + entry.getKey()); assertEquals(entry.getValue(), rlmmConfig.consumerProperties().get(entry.getKey()));
Assertions.assertEquals(entry.getValue(),
rlmmConfig.consumerProperties().get(entry.getKey()));
} }
} }
@Test @Test
public void testCommonProducerConsumerOverridesConfig() { public void testCommonProducerConsumerOverridesConfig() {
Map.Entry<String, Long> overrideEntry = new AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L); Map.Entry<String, Long> overrideEntry =
new AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L);
Map<String, Object> commonClientConfig = new HashMap<>(); Map<String, Object> commonClientConfig = new HashMap<>();
commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10); commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10);
commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 1000L); commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 1000L);
@ -114,12 +102,9 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
Map<String, Object> props = createValidConfigProps(commonClientConfig, producerConfig, consumerConfig); Map<String, Object> props = createValidConfigProps(commonClientConfig, producerConfig, consumerConfig);
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props); TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props);
Assertions.assertEquals(overrideCommonPropValue, assertEquals(overrideCommonPropValue, rlmmConfig.commonProperties().get(overrideEntry.getKey()));
rlmmConfig.commonProperties().get(overrideEntry.getKey())); assertEquals(overriddenProducerPropValue, rlmmConfig.producerProperties().get(overrideEntry.getKey()));
Assertions.assertEquals(overriddenProducerPropValue, assertEquals(overriddenConsumerPropValue, rlmmConfig.consumerProperties().get(overrideEntry.getKey()));
rlmmConfig.producerProperties().get(overrideEntry.getKey()));
Assertions.assertEquals(overriddenConsumerPropValue,
rlmmConfig.consumerProperties().get(overrideEntry.getKey()));
} }
private Map<String, Object> createValidConfigProps(Map<String, Object> commonClientConfig, private Map<String, Object> createValidConfigProps(Map<String, Object> commonClientConfig,
@ -129,7 +114,6 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(BROKER_ID, 1); props.put(BROKER_ID, 1);
props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath()); props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath());
props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 3); props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 3);
props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 10); props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 10);
props.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, 60 * 60 * 1000L); props.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, 60 * 60 * 1000L);
@ -138,17 +122,14 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) { for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + entry.getKey(), entry.getValue()); props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + entry.getKey(), entry.getValue());
} }
// producer configs // producer configs
for (Map.Entry<String, Object> entry : producerConfig.entrySet()) { for (Map.Entry<String, Object> entry : producerConfig.entrySet()) {
props.put(REMOTE_LOG_METADATA_PRODUCER_PREFIX + entry.getKey(), entry.getValue()); props.put(REMOTE_LOG_METADATA_PRODUCER_PREFIX + entry.getKey(), entry.getValue());
} }
//consumer configs //consumer configs
for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) { for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
props.put(REMOTE_LOG_METADATA_CONSUMER_PREFIX + entry.getKey(), entry.getValue()); props.put(REMOTE_LOG_METADATA_CONSUMER_PREFIX + entry.getKey(), entry.getValue());
} }
return props; return props;
} }
} }

View File

@ -27,12 +27,11 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -44,6 +43,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -55,10 +58,7 @@ import static org.mockito.Mockito.verify;
@ClusterTestDefaults(brokers = 3) @ClusterTestDefaults(brokers = 3)
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
private final ClusterInstance clusterInstance; private final ClusterInstance clusterInstance;
private final Time time = new SystemTime();
private static final int SEG_SIZE = 1024 * 1024;
private final Time time = new MockTime(1);
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) { TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance; this.clusterInstance = clusterInstance;
@ -125,24 +125,25 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
// Add segments for these partitions but an exception is received as they have not yet been subscribed. // Add segments for these partitions but an exception is received as they have not yet been subscribed.
// These messages would have been published to the respective metadata topic partitions but the ConsumerManager // These messages would have been published to the respective metadata topic partitions but the ConsumerManager
// has not yet been subscribing as they are not yet registered. // has not yet been subscribing as they are not yet registered.
int segSize = 1048576;
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, 0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); ExecutionException exception = assertThrows(ExecutionException.class,
Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
exception.getMessage()); exception.getMessage());
RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, 0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
exception = Assertions.assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); exception = assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
exception.getMessage()); exception.getMessage());
// `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered. // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered.
Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)); assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition));
Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
Collections.emptySet()); Collections.emptySet());
@ -156,8 +157,8 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
// leader partitions would have received as it is registered, but follower partition is not yet registered, // leader partitions would have received as it is registered, but follower partition is not yet registered,
// hence it throws an exception. // hence it throws an exception.
Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext()); assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext());
Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));
// Register follower partition // Register follower partition
// Phaser::bulkRegister and Phaser::register provide the "countUp" feature // Phaser::bulkRegister and Phaser::register provide the "countUp" feature
@ -172,15 +173,15 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
verify(spyRemotePartitionMetadataStore).markInitialized(followerTopicIdPartition); verify(spyRemotePartitionMetadataStore).markInitialized(followerTopicIdPartition);
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata); verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
// In this state, all the metadata should be available in RLMM for both leader and follower partitions. // In this state, all the metadata should be available in RLMM for both leader and follower partitions.
Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found"); assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found");
Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found"); assertTrue(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found");
} }
} }
private void createTopic(String topic, Map<Integer, List<Integer>> replicasAssignments) { private void createTopic(String topic, Map<Integer, List<Integer>> replicasAssignments) {
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
admin.createTopics(Collections.singletonList(new NewTopic(topic, replicasAssignments))); admin.createTopics(Collections.singletonList(new NewTopic(topic, replicasAssignments)));
Assertions.assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, replicasAssignments.size())); assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, replicasAssignments.size()));
} }
} }
} }

View File

@ -24,12 +24,11 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -37,14 +36,13 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class) @ExtendWith(value = ClusterTestExtensions.class)
@Tag("integration") @Tag("integration")
public class TopicBasedRemoteLogMetadataManagerRestartTest { public class TopicBasedRemoteLogMetadataManagerRestartTest {
private static final int SEG_SIZE = 1024 * 1024; private final Time time = new SystemTime();
private final Time time = new MockTime(1);
private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath(); private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath();
private final ClusterInstance clusterInstance; private final ClusterInstance clusterInstance;
@ -76,16 +74,17 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
clusterInstance.waitForTopic(leaderTopic, 1); clusterInstance.waitForTopic(leaderTopic, 1);
clusterInstance.waitForTopic(followerTopic, 1); clusterInstance.waitForTopic(followerTopic, 1);
final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
int segSize = 1048576;
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata( RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(
new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, 0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata( RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(
new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, 0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) { try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) {
// Register these partitions to RemoteLogMetadataManager. // Register these partitions to RemoteLogMetadataManager.
@ -115,12 +114,14 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata( RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata(
new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
101, 200, -1L, 0, 101, 200, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 101L)); time.milliseconds(), segSize, Collections.singletonMap(0, 101L));
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get(); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();
// Check that both the stored segment and recently added segment are available. // Check that both the stored segment and recently added segment are available.
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(), assertTrue(TestUtils.sameElementsWithoutOrder(
topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))); Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(),
topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
);
} }
} }
} }

View File

@ -25,14 +25,13 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import java.io.IOException; import java.io.IOException;
@ -42,6 +41,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -50,11 +53,10 @@ import static org.mockito.Mockito.verify;
@ExtendWith(ClusterTestExtensions.class) @ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(brokers = 3) @ClusterTestDefaults(brokers = 3)
public class TopicBasedRemoteLogMetadataManagerTest { public class TopicBasedRemoteLogMetadataManagerTest {
private static final int SEG_SIZE = 1024 * 1024; private static final int SEG_SIZE = 1048576;
private final ClusterInstance clusterInstance; private final ClusterInstance clusterInstance;
private final RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore()); private final RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore());
private final Time time = new MockTime(1); private final Time time = new SystemTime();
private TopicBasedRemoteLogMetadataManager remoteLogMetadataManager; private TopicBasedRemoteLogMetadataManager remoteLogMetadataManager;
TopicBasedRemoteLogMetadataManagerTest(ClusterInstance clusterInstance) { TopicBasedRemoteLogMetadataManagerTest(ClusterInstance clusterInstance) {
@ -83,7 +85,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get(); admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get();
clusterInstance.waitForTopic(topic, 1); clusterInstance.waitForTopic(topic, 1);
boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
Assertions.assertTrue(doesTopicExist); assertTrue(doesTopicExist);
} }
} }
@ -92,7 +94,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
try (Admin admin = clusterInstance.createAdminClient()) { try (Admin admin = clusterInstance.createAdminClient()) {
String topic = "dummy-test-topic"; String topic = "dummy-test-topic";
boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
Assertions.assertFalse(doesTopicExist); assertFalse(doesTopicExist);
} }
} }
@ -139,37 +141,37 @@ public class TopicBasedRemoteLogMetadataManagerTest {
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, Uuid.randomUuid()), RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, 0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
Assertions.assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, Uuid.randomUuid()), RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, 0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
Assertions.assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
// `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered. // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered.
Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition)); assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition));
Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition)); assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition));
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition), topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition),
Collections.singleton(newFollowerTopicIdPartition)); Collections.singleton(newFollowerTopicIdPartition));
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache. // fetching those events and build the cache.
Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS));
verify(spyRemotePartitionMetadataEventHandler).markInitialized(newLeaderTopicIdPartition); verify(spyRemotePartitionMetadataEventHandler).markInitialized(newLeaderTopicIdPartition);
verify(spyRemotePartitionMetadataEventHandler).markInitialized(newFollowerTopicIdPartition); verify(spyRemotePartitionMetadataEventHandler).markInitialized(newFollowerTopicIdPartition);
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(leaderSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(leaderSegmentMetadata);
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext()); assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext()); assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
} }
@ClusterTest @ClusterTest
public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() { public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0)); assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0));
} }
@ClusterTest @ClusterTest
@ -206,8 +208,8 @@ public class TopicBasedRemoteLogMetadataManagerTest {
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache. // fetching those events and build the cache.
Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS));
verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition);
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata);
@ -215,7 +217,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata);
Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0); Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0);
Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize); assertEquals(SEG_SIZE * 6, remoteLogSize);
} }
@ClusterTest @ClusterTest
@ -251,16 +253,16 @@ public class TopicBasedRemoteLogMetadataManagerTest {
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache. // fetching those events and build the cache.
Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS));
verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition);
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata);
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata);
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata);
Assertions.assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0)); assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0));
Assertions.assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1)); assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1));
Assertions.assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2)); assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2));
} }
@ClusterTest @ClusterTest
@ -293,13 +295,13 @@ public class TopicBasedRemoteLogMetadataManagerTest {
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache. // fetching those events and build the cache.
Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS));
verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition);
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata);
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata);
Assertions.assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001)); assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001));
} }
} }

View File

@ -23,33 +23,39 @@ import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils; import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils;
import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore; import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
//import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_FINISHED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED;
@Tag("integration") @Tag("integration")
@ExtendWith(ClusterTestExtensions.class) @ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(brokers = 3) @ClusterTestDefaults(brokers = 3)
public class RemoteLogMetadataManagerTest { public class RemoteLogMetadataManagerTest {
private final ClusterInstance clusterInstance; private final ClusterInstance clusterInstance;
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
private static final int SEG_SIZE = 1024 * 1024; private static final int SEG_SIZE = 1048576;
private static final int BROKER_ID_0 = 0; private static final int BROKER_ID_0 = 0;
private static final int BROKER_ID_1 = 1; private static final int BROKER_ID_1 = 1;
private final Time time = new SystemTime();
private final Time time = new MockTime(1);
RemoteLogMetadataManagerTest(ClusterInstance clusterInstance) { RemoteLogMetadataManagerTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance; this.clusterInstance = clusterInstance;
@ -71,26 +77,25 @@ public class RemoteLogMetadataManagerTest {
// 1.Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available. // 1.Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available.
Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L); Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0, RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); segmentId, 101L, 200L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
// Wait until the segment is added successfully. // Wait until the segment is added successfully.
Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
// Search should not return the above segment. // Search should not return the above segment.
Assertions.assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150).isPresent()); assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150).isPresent());
// 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available. // 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available.
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
Optional.empty(), segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, BROKER_ID_1);
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
BROKER_ID_1);
// Wait until the segment is updated successfully. // Wait until the segment is updated successfully.
Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get());
RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
// Search should return the above segment. // Search should return the above segment.
Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150); Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 =
Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150); remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150);
assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150);
} }
} }
@ -111,46 +116,46 @@ public class RemoteLogMetadataManagerTest {
segmentLeaderEpochs.put(2, 50L); segmentLeaderEpochs.put(2, 50L);
segmentLeaderEpochs.put(3, 80L); segmentLeaderEpochs.put(3, 80L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L, RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(
-1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentId, 0L, 100L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
segmentLeaderEpochs);
// Wait until the segment is added successfully. // Wait until the segment is added successfully.
Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate( RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
segmentId, time.milliseconds(), Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, BROKER_ID_1);
// Wait until the segment is updated successfully. // Wait until the segment is updated successfully.
Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get());
RemoteLogSegmentMetadata expectedSegMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); RemoteLogSegmentMetadata expectedSegMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
// Check that the segment exists in RLMM. // Check that the segment exists in RLMM.
Optional<RemoteLogSegmentMetadata> segMetadataForOffset30Epoch1 = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L); Optional<RemoteLogSegmentMetadata> segMetadataForOffset30Epoch1 =
Assertions.assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1); remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L);
assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1);
// Mark the partition for deletion and wait for it to be updated successfully. // Mark the partition for deletion and wait for it to be updated successfully.
Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_MARKED)).get()); createRemotePartitionDeleteMetadata(DELETE_PARTITION_MARKED)).get());
Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelMark = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelMark =
1, 30L); remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L);
Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark); assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark);
// Set the partition deletion state as started. Partition and segments should still be accessible as they are not // Set the partition deletion state as started. Partition and segments should still be accessible as they are not
// yet deleted. Wait until the segment state is updated successfully. // yet deleted. Wait until the segment state is updated successfully.
Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_STARTED)).get()); createRemotePartitionDeleteMetadata(DELETE_PARTITION_STARTED)).get());
Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelStart = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelStart =
1, 30L); remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L);
Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart); assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart);
// Set the partition deletion state as finished. RLMM should clear all its internal state for that partition. // Set the partition deletion state as finished. RLMM should clear all its internal state for that partition.
// Wait until the segment state is updated successfully. // Wait until the segment state is updated successfully.
Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_FINISHED)).get()); createRemotePartitionDeleteMetadata(DELETE_PARTITION_FINISHED)).get());
Assertions.assertThrows(RemoteResourceNotFoundException.class, assertThrows(RemoteResourceNotFoundException.class,
() -> remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L)); () -> remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L));
} }
} }