KAFKA-15439: Transactions test with tiered storage (#14347)

This test extends the existing TransactionsTest. It configures the broker and topic with tiered storage and expects at least one log segment to be uploaded to the remote storage.

Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>,  Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Kamal Chandraprakash 2023-09-14 07:22:13 +05:30 committed by GitHub
parent 5aecd28256
commit dacb3b31d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 377 additions and 204 deletions

View File

@ -35,8 +35,9 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn import scala.annotation.nowarn
import scala.collection.Seq
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer import scala.collection.mutable.{Buffer, ListBuffer}
import scala.concurrent.ExecutionException import scala.concurrent.ExecutionException
class TransactionsTest extends IntegrationTestHarness { class TransactionsTest extends IntegrationTestHarness {
@ -54,26 +55,43 @@ class TransactionsTest extends IntegrationTestHarness {
val transactionalConsumers = Buffer[Consumer[Array[Byte], Array[Byte]]]() val transactionalConsumers = Buffer[Consumer[Array[Byte], Array[Byte]]]()
val nonTransactionalConsumers = Buffer[Consumer[Array[Byte], Array[Byte]]]() val nonTransactionalConsumers = Buffer[Consumer[Array[Byte], Array[Byte]]]()
serverConfig.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) def overridingProps(): Properties = {
// Set a smaller value for the number of partitions for the __consumer_offsets topic val props = new Properties()
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long props.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
serverConfig.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) // Set a smaller value for the number of partitions for the __consumer_offsets topic + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
serverConfig.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString) props.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
serverConfig.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString) props.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
serverConfig.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString) props.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
serverConfig.put(KafkaConfig.ControlledShutdownEnableProp, true.toString) props.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
serverConfig.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString) props.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
serverConfig.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString) props.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
serverConfig.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") props.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
serverConfig.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200") props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
props.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
props
}
override protected def modifyConfigs(props: Seq[Properties]): Unit = {
props.foreach(p => p.putAll(overridingProps()))
}
override protected def kraftControllerConfigs(): Seq[Properties] = {
Seq(overridingProps())
}
def topicConfig(): Properties = {
val topicConfig = new Properties()
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
topicConfig
}
@BeforeEach @BeforeEach
override def setUp(testInfo: TestInfo): Unit = { override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo) super.setUp(testInfo)
val topicConfig = new Properties() createTopic(topic1, numPartitions, brokerCount, topicConfig())
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString) createTopic(topic2, numPartitions, brokerCount, topicConfig())
createTopic(topic1, numPartitions, brokerCount, topicConfig)
createTopic(topic2, numPartitions, brokerCount, topicConfig)
for (_ <- 0 until transactionalProducerCount) for (_ <- 0 until transactionalProducerCount)
createTransactionalProducer("transactional-producer") createTransactionalProducer("transactional-producer")
@ -97,20 +115,25 @@ class TransactionsTest extends IntegrationTestHarness {
val producer = transactionalProducers.head val producer = transactionalProducers.head
val consumer = transactionalConsumers.head val consumer = transactionalConsumers.head
val unCommittedConsumer = nonTransactionalConsumers.head val unCommittedConsumer = nonTransactionalConsumers.head
val tp11 = new TopicPartition(topic1, 1)
val tp22 = new TopicPartition(topic2, 2)
producer.initTransactions() producer.initTransactions()
producer.beginTransaction() producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "2", willBeCommitted = false)) producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 2, "2", "2", willBeCommitted = false))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "4", "4", willBeCommitted = false)) producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "4", "4", willBeCommitted = false))
producer.flush() producer.flush()
producer.abortTransaction() producer.abortTransaction()
producer.beginTransaction() producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = true)) producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "1", "1", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "3", "3", willBeCommitted = true)) producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 2, "3", "3", willBeCommitted = true))
producer.commitTransaction() producer.commitTransaction()
maybeWaitForAtLeastOneSegmentUpload(tp11, tp22)
consumer.subscribe(List(topic1, topic2).asJava) consumer.subscribe(List(topic1, topic2).asJava)
unCommittedConsumer.subscribe(List(topic1, topic2).asJava) unCommittedConsumer.subscribe(List(topic1, topic2).asJava)
@ -199,6 +222,7 @@ class TransactionsTest extends IntegrationTestHarness {
def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = { def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = {
val producer1 = transactionalProducers.head val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other") val producer2 = createTransactionalProducer("other")
val tp10 = new TopicPartition(topic1, 0)
producer1.initTransactions() producer1.initTransactions()
producer2.initTransactions() producer2.initTransactions()
@ -218,13 +242,15 @@ class TransactionsTest extends IntegrationTestHarness {
producer1.abortTransaction() producer1.abortTransaction()
producer2.commitTransaction() producer2.commitTransaction()
maybeWaitForAtLeastOneSegmentUpload(tp10)
// ensure that the consumer's fetch will sit in purgatory // ensure that the consumer's fetch will sit in purgatory
val consumerProps = new Properties() val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "100000") consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "100000")
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100") consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100")
val readCommittedConsumer = createReadCommittedConsumer(props = consumerProps) val readCommittedConsumer = createReadCommittedConsumer(props = consumerProps)
readCommittedConsumer.assign(Set(new TopicPartition(topic1, 0)).asJava) readCommittedConsumer.assign(Set(tp10).asJava)
val records = consumeRecords(readCommittedConsumer, numRecords = 2) val records = consumeRecords(readCommittedConsumer, numRecords = 2)
assertEquals(2, records.size) assertEquals(2, records.size)
@ -309,6 +335,12 @@ class TransactionsTest extends IntegrationTestHarness {
consumer.close() consumer.close()
} }
val partitions = ListBuffer.empty[TopicPartition]
for (partition <- 0 until numPartitions) {
partitions += new TopicPartition(topic2, partition)
}
maybeWaitForAtLeastOneSegmentUpload(partitions.toSeq: _*)
// In spite of random aborts, we should still have exactly 500 messages in topic2. I.e. we should not // In spite of random aborts, we should still have exactly 500 messages in topic2. I.e. we should not
// re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally. // re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
val verifyingConsumer = transactionalConsumers(0) val verifyingConsumer = transactionalConsumers(0)
@ -592,10 +624,8 @@ class TransactionsTest extends IntegrationTestHarness {
val unCommittedConsumer = nonTransactionalConsumers.head val unCommittedConsumer = nonTransactionalConsumers.head
val topicWith10Partitions = "largeTopic" val topicWith10Partitions = "largeTopic"
val topicWith10PartitionsAndOneReplica = "largeTopicOneReplica" val topicWith10PartitionsAndOneReplica = "largeTopicOneReplica"
val topicConfig = new Properties()
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
createTopic(topicWith10Partitions, 10, brokerCount, topicConfig) createTopic(topicWith10Partitions, 10, brokerCount, topicConfig())
createTopic(topicWith10PartitionsAndOneReplica, 10, 1, new Properties()) createTopic(topicWith10PartitionsAndOneReplica, 10, 1, new Properties())
firstProducer.initTransactions() firstProducer.initTransactions()
@ -802,4 +832,7 @@ class TransactionsTest extends IntegrationTestHarness {
transactionalProducers += producer transactionalProducers += producer
producer producer
} }
def maybeWaitForAtLeastOneSegmentUpload(topicPartitions: TopicPartition*): Unit = {
}
} }

View File

@ -16,26 +16,25 @@
*/ */
package org.apache.kafka.tiered.storage; package org.apache.kafka.tiered.storage;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import kafka.api.IntegrationTestHarness; import kafka.api.IntegrationTestHarness;
import kafka.log.remote.RemoteLogManager; import kafka.log.remote.RemoteLogManager;
import kafka.server.KafkaBroker; import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig; import kafka.server.KafkaConfig;
import org.apache.kafka.common.replica.ReplicaSelector; import org.apache.kafka.common.replica.ReplicaSelector;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import scala.collection.JavaConverters;
import scala.collection.Seq; import scala.collection.Seq;
import java.util.ArrayList; import java.util.ArrayList;
@ -44,22 +43,10 @@ import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Optional; import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import scala.collection.JavaConverters; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
/** /**
* Base class for integration tests exercising the tiered storage functionality in Apache Kafka. * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
@ -69,16 +56,6 @@ import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STOR
@Tag("integration") @Tag("integration")
public abstract class TieredStorageTestHarness extends IntegrationTestHarness { public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
/**
* InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
* Hence, we need to wait at least that amount of time before segments eligible for deletion
* gets physically removed.
*/
private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
// The default value of log cleanup interval is 30 secs, and it increases the test execution time.
private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
private static final Integer RLM_TASK_INTERVAL_MS = 500;
private TieredStorageTestContext context; private TieredStorageTestContext context;
private String testClassName = ""; private String testClassName = "";
private String storageDirPath = ""; private String storageDirPath = "";
@ -102,52 +79,10 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
} }
public Properties overridingProps() { public Properties overridingProps() {
Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC > TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS), Properties overridingProps = createPropsForRemoteStorage(testClassName, storageDirPath, brokerCount(),
"STORAGE_WAIT_TIMEOUT_SEC should be greater than RLM_TASK_INTERVAL_MS"); numRemoteLogMetadataPartitions(), new Properties());
Properties overridingProps = new Properties();
// Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background
// activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
//
// The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
// tests, metadata can survive the loss of one replica for its topic-partitions.
//
// The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
// data files on the local file system.
overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
TopicBasedRemoteLogMetadataManager.class.getName());
overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString());
overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(""));
overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(""));
overridingProps.setProperty(
metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
String.valueOf(numRemoteLogMetadataPartitions()));
overridingProps.setProperty(
metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
String.valueOf(brokerCount()));
// This configuration ensures inactive log segments are deleted fast enough so that
// the integration tests can confirm a given log segment is present only in the second-tier storage.
// Note that this does not impact the eligibility of a log segment to be offloaded to the
// second-tier storage.
overridingProps.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), LOG_CLEANUP_INTERVAL_MS.toString());
// This can be customized to read remote log segments from followers.
readReplicaSelectorClass() readReplicaSelectorClass()
.ifPresent(c -> overridingProps.put(KafkaConfig.ReplicaSelectorClassProp(), c.getName())); .ifPresent(c -> overridingProps.put(KafkaConfig.ReplicaSelectorClassProp(), c.getName()));
// The directory of the second-tier storage needs to be constant across all instances of storage managers
// in every broker and throughout the test. Indeed, as brokers are restarted during the test.
// You can override this property with a fixed path of your choice if you wish to use a non-temporary
// directory to access its content after a test terminated.
overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG), storageDirPath);
// This configuration will remove all the remote files when close is called in remote storage manager.
// Storage manager close is being called while the server is actively processing the socket requests,
// so enabling this config can break the existing tests.
// NOTE: When using TestUtils#tempDir(), the folder gets deleted when VM terminates.
overridingProps.setProperty(storageConfigPrefix(DELETE_ON_CLOSE_CONFIG), "false");
return overridingProps; return overridingProps;
} }
@ -193,14 +128,6 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
} }
} }
private String storageConfigPrefix(String key) {
return "rsm.config." + testClassName + "." + key;
}
private String metadataConfigPrefix(String key) {
return "rlmm.config." + testClassName + "." + key;
}
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public static List<LocalTieredStorage> remoteStorageManagers(Seq<KafkaBroker> brokers) { public static List<LocalTieredStorage> remoteStorageManagers(Seq<KafkaBroker> brokers) {
List<LocalTieredStorage> storages = new ArrayList<>(); List<LocalTieredStorage> storages = new ArrayList<>();

View File

@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT; import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
import static org.apache.kafka.tiered.storage.utils.ActionUtils.tieredStorageRecords; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords;
import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo; import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;

View File

@ -19,11 +19,13 @@ package org.apache.kafka.tiered.storage.actions;
import org.apache.kafka.tiered.storage.TieredStorageTestAction; import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext; import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.TopicSpec; import org.apache.kafka.tiered.storage.specs.TopicSpec;
import org.apache.kafka.common.config.TopicConfig;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage;
public final class CreateTopicAction implements TieredStorageTestAction { public final class CreateTopicAction implements TieredStorageTestAction {
private final TopicSpec spec; private final TopicSpec spec;
@ -34,23 +36,13 @@ public final class CreateTopicAction implements TieredStorageTestAction {
@Override @Override
public void doExecute(TieredStorageTestContext context) throws ExecutionException, InterruptedException { public void doExecute(TieredStorageTestContext context) throws ExecutionException, InterruptedException {
// Ensure offset and time indexes are generated for every record. boolean enableRemoteStorage = true;
spec.getProperties().put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1"); Map<String, String> topicConfigs = createTopicConfigForRemoteStorage(
// Leverage the use of the segment index size to create a log-segment accepting one and only one record. enableRemoteStorage, spec.getMaxBatchCountPerSegment());
// The minimum size of the indexes is that of an entry, which is 8 for the offset index and 12 for the topicConfigs.putAll(spec.getProperties());
// time index. Hence, since the topic is configured to generate index entries for every record with, for
// a "small" number of records (i.e. such that the average record size times the number of records is spec.getProperties().clear();
// much less than the segment size), the number of records which hold in a segment is the multiple of 12 spec.getProperties().putAll(topicConfigs);
// defined below.
if (spec.getMaxBatchCountPerSegment() != -1) {
spec.getProperties().put(
TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * spec.getMaxBatchCountPerSegment()));
}
// To verify records physically absent from Kafka's storage can be consumed via the second tier storage, we
// want to delete log segments as soon as possible. When tiered storage is active, an inactive log
// segment is not eligible for deletion until it has been offloaded, which guarantees all segments
// should be offloaded before deletion, and their consumption is possible thereafter.
spec.getProperties().put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
context.createTopic(spec); context.createTopic(spec);
} }

View File

@ -25,7 +25,7 @@ import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import java.io.PrintStream; import java.io.PrintStream;
import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
public final class ExpectBrokerInISRAction implements TieredStorageTestAction { public final class ExpectBrokerInISRAction implements TieredStorageTestAction {

View File

@ -36,7 +36,7 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
public final class ExpectLeaderAction implements TieredStorageTestAction { public final class ExpectLeaderAction implements TieredStorageTestAction {

View File

@ -27,7 +27,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
public final class ExpectTopicIdToMatchInRemoteStorageAction implements TieredStorageTestAction { public final class ExpectTopicIdToMatchInRemoteStorageAction implements TieredStorageTestAction {

View File

@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopics; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopics;
public final class ExpectUserTopicMappedToMetadataPartitionsAction implements TieredStorageTestAction { public final class ExpectUserTopicMappedToMetadataPartitionsAction implements TieredStorageTestAction {

View File

@ -39,7 +39,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition.expectEvent; import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition.expectEvent;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.COPY_SEGMENT; import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.COPY_SEGMENT;
import static org.apache.kafka.tiered.storage.utils.ActionUtils.tieredStorageRecords; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords;
import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo; import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;

View File

@ -33,7 +33,7 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
public final class ReassignReplicaAction implements TieredStorageTestAction { public final class ReassignReplicaAction implements TieredStorageTestAction {

View File

@ -33,7 +33,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
public final class ShrinkReplicaAction implements TieredStorageTestAction { public final class ShrinkReplicaAction implements TieredStorageTestAction {

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import kafka.api.TransactionsTest;
import kafka.server.HostedPartition;
import kafka.server.KafkaBroker;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import scala.collection.JavaConverters;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC;
/**
 * Re-runs the inherited {@link TransactionsTest} cases with the broker and topic configuration
 * produced by {@code createPropsForRemoteStorage} and {@code createTopicConfigForRemoteStorage}.
 *
 * <p>The parent class exposes three hooks which are customised here: {@code overridingProps()},
 * {@code topicConfig()} and {@code maybeWaitForAtLeastOneSegmentUpload(...)}.
 */
public class TransactionsWithTieredStoreTest extends TransactionsTest {

    // Lower-cased simple name of the running test class; handed to createPropsForRemoteStorage.
    private String testClassName;
    // Absolute path of the temporary directory handed to createPropsForRemoteStorage.
    private String storageDirPath;

    /**
     * Initialises the per-test storage settings and then delegates to the parent set-up.
     *
     * @param testInfo JUnit descriptor of the test being started
     */
    @BeforeEach
    @Override
    public void setUp(TestInfo testInfo) {
        // Both fields are read by overridingProps(), so they are assigned before super.setUp()
        // (presumably the point at which the harness asks for the broker properties).
        testClassName = testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
        storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath();
        super.setUp(testInfo);
    }

    /**
     * @return the parent's overriding properties passed through {@code createPropsForRemoteStorage}
     *         together with the test class name, storage directory, broker count and a fixed
     *         count of three remote log metadata partitions
     */
    @Override
    public Properties overridingProps() {
        Properties props = super.overridingProps();
        int numRemoteLogMetadataPartitions = 3;
        return createPropsForRemoteStorage(testClassName, storageDirPath, brokerCount(),
                numRemoteLogMetadataPartitions, props);
    }

    /**
     * @return the parent's topic configuration with the entries returned by
     *         {@code createTopicConfigForRemoteStorage(true, 1)} layered on top
     */
    @Override
    public Properties topicConfig() {
        boolean enableRemoteStorage = true;
        int maxBatchCountPerSegment = 1;
        Properties overridingTopicProps = super.topicConfig();
        overridingTopicProps.putAll(createTopicConfigForRemoteStorage(
                enableRemoteStorage, maxBatchCountPerSegment));
        return overridingTopicProps;
    }

    /**
     * For every given topic-partition, calls
     * {@code waitForAtLeastEarliestLocalOffset(topicPartition, 1L)} on the local storage of each
     * alive broker that hosts the partition online.
     *
     * <p>NOTE(review): this method carries no {@code @Override}; confirm that the parameter type
     * matches the compiled signature of the Scala varargs method in {@link TransactionsTest} for
     * every Scala version in the build, otherwise it is an overload that the tests never call.
     *
     * @param topicPartitions the partitions to wait on
     */
    @SuppressWarnings("deprecation")
    public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.immutable.Seq<TopicPartition> topicPartitions) {
        // The per-broker storage handles do not depend on the partition, so build them once
        // instead of once per topic-partition.
        List<BrokerLocalStorage> localStorages = JavaConverters.seqAsJavaList(brokers()).stream()
                .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC))
                .collect(Collectors.toList());

        JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition ->
                localStorages
                        .stream()
                        // Select brokers which are assigned a replica of the topic-partition
                        .filter(s -> isAssignedReplica(topicPartition, s.getBrokerId()))
                        // Filter out inactive brokers, which may still contain log segments we would expect
                        // to be deleted based on the retention configuration.
                        .filter(s -> isAlive(s.getBrokerId()))
                        // Wait until the broker's local storage has been cleared of the inactive log segments.
                        .forEach(localStorage ->
                                localStorage.waitForAtLeastEarliestLocalOffset(topicPartition, 1L)));
    }

    /**
     * @param topicPartition the partition to look up
     * @param replicaId      id of the broker to inspect
     * @return {@code true} when a broker with that id exists and its replica manager reports the
     *         partition as {@link HostedPartition.Online}; {@code false} otherwise
     */
    @SuppressWarnings("deprecation")
    private boolean isAssignedReplica(TopicPartition topicPartition,
                                      Integer replicaId) {
        return JavaConverters.seqAsJavaList(brokers())
                .stream()
                .filter(b -> b.config().brokerId() == replicaId)
                .findFirst()
                .map(b -> b.replicaManager().getPartition(topicPartition) instanceof HostedPartition.Online)
                .orElse(false);
    }

    /**
     * @param brokerId id of the broker to check
     * @return {@code true} when {@code aliveBrokers()} contains a broker with that id
     */
    private boolean isAlive(Integer brokerId) {
        return aliveBrokers().exists(b -> b.config().brokerId() == brokerId);
    }
}

View File

@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.utils;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
 * Static helpers to describe topics through the admin client of a {@link TieredStorageTestContext}
 * and to read back the records held by its tiered storage.
 */
public class ActionUtils {

    /**
     * Describes a single topic.
     *
     * @param context The test context.
     * @param topic The name of the topic to describe.
     * @return The description mapped to {@code topic}, or {@code null} if the result has no entry for it.
     * @throws ExecutionException If the underlying describe call fails.
     * @throws InterruptedException If the thread is interrupted while waiting for the result.
     */
    public static TopicDescription describeTopic(TieredStorageTestContext context, String topic)
            throws ExecutionException, InterruptedException {
        return describeTopics(context, Collections.singletonList(topic)).get(topic);
    }

    /**
     * Describes several topics with the admin client of the test context and waits for the result.
     *
     * @param context The test context.
     * @param topics The names of the topics to describe.
     * @return The topic descriptions keyed by topic name.
     * @throws ExecutionException If the underlying describe call fails.
     * @throws InterruptedException If the thread is interrupted while waiting for the result.
     */
    public static Map<String, TopicDescription> describeTopics(TieredStorageTestContext context,
                                                               List<String> topics)
            throws ExecutionException, InterruptedException {
        return context.admin()
                .describeTopics(topics)
                .allTopicNames()
                .get();
    }

    /**
     * Get the records found in the local tiered storage.
     * Snapshot does not sort the filesets by base offset, so the per-fileset record lists are ordered
     * here by the offset of their first record before being flattened.
     *
     * @param context The test context.
     * @param topicPartition The topic-partition of the records.
     * @return The records found in the local tiered storage.
     * @throws RuntimeException If reading the records of a fileset fails with an {@link IOException}.
     */
    public static List<Record> tieredStorageRecords(TieredStorageTestContext context,
                                                    TopicPartition topicPartition) {
        return context.takeTieredStorageSnapshot()
                .getFilesets(topicPartition)
                .stream()
                .map(fileset -> {
                    try {
                        return fileset.getRecords();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                // An empty list has no first record to sort on and contributes nothing to the result;
                // dropping it keeps the comparator below from failing on records.get(0).
                .filter(records -> !records.isEmpty())
                .sorted(Comparator.comparingLong(records -> records.get(0).offset()))
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.utils;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.junit.jupiter.api.Assertions;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
/**
 * Static helpers for tiered storage tests: topic description, access to the records held by the
 * tiered storage, and construction of the broker and topic configurations which enable remote storage.
 */
public class TieredStorageTestUtils {

    /**
     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
     * Hence, we need to wait at least that amount of time before segments eligible for deletion
     * get physically removed.
     */
    public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
    // The default value of log cleanup interval is 30 secs, and it increases the test execution time.
    private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
    // Value set for the remote log manager task interval property.
    private static final Integer RLM_TASK_INTERVAL_MS = 500;
    // Value set for the remote log metadata manager initialization retry interval property.
    private static final Integer RLMM_INIT_RETRY_INTERVAL_MS = 300;

    /**
     * Describes a single topic.
     *
     * @param context The test context.
     * @param topic The name of the topic to describe.
     * @return The description mapped to {@code topic}, or {@code null} if the result has no entry for it.
     * @throws ExecutionException If the underlying describe call fails.
     * @throws InterruptedException If the thread is interrupted while waiting for the result.
     */
    public static TopicDescription describeTopic(TieredStorageTestContext context, String topic)
            throws ExecutionException, InterruptedException {
        return describeTopics(context, Collections.singletonList(topic)).get(topic);
    }

    /**
     * Describes several topics with the admin client of the test context and waits for the result.
     *
     * @param context The test context.
     * @param topics The names of the topics to describe.
     * @return The topic descriptions keyed by topic name.
     * @throws ExecutionException If the underlying describe call fails.
     * @throws InterruptedException If the thread is interrupted while waiting for the result.
     */
    public static Map<String, TopicDescription> describeTopics(TieredStorageTestContext context,
                                                               List<String> topics)
            throws ExecutionException, InterruptedException {
        return context.admin()
                .describeTopics(topics)
                .allTopicNames()
                .get();
    }

    /**
     * Get the records found in the local tiered storage.
     * Snapshot does not sort the filesets by base offset, so the per-fileset record lists are ordered
     * here by the offset of their first record before being flattened.
     *
     * @param context The test context.
     * @param topicPartition The topic-partition of the records.
     * @return The records found in the local tiered storage.
     * @throws RuntimeException If reading the records of a fileset fails with an {@link IOException}.
     */
    public static List<Record> tieredStorageRecords(TieredStorageTestContext context,
                                                    TopicPartition topicPartition) {
        return context.takeTieredStorageSnapshot()
                .getFilesets(topicPartition)
                .stream()
                .map(fileset -> {
                    try {
                        return fileset.getRecords();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                // An empty list has no first record to sort on and contributes nothing to the result;
                // dropping it keeps the comparator below from failing on records.get(0).
                .filter(records -> !records.isEmpty())
                .sorted(Comparator.comparingLong(records -> records.get(0).offset()))
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    /**
     * Adds the broker properties which enable and configure tiered storage to {@code overridingProps}.
     *
     * @param testClassName The name inserted in the storage and metadata manager config prefixes.
     * @param storageDirPath The directory used by the {@link LocalTieredStorage}.
     * @param brokerCount The value set as replication factor of the remote log metadata topic.
     * @param numRemoteLogMetadataPartitions The number of partitions of the remote log metadata topic.
     * @param overridingProps The properties to extend; this instance is mutated.
     * @return The same {@code overridingProps} instance.
     */
    public static Properties createPropsForRemoteStorage(String testClassName,
                                                         String storageDirPath,
                                                         int brokerCount,
                                                         int numRemoteLogMetadataPartitions,
                                                         Properties overridingProps) {
        Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC > TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS),
                "STORAGE_WAIT_TIMEOUT_SEC should be greater than RLM_TASK_INTERVAL_MS");

        // Configure the tiered storage in Kafka. Set a short interval (RLM_TASK_INTERVAL_MS) for the remote
        // log manager background activity to ensure the tiered storage has enough room to be exercised
        // within the lifetime of a test.
        //
        // The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
        // tests, metadata can survive the loss of one replica for its topic-partitions.
        //
        // The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
        // data files on the local file system.
        overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
                TopicBasedRemoteLogMetadataManager.class.getName());
        overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString());
        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");

        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(testClassName, ""));
        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(testClassName, ""));

        overridingProps.setProperty(
                metadataConfigPrefix(testClassName, TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
                String.valueOf(numRemoteLogMetadataPartitions));
        overridingProps.setProperty(
                metadataConfigPrefix(testClassName, TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
                String.valueOf(brokerCount));

        // This configuration ensures inactive log segments are deleted fast enough so that
        // the integration tests can confirm a given log segment is present only in the second-tier storage.
        // Note that this does not impact the eligibility of a log segment to be offloaded to the
        // second-tier storage.
        overridingProps.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), LOG_CLEANUP_INTERVAL_MS.toString());

        // The directory of the second-tier storage needs to be constant across all instances of storage managers
        // in every broker and throughout the test, because brokers are restarted during the test.
        // You can override this property with a fixed path of your choice if you wish to use a non-temporary
        // directory to access its content after a test terminated.
        overridingProps.setProperty(storageConfigPrefix(testClassName, STORAGE_DIR_CONFIG), storageDirPath);

        // This configuration will remove all the remote files when close is called in remote storage manager.
        // Storage manager close is being called while the server is actively processing the socket requests,
        // so enabling this config can break the existing tests.
        // NOTE: When using TestUtils#tempDir(), the folder gets deleted when VM terminates.
        overridingProps.setProperty(storageConfigPrefix(testClassName, DELETE_ON_CLOSE_CONFIG), "false");

        // Set a small interval for retrying the initialization of the RemoteLogMetadataManager resources
        // to speed up the test.
        overridingProps.setProperty(metadataConfigPrefix(testClassName, REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP), RLMM_INIT_RETRY_INTERVAL_MS.toString());
        return overridingProps;
    }

    /**
     * Builds the topic-level configuration used by tiered storage tests.
     *
     * @param enableRemoteStorage The value set for the topic's remote log storage enable config.
     * @param maxRecordBatchPerSegment The multiplier applied to the size of a time index entry (12 bytes)
     *                                 to compute the segment index size.
     * @return A new mutable map holding the topic configuration.
     */
    public static Map<String, String> createTopicConfigForRemoteStorage(boolean enableRemoteStorage,
                                                                        int maxRecordBatchPerSegment) {
        Map<String, String> topicProps = new HashMap<>();
        // Enables remote log storage for this topic.
        topicProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, String.valueOf(enableRemoteStorage));
        // Ensure offset and time indexes are generated for every record.
        topicProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
        // Leverage the use of the segment index size to create a log-segment accepting one and only one record.
        // The minimum size of the indexes is that of an entry, which is 8 for the offset index and 12 for the
        // time index. Hence, since the topic is configured to generate index entries for every record, for
        // a "small" number of records (i.e. such that the average record size times the number of records is
        // much less than the segment size), the number of records which fit in a segment is determined by
        // the multiple of 12 defined below.
        topicProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * maxRecordBatchPerSegment));
        // To verify records physically absent from Kafka's storage can be consumed via the second tier storage, we
        // want to delete log segments as soon as possible. When tiered storage is active, an inactive log
        // segment is not eligible for deletion until it has been offloaded, which guarantees all segments
        // should be offloaded before deletion, and their consumption is possible thereafter.
        topicProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
        return topicProps;
    }

    // Builds the "rsm.config.<testClassName>.<key>" property name; an empty key yields the bare prefix.
    private static String storageConfigPrefix(String testClassName, String key) {
        return "rsm.config." + testClassName + "." + key;
    }

    // Builds the "rlmm.config.<testClassName>.<key>" property name; an empty key yields the bare prefix.
    private static String metadataConfigPrefix(String testClassName, String key) {
        return "rlmm.config." + testClassName + "." + key;
    }
}