KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4) (#14307)

Added the below integration tests with tiered storage
 - PartitionsExpandTest
 - DeleteSegmentsByRetentionSizeTest
 - DeleteSegmentsByRetentionTimeTest and
 - EnableRemoteLogOnTopicTest
 - Enabled the test for both ZK and Kraft modes.

These are enabled for both ZK and Kraft modes.

Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Kamal Chandraprakash 2023-09-05 05:13:16 +05:30 committed by GitHub
parent d34d84dbef
commit caaa4c55fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 372 additions and 21 deletions

View File

@ -302,7 +302,7 @@ public class RemoteLogManager implements Closeable {
private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) { private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
Uuid previousTopicId = topicIdByPartitionMap.put(topicIdPartition.topicPartition(), topicIdPartition.topicId()); Uuid previousTopicId = topicIdByPartitionMap.put(topicIdPartition.topicPartition(), topicIdPartition.topicId());
if (previousTopicId != null && previousTopicId != topicIdPartition.topicId()) { if (previousTopicId != null && !previousTopicId.equals(topicIdPartition.topicId())) {
LOGGER.info("Previous cached topic id {} for {} does not match updated topic id {}", LOGGER.info("Previous cached topic id {} for {} does not match updated topic id {}",
previousTopicId, topicIdPartition.topicPartition(), topicIdPartition.topicId()); previousTopicId, topicIdPartition.topicPartition(), topicIdPartition.topicId());
} }

View File

@ -463,17 +463,21 @@ class BrokerServer(
new KafkaConfig(config.originals(), true) new KafkaConfig(config.originals(), true)
// Start RemoteLogManager before broker start serving the requests. // Start RemoteLogManager before broker start serving the requests.
remoteLogManagerOpt.foreach(rlm => { remoteLogManagerOpt.foreach { rlm =>
val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName() val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
if (listenerName != null) { if (listenerName != null) {
val endpoint = endpoints.stream.filter(e => e.listenerName.equals(ListenerName.normalised(listenerName))) val endpoint = endpoints.stream
.filter(e =>
e.listenerName().isPresent &&
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
)
.findFirst() .findFirst()
.orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP + .orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
" should be set as a listener name within valid broker listener name list.")) listenerName, "Should be set as a listener name within valid broker listener name list: " + endpoints))
rlm.onEndPointCreated(EndPoint.fromJava(endpoint)) rlm.onEndPointCreated(EndPoint.fromJava(endpoint))
} }
rlm.startup() rlm.startup()
}) }
// We're now ready to unfence the broker. This also allows this broker to transition // We're now ready to unfence the broker. This also allows this broker to transition
// from RECOVERY state to RUNNING state, once the controller unfences the broker. // from RECOVERY state to RUNNING state, once the controller unfences the broker.

View File

@ -506,17 +506,18 @@ class KafkaServer(
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
// Start RemoteLogManager before broker start serving the requests. // Start RemoteLogManager before broker start serving the requests.
remoteLogManagerOpt.foreach(rlm => { remoteLogManagerOpt.foreach { rlm =>
val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName() val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
if (listenerName != null) { if (listenerName != null) {
brokerInfo.broker.endPoints brokerInfo.broker.endPoints
.find(e => e.listenerName.equals(ListenerName.normalised(listenerName))) .find(e => e.listenerName.equals(ListenerName.normalised(listenerName)))
.orElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP + .orElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
" should be set as a listener name within valid broker listener name list.")) listenerName, "Should be set as a listener name within valid broker listener name list: "
+ brokerInfo.broker.endPoints.map(_.listenerName).mkString(",")))
.foreach(e => rlm.onEndPointCreated(e)) .foreach(e => rlm.onEndPointCreated(e))
} }
rlm.startup() rlm.startup()
}) }
/* start processing requests */ /* start processing requests */
val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache, brokerEpochManager) val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache, brokerEpochManager)

View File

@ -190,13 +190,14 @@ public final class TieredStorageTestContext implements AutoCloseable {
* @param batchSize the batch size * @param batchSize the batch size
*/ */
public void produce(List<ProducerRecord<String, String>> recordsToProduce, Integer batchSize) { public void produce(List<ProducerRecord<String, String>> recordsToProduce, Integer batchSize) {
int counter = 0; int counter = 1;
for (ProducerRecord<String, String> record : recordsToProduce) { for (ProducerRecord<String, String> record : recordsToProduce) {
producer.send(record); producer.send(record);
if (counter++ % batchSize == 0) { if (counter++ % batchSize == 0) {
producer.flush(); producer.flush();
} }
} }
producer.flush();
} }
public List<ConsumerRecord<String, String>> consume(TopicPartition topicPartition, public List<ConsumerRecord<String, String>> consume(TopicPartition topicPartition,

View File

@ -33,11 +33,13 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.collection.Seq; import scala.collection.Seq;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Optional; import java.util.Optional;
@ -77,9 +79,9 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
private static final Integer LOG_CLEANUP_INTERVAL_MS = 500; private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
private static final Integer RLM_TASK_INTERVAL_MS = 500; private static final Integer RLM_TASK_INTERVAL_MS = 500;
protected int numRemoteLogMetadataPartitions = 5;
private TieredStorageTestContext context; private TieredStorageTestContext context;
private String testClassName = ""; private String testClassName = "";
private String storageDirPath = "";
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
@ -89,6 +91,16 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
} }
} }
@SuppressWarnings("deprecation")
@Override
public Seq<Properties> kraftControllerConfigs() {
return JavaConverters.asScalaBuffer(Collections.singletonList(overridingProps())).toSeq();
}
protected int numRemoteLogMetadataPartitions() {
return 5;
}
public Properties overridingProps() { public Properties overridingProps() {
Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC > TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS), Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC > TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS),
"STORAGE_WAIT_TIMEOUT_SEC should be greater than RLM_TASK_INTERVAL_MS"); "STORAGE_WAIT_TIMEOUT_SEC should be greater than RLM_TASK_INTERVAL_MS");
@ -114,7 +126,7 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
overridingProps.setProperty( overridingProps.setProperty(
metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
String.valueOf(numRemoteLogMetadataPartitions)); String.valueOf(numRemoteLogMetadataPartitions()));
overridingProps.setProperty( overridingProps.setProperty(
metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP), metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
String.valueOf(brokerCount())); String.valueOf(brokerCount()));
@ -130,8 +142,7 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
// in every broker and throughout the test. Indeed, as brokers are restarted during the test. // in every broker and throughout the test. Indeed, as brokers are restarted during the test.
// You can override this property with a fixed path of your choice if you wish to use a non-temporary // You can override this property with a fixed path of your choice if you wish to use a non-temporary
// directory to access its content after a test terminated. // directory to access its content after a test terminated.
overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG), overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG), storageDirPath);
TestUtils.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath());
// This configuration will remove all the remote files when close is called in remote storage manager. // This configuration will remove all the remote files when close is called in remote storage manager.
// Storage manager close is being called while the server is actively processing the socket requests, // Storage manager close is being called while the server is actively processing the socket requests,
// so enabling this config can break the existing tests. // so enabling this config can break the existing tests.
@ -150,12 +161,15 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
@Override @Override
public void setUp(TestInfo testInfo) { public void setUp(TestInfo testInfo) {
testClassName = testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault()); testClassName = testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath();
super.setUp(testInfo); super.setUp(testInfo);
context = new TieredStorageTestContext(this); context = new TieredStorageTestContext(this);
} }
@Test // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name.
public void executeTieredStorageTest() { @ParameterizedTest(name = "{displayName}.quorum={0}")
@ValueSource(strings = {"zk", "kraft"})
public void executeTieredStorageTest(String quorum) {
TieredStorageTestBuilder builder = new TieredStorageTestBuilder(); TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
writeTestSpecifications(builder); writeTestSpecifications(builder);
try { try {

View File

@ -36,8 +36,12 @@ public final class ExpectEmptyRemoteStorageAction implements TieredStorageTestAc
public void doExecute(TieredStorageTestContext context) throws InterruptedException { public void doExecute(TieredStorageTestContext context) throws InterruptedException {
TestUtils.waitForCondition(() -> { TestUtils.waitForCondition(() -> {
LocalTieredStorageSnapshot snapshot = context.takeTieredStorageSnapshot(); LocalTieredStorageSnapshot snapshot = context.takeTieredStorageSnapshot();
return !snapshot.getTopicPartitions().contains(topicPartition) && // We don't differentiate the case between segment deletion and topic deletion so the underlying
snapshot.getFilesets(topicPartition).isEmpty(); // remote-storage-manager (RSM) doesn't know when to remove any topic-level marker files/folders.
// In case of LocalTieredStorage (RSM), there will be empty partition directories.
// With KAFKA-15166, the RSM will be able to delete the topic-level marker folders, then the
// `LocalTieredStorageSnapshot` should not contain the partition directories.
return snapshot.getFilesets(topicPartition).isEmpty();
}, 2000L, "Remote storage is not empty for " + topicPartition); }, 2000L, "Remote storage is not empty for " + topicPartition);
} }

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
@Override
public int brokerCount() {
return 1;
}
@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final Integer broker0 = 0;
final String topicA = "topicA";
final Integer p0 = 0;
final Integer partitionCount = 1;
final Integer replicationFactor = 1;
final Integer maxBatchCountPerSegment = 1;
final Map<Integer, List<Integer>> replicaAssignment = null;
final boolean enableRemoteLogStorage = true;
final int beginEpoch = 0;
final long startOffset = 3;
// Create topicA with 1 partition, 1 RF and enabled with remote storage.
builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment,
enableRemoteLogStorage)
// produce events to partition 0 and expect 3 segments to be offloaded
.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"))
// update the topic config such that it triggers the deletion of segments
.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
// expect that the three offloaded remote log segments are deleted
.expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 3)
.waitForRemoteLogSegmentDeletion(topicA)
// expect that the leader epoch checkpoint is updated
.expectLeaderEpochCheckpoint(broker0, topicA, p0, beginEpoch, startOffset)
// consume from the beginning of the topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker0, topicA, p0, 0)
.consume(topicA, p0, 0L, 1, 0);
}
protected abstract Map<String, String> configsToBeAdded();
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.Map;
public final class DeleteSegmentsByRetentionSizeTest extends BaseDeleteSegmentsTest {
@Override
protected Map<String, String> configsToBeAdded() {
return Collections.singletonMap(TopicConfig.RETENTION_BYTES_CONFIG, "1");
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.Map;
public final class DeleteSegmentsByRetentionTimeTest extends BaseDeleteSegmentsTest {
@Override
protected Map<String, String> configsToBeAdded() {
return Collections.singletonMap(TopicConfig.RETENTION_MS_CONFIG, "1");
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness {
@Override
public int brokerCount() {
return 2;
}
@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final Integer broker0 = 0;
final Integer broker1 = 1;
final String topicA = "topicA";
final Integer p0 = 0;
final Integer p1 = 1;
final Integer partitionCount = 2;
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = false;
final Map<Integer, List<Integer>> assignment = mkMap(
mkEntry(p0, Arrays.asList(broker0, broker1)),
mkEntry(p1, Arrays.asList(broker1, broker0))
);
builder
.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment,
enableRemoteLogStorage)
// send records to partition 0
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 0L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// send records to partition 1
.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 0L)
.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// enable remote log storage
.updateTopicConfig(topicA,
Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
Collections.emptyList())
// produce some more records to partition 0
// Note that the segment 0-2 gets offloaded for p0, but we cannot expect those events deterministically
// because the rlm-task-thread runs in background and this framework doesn't support it.
.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new KeyValueSpec("k3", "v3"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
.produce(topicA, p0, new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"))
// produce some more records to partition 1
// Note that the segment 0-2 gets offloaded for p1, but we cannot expect those events deterministically
// because the rlm-task-thread runs in background and this framework doesn't support it.
.expectSegmentToBeOffloaded(broker1, topicA, p1, 3, new KeyValueSpec("k3", "v3"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 4L)
.produce(topicA, p1, new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"))
// consume from the beginning of the topic to read data from local and remote storage for partition 0
.expectFetchFromTieredStorage(broker0, topicA, p0, 4)
.consume(topicA, p0, 0L, 5, 4)
// consume from the beginning of the topic to read data from local and remote storage for partition 1
.expectFetchFromTieredStorage(broker1, topicA, p1, 4)
.consume(topicA, p1, 0L, 5, 4);
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
public final class PartitionsExpandTest extends TieredStorageTestHarness {
@Override
public int brokerCount() {
return 2;
}
@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final Integer broker0 = 0;
final Integer broker1 = 1;
final String topicA = "topicA";
final Integer p0 = 0;
final Integer p1 = 1;
final Integer p2 = 2;
final Integer partitionCount = 1;
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = true;
final List<Integer> p0Assignment = Arrays.asList(broker0, broker1);
final List<Integer> p1Assignment = Arrays.asList(broker0, broker1);
final List<Integer> p2Assignment = Arrays.asList(broker1, broker0);
builder
.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
Collections.singletonMap(p0, p0Assignment), enableRemoteLogStorage)
// produce events to partition 0
.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// expand the topicA partition-count to 3
.createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), mkEntry(p2, p2Assignment)))
// consume from the beginning of the topic to read data from local and remote storage for partition 0
.expectFetchFromTieredStorage(broker0, topicA, p0, 2)
.consume(topicA, p0, 0L, 3, 2)
.expectLeader(topicA, p1, broker0, false)
.expectLeader(topicA, p2, broker1, false)
// produce events to partition 1
.expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicA, p1, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// produce events to partition 2
.expectSegmentToBeOffloaded(broker1, topicA, p2, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker1, topicA, p2, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p2, 2L)
.produce(topicA, p2, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// produce some more events to partition 0 and expect the segments to be offloaded
// NOTE: Support needs to be added to capture the offloaded segment event for already sent message (k2, v2)
// .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new KeyValueSpec("k3", "v3"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 4, new KeyValueSpec("k4", "v4"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 5L)
.produce(topicA, p0, new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"),
new KeyValueSpec("k5", "v5"))
// consume from the beginning of the topic to read data from local and remote storage for partition 0
.expectFetchFromTieredStorage(broker0, topicA, p0, 5)
.consume(topicA, p0, 0L, 6, 5)
// consume from the beginning of the topic to read data from local and remote storage for partition 1
.expectFetchFromTieredStorage(broker0, topicA, p1, 2)
.consume(topicA, p1, 0L, 3, 2)
// consume from the middle of the topic for partition 2
.expectFetchFromTieredStorage(broker1, topicA, p2, 1)
.consume(topicA, p2, 1L, 2, 1);
}
}

View File

@ -56,7 +56,7 @@ public final class BrokerLocalStorage {
/** /**
* Wait until the first segment offset in Apache Kafka storage for the given topic-partition is * Wait until the first segment offset in Apache Kafka storage for the given topic-partition is
* equal or greater to the provided offset. * equal to the provided offset.
* This ensures segments can be retrieved from the local tiered storage when expected. * This ensures segments can be retrieved from the local tiered storage when expected.
* *
* @param topicPartition The topic-partition to check. * @param topicPartition The topic-partition to check.