diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java index 14ec707a2eb..186cbb17c56 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java @@ -27,9 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; -import java.io.File; import java.io.IOException; -import java.nio.file.Path; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeoutException; @@ -41,8 +39,6 @@ import java.util.concurrent.TimeoutException; */ public class ConsumerManager implements Closeable { - public static final String COMMITTED_OFFSETS_FILE_NAME = "_rlmm_committed_offsets"; - private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class); private static final long CONSUME_RECHECK_INTERVAL_MS = 50L; @@ -60,15 +56,13 @@ public class ConsumerManager implements Closeable { //Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler. KafkaConsumer consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties()); - Path committedOffsetsPath = new File(rlmmConfig.logDir(), COMMITTED_OFFSETS_FILE_NAME).toPath(); consumerTask = new ConsumerTask( - consumer, - rlmmConfig.remoteLogMetadataTopicName(), - remotePartitionMetadataEventHandler, - topicPartitioner, - committedOffsetsPath, - time, - 60_000L + remotePartitionMetadataEventHandler, + topicPartitioner, + consumer, + 100L, + 300_000L, + time ); consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", consumerTask); } @@ -110,7 +104,7 @@ public class ConsumerManager implements Closeable { log.info("Waiting until consumer is caught up with the target partition: [{}]", partition); // If the current assignment does not have the subscription for this partition then return immediately. - if (!consumerTask.isPartitionAssigned(partition)) { + if (!consumerTask.isMetadataPartitionAssigned(partition)) { throw new KafkaException("This consumer is not assigned to the target partition " + partition + ". 
" + "Partitions currently assigned: " + consumerTask.metadataPartitionsAssigned()); } @@ -119,17 +113,17 @@ public class ConsumerManager implements Closeable { long startTimeMs = time.milliseconds(); while (true) { log.debug("Checking if partition [{}] is up to date with offset [{}]", partition, offset); - long receivedOffset = consumerTask.receivedOffsetForPartition(partition).orElse(-1L); - if (receivedOffset >= offset) { + long readOffset = consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L); + if (readOffset >= offset) { return; } log.debug("Expected offset [{}] for partition [{}], but the committed offset: [{}], Sleeping for [{}] to retry again", - offset, partition, receivedOffset, consumeCheckIntervalMs); + offset, partition, readOffset, consumeCheckIntervalMs); if (time.milliseconds() - startTimeMs > timeoutMs) { log.warn("Expected offset for partition:[{}] is : [{}], but the committed offset: [{}] ", - partition, receivedOffset, offset); + partition, readOffset, offset); throw new TimeoutException("Timed out in catching up with the expected offset by consumer."); } @@ -158,7 +152,7 @@ public class ConsumerManager implements Closeable { consumerTask.removeAssignmentsForPartitions(partitions); } - public Optional receivedOffsetForPartition(int metadataPartition) { - return consumerTask.receivedOffsetForPartition(metadataPartition); + public Optional readOffsetForPartition(int metadataPartition) { + return consumerTask.readOffsetForMetadataPartition(metadataPartition); } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java index 2c95bf399a5..b53c4ee3374 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java @@ -16,12 +16,13 @@ */ package org.apache.kafka.server.log.remote.metadata.storage; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; @@ -30,8 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; -import java.io.IOException; -import java.nio.file.Path; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -64,295 +63,271 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; - private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer consumer; - private final String metadataTopicName; + private final Consumer consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final 
RemoteLogMetadataTopicPartitioner topicPartitioner; + // The timeout for the consumer to poll records from the remote log metadata topic. + private final long pollTimeoutMs; private final Time time; - // It indicates whether the closing process has been started or not. If it is set as true, - // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - - // It indicates whether the consumer needs to assign the partitions or not. This is set when it is - // determined that the consumer needs to be assigned with the updated partitions. - private volatile boolean assignPartitions = false; + // It indicates whether the ConsumerTask is closed or not. + private volatile boolean isClosed = false; + // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment + // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the + // ones it is no longer assigned to. + // The initial value is set to true to wait for partition assignment on the first execution; otherwise thread will + // be busy without actually doing anything + private volatile boolean hasAssignmentChanged = true; // It represents a lock for any operations related to the assignedTopicPartitions. private final Object assignPartitionsLock = new Object(); // Remote log metadata topic partitions that consumer is assigned to. - private volatile Set assignedMetaPartitions = Collections.emptySet(); + private volatile Set assignedMetadataPartitions = Collections.emptySet(); // User topic partitions that this broker is a leader/follower for. - private Set assignedTopicPartitions = Collections.emptySet(); + private volatile Map assignedUserTopicIdPartitions = Collections.emptyMap(); + private volatile Set processedAssignmentOfUserTopicIdPartitions = Collections.emptySet(); - // Map of remote log metadata topic partition to consumed offsets. Received consumer records - // may or may not have been processed based on the assigned topic partitions. - private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>(); + private long uninitializedAt; + private boolean isAllUserTopicPartitionsInitialized; - // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile. - private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap(); + // Map of remote log metadata topic partition to consumed offsets. + private final Map readOffsetsByMetadataPartition = new ConcurrentHashMap<>(); + private final Map readOffsetsByUserTopicPartition = new HashMap<>(); - private final long committedOffsetSyncIntervalMs; - private CommittedOffsetsFile committedOffsetsFile; - private long lastSyncedTimeMs; + private Map offsetHolderByMetadataPartition = new HashMap<>(); + private boolean hasLastOffsetsFetchFailed = false; + private long lastFailedFetchOffsetsTimestamp; + // The interval between retries to fetch the start and end offsets for the metadata partitions after a failed fetch. 
+ private final long offsetFetchRetryIntervalMs; - public ConsumerTask(KafkaConsumer consumer, - String metadataTopicName, - RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, + public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, - Path committedOffsetsPath, - Time time, - long committedOffsetSyncIntervalMs) { - this.consumer = Objects.requireNonNull(consumer); - this.metadataTopicName = Objects.requireNonNull(metadataTopicName); + Consumer consumer, + long pollTimeoutMs, + long offsetFetchRetryIntervalMs, + Time time) { + this.consumer = consumer; this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); this.topicPartitioner = Objects.requireNonNull(topicPartitioner); + this.pollTimeoutMs = pollTimeoutMs; + this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs; this.time = Objects.requireNonNull(time); - this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs; - - initializeConsumerAssignment(committedOffsetsPath); - } - - private void initializeConsumerAssignment(Path committedOffsetsPath) { - try { - committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); - } catch (IOException e) { - throw new KafkaException(e); - } - - Map committedOffsets = Collections.emptyMap(); - try { - // Load committed offset and assign them in the consumer. - committedOffsets = committedOffsetsFile.readEntries(); - } catch (IOException e) { - // Ignore the error and consumer consumes from the earliest offset. - log.error("Encountered error while building committed offsets from the file. " + - "Consumer will consume from the earliest offset for the assigned partitions.", e); - } - - if (!committedOffsets.isEmpty()) { - // Assign topic partitions from the earlier committed offsets file. 
- Set earlierAssignedPartitions = committedOffsets.keySet(); - assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions); - Set metadataTopicPartitions = earlierAssignedPartitions.stream() - .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x)) - .collect(Collectors.toSet()); - consumer.assign(metadataTopicPartitions); - - // Seek to the committed offsets - for (Map.Entry entry : committedOffsets.entrySet()) { - log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey()); - partitionToConsumedOffsets.put(entry.getKey(), entry.getValue()); - consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue()); - } - - lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets); - } + this.uninitializedAt = time.milliseconds(); } @Override public void run() { - log.info("Started Consumer task thread."); - lastSyncedTimeMs = time.milliseconds(); - try { - while (!closing) { - maybeWaitForPartitionsAssignment(); + log.info("Starting consumer task thread."); + while (!isClosed) { + try { + if (hasAssignmentChanged) { + maybeWaitForPartitionAssignments(); + } log.trace("Polling consumer to receive remote log metadata topic records"); - ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS)); + final ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); for (ConsumerRecord record : consumerRecords) { processConsumerRecord(record); } - - maybeSyncCommittedDataAndOffsets(false); + maybeMarkUserPartitionsAsReady(); + } catch (final WakeupException ex) { + // ignore logging the error + isClosed = true; + } catch (final RetriableException ex) { + log.warn("Retriable error occurred while processing the records. Retrying...", ex); + } catch (final Exception ex) { + isClosed = true; + log.error("Error occurred while processing the records", ex); } - } catch (Exception e) { - log.error("Error occurred in consumer task, close:[{}]", closing, e); - } finally { - maybeSyncCommittedDataAndOffsets(true); - closeConsumer(); - log.info("Exiting from consumer task thread"); } + try { + consumer.close(Duration.ofSeconds(30)); + } catch (final Exception e) { + log.error("Error encountered while closing the consumer", e); + } + log.info("Exited from consumer task thread"); } private void processConsumerRecord(ConsumerRecord record) { - // Taking assignPartitionsLock here as updateAssignmentsForPartitions changes assignedTopicPartitions - // and also calls remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for the removed - // partitions. 
- RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); - synchronized (assignPartitionsLock) { - if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) { - remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); - } else { - log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata); - } - log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); - partitionToConsumedOffsets.put(record.partition(), record.offset()); + final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); + if (shouldProcess(remoteLogMetadata, record.offset())) { + remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); + readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset()); + } else { + log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata); } + log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); + readOffsetsByMetadataPartition.put(record.partition(), record.offset()); } - private void maybeSyncCommittedDataAndOffsets(boolean forceSync) { - // Return immediately if there is no consumption from last time. - boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets); - if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) { - log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync); + private boolean shouldProcess(final RemoteLogMetadata metadata, final long recordOffset) { + final TopicIdPartition tpId = metadata.topicIdPartition(); + final Long readOffset = readOffsetsByUserTopicPartition.get(tpId); + return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset); + } + + private void maybeMarkUserPartitionsAsReady() { + if (isAllUserTopicPartitionsInitialized) { return; } - - try { - // Need to take lock on assignPartitionsLock as assignedTopicPartitions might - // get updated by other threads. - synchronized (assignPartitionsLock) { - for (TopicIdPartition topicIdPartition : assignedTopicPartitions) { - int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition); - Long offset = partitionToConsumedOffsets.get(metadataPartition); - if (offset != null) { - remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset); + maybeFetchStartAndEndOffsets(); + boolean isAllInitialized = true; + for (final UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) { + if (utp.isAssigned && !utp.isInitialized) { + final Integer metadataPartition = utp.metadataPartition; + final StartAndEndOffsetHolder holder = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition)); + // The offset-holder can be null, when the recent assignment wasn't picked up by the consumer. + if (holder != null) { + final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L); + // 1) The end-offset was fetched only once during reassignment. The metadata-partition can receive + // new stream of records, so the consumer can read records more than the last-fetched end-offset. 
+ // 2) When the internal topic becomes empty due to breach by size/time/start-offset, then there + // are no records to read. + if (readOffset + 1 >= holder.endOffset || holder.endOffset.equals(holder.startOffset)) { + markInitialized(utp); } else { - log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset", - topicIdPartition, metadataPartition); + log.debug("The user-topic-partition {} could not be marked initialized since the read-offset is {} " + + "but the end-offset is {} for the metadata-partition {}", utp, readOffset, holder.endOffset, + metadataPartition); } - } - - // Write partitionToConsumedOffsets into committed offsets file as we do not want to process them again - // in case of restarts. - committedOffsetsFile.writeEntries(partitionToConsumedOffsets); - lastSyncedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets); - } - - lastSyncedTimeMs = time.milliseconds(); - } catch (IOException e) { - throw new KafkaException("Error encountered while writing committed offsets to a local file", e); - } - } - - private void closeConsumer() { - log.info("Closing the consumer instance"); - try { - consumer.close(Duration.ofSeconds(30)); - } catch (Exception e) { - log.error("Error encountered while closing the consumer", e); - } - } - - private void maybeWaitForPartitionsAssignment() { - Set assignedMetaPartitionsSnapshot = Collections.emptySet(); - synchronized (assignPartitionsLock) { - // If it is closing, return immediately. This should be inside the assignPartitionsLock as the closing is updated - // in close() method with in the same lock to avoid any race conditions. - if (closing) { - return; - } - - while (assignedMetaPartitions.isEmpty()) { - // If no partitions are assigned, wait until they are assigned. - log.debug("Waiting for assigned remote log metadata partitions.."); - try { - // No timeout is set here, as it is always notified. Even when it is closed, the race can happen - // between the thread calling this method and the thread calling close(). We should have a check - // for closing as that might have been set and notified with assignPartitionsLock by `close` - // method. - assignPartitionsLock.wait(); - - if (closing) { - return; - } - } catch (InterruptedException e) { - throw new KafkaException(e); + } else { + log.debug("The offset-holder is null for the metadata-partition {}. 
The consumer may not have picked" + + " up the recent assignment", metadataPartition); } } - - if (assignPartitions) { - assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions); - // Removing unassigned meta partitions from partitionToConsumedOffsets and partitionToCommittedOffsets - partitionToConsumedOffsets.entrySet().removeIf(entry -> !assignedMetaPartitions.contains(entry.getKey())); - - assignPartitions = false; - } + isAllInitialized = isAllInitialized && utp.isAssigned && utp.isInitialized; } - - if (!assignedMetaPartitionsSnapshot.isEmpty()) { - executeReassignment(assignedMetaPartitionsSnapshot); + if (isAllInitialized) { + log.info("Initialized for all the {} assigned user-partitions mapped to the {} meta-partitions in {} ms", + assignedUserTopicIdPartitions.size(), assignedMetadataPartitions.size(), + time.milliseconds() - uninitializedAt); } + isAllUserTopicPartitionsInitialized = isAllInitialized; } - private void executeReassignment(Set assignedMetaPartitionsSnapshot) { - Set assignedMetaTopicPartitions = - assignedMetaPartitionsSnapshot.stream() - .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum)) - .collect(Collectors.toSet()); - log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions); - consumer.assign(assignedMetaTopicPartitions); - } - - public void addAssignmentsForPartitions(Set partitions) { - updateAssignmentsForPartitions(partitions, Collections.emptySet()); - } - - public void removeAssignmentsForPartitions(Set partitions) { - updateAssignmentsForPartitions(Collections.emptySet(), partitions); - } - - private void updateAssignmentsForPartitions(Set addedPartitions, - Set removedPartitions) { - log.info("Updating assignments for addedPartitions: {} and removedPartition: {}", addedPartitions, removedPartitions); - - Objects.requireNonNull(addedPartitions, "addedPartitions must not be null"); - Objects.requireNonNull(removedPartitions, "removedPartitions must not be null"); - - if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) { - return; - } - + void maybeWaitForPartitionAssignments() throws InterruptedException { + // Snapshots of the metadata-partition and user-topic-partition are used to reduce the scope of the + // synchronization block. + // 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the user-topic-partitions from the request + // handler threads. Those threads should not be blocked for a long time, therefore scope of the + // synchronization block is reduced to bare minimum. + // 2) Note that the consumer#position, consumer#seekToBeginning, consumer#seekToEnd and the other consumer APIs + // response times are un-predictable. Those should not be kept in the synchronization block. + final Set metadataPartitionSnapshot = new HashSet<>(); + final Set assignedUserTopicIdPartitionsSnapshot = new HashSet<>(); synchronized (assignPartitionsLock) { - Set updatedReassignedPartitions = new HashSet<>(assignedTopicPartitions); - updatedReassignedPartitions.addAll(addedPartitions); - updatedReassignedPartitions.removeAll(removedPartitions); - Set updatedAssignedMetaPartitions = new HashSet<>(); - for (TopicIdPartition tp : updatedReassignedPartitions) { - updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp)); + while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) { + log.debug("Waiting for remote log metadata partitions to be assigned"); + assignPartitionsLock.wait(); } - - // Clear removed topic partitions from in-memory cache. 
- for (TopicIdPartition removedPartition : removedPartitions) { - remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition); - } - - assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions); - log.debug("Assigned topic partitions: {}", assignedTopicPartitions); - - if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) { - assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions); - log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions); - - assignPartitions = true; - assignPartitionsLock.notifyAll(); - } else { - log.debug("No change in assigned metadata topic partitions: {}", assignedMetaPartitions); + if (!isClosed && hasAssignmentChanged) { + assignedUserTopicIdPartitions.values().forEach(utp -> { + metadataPartitionSnapshot.add(utp.metadataPartition); + assignedUserTopicIdPartitionsSnapshot.add(utp); + }); + hasAssignmentChanged = false; } } + if (!metadataPartitionSnapshot.isEmpty()) { + final Set remoteLogPartitions = toRemoteLogPartitions(metadataPartitionSnapshot); + consumer.assign(remoteLogPartitions); + this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot); + // for newly assigned user-partitions, read from the beginning of the corresponding metadata partition + final Set seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot + .stream() + .filter(utp -> !utp.isAssigned) + .map(utp -> toRemoteLogPartition(utp.metadataPartition)) + .collect(Collectors.toSet()); + consumer.seekToBeginning(seekToBeginOffsetPartitions); + // for other metadata partitions, read from the offset where the processing left last time. + remoteLogPartitions.stream() + .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) && + readOffsetsByMetadataPartition.containsKey(tp.partition())) + .forEach(tp -> consumer.seek(tp, readOffsetsByMetadataPartition.get(tp.partition()))); + Set processedAssignmentPartitions = new HashSet<>(); + // mark all the user-topic-partitions as assigned to the consumer. + assignedUserTopicIdPartitionsSnapshot.forEach(utp -> { + if (!utp.isAssigned) { + // Note that there can be a race between `remove` and `add` partition assignment. Calling the + // `maybeLoadPartition` here again to be sure that the partition gets loaded on the handler. + remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition); + utp.isAssigned = true; + } + processedAssignmentPartitions.add(utp.topicIdPartition); + }); + processedAssignmentOfUserTopicIdPartitions = new HashSet<>(processedAssignmentPartitions); + clearResourcesForUnassignedUserTopicPartitions(processedAssignmentPartitions); + isAllUserTopicPartitionsInitialized = false; + uninitializedAt = time.milliseconds(); + fetchStartAndEndOffsets(); + } } - public Optional receivedOffsetForPartition(int partition) { - return Optional.ofNullable(partitionToConsumedOffsets.get(partition)); + private void clearResourcesForUnassignedUserTopicPartitions(Set assignedPartitions) { + // Note that there can be previously assigned user-topic-partitions where no records are there to read + // (eg) none of the segments for a partition were uploaded. Those partition resources won't be cleared. + // It can be fixed later when required since they are empty resources. 
+ Set unassignedPartitions = readOffsetsByUserTopicPartition.keySet() + .stream() + .filter(e -> !assignedPartitions.contains(e)) + .collect(Collectors.toSet()); + unassignedPartitions.forEach(unassignedPartition -> { + remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition); + readOffsetsByUserTopicPartition.remove(unassignedPartition); + }); + log.info("Unassigned user-topic-partitions: {}", unassignedPartitions.size()); } - public boolean isPartitionAssigned(int partition) { - return assignedMetaPartitions.contains(partition); + public void addAssignmentsForPartitions(final Set partitions) { + updateAssignments(Objects.requireNonNull(partitions), Collections.emptySet()); } - public void close() { - if (!closing) { + public void removeAssignmentsForPartitions(final Set partitions) { + updateAssignments(Collections.emptySet(), Objects.requireNonNull(partitions)); + } + + private void updateAssignments(final Set addedPartitions, + final Set removedPartitions) { + log.info("Updating assignments for partitions added: {} and removed: {}", addedPartitions, removedPartitions); + if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) { synchronized (assignPartitionsLock) { - // Closing should be updated only after acquiring the lock to avoid race in - // maybeWaitForPartitionsAssignment() where it waits on assignPartitionsLock. It should not wait - // if the closing is already set. - closing = true; + // Make a copy of the existing assignments and update the copy. + final Map updatedUserPartitions = new HashMap<>(assignedUserTopicIdPartitions); + addedPartitions.forEach(tpId -> updatedUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId))); + removedPartitions.forEach(updatedUserPartitions::remove); + if (!updatedUserPartitions.equals(assignedUserTopicIdPartitions)) { + assignedUserTopicIdPartitions = Collections.unmodifiableMap(updatedUserPartitions); + hasAssignmentChanged = true; + log.debug("Assigned user-topic-partitions: {}", assignedUserTopicIdPartitions); + assignPartitionsLock.notifyAll(); + } + } + } + } + + public Optional readOffsetForMetadataPartition(final int partition) { + return Optional.ofNullable(readOffsetsByMetadataPartition.get(partition)); + } + + public boolean isMetadataPartitionAssigned(final int partition) { + return assignedMetadataPartitions.contains(partition); + } + + public boolean isUserPartitionAssigned(final TopicIdPartition partition) { + final UserTopicIdPartition utp = assignedUserTopicIdPartitions.get(partition); + return utp != null && utp.isAssigned; + } + + @Override + public void close() { + if (!isClosed) { + log.info("Closing the instance"); + synchronized (assignPartitionsLock) { + isClosed = true; + assignedUserTopicIdPartitions.values().forEach(this::markInitialized); consumer.wakeup(); assignPartitionsLock.notifyAll(); } @@ -360,6 +335,131 @@ class ConsumerTask implements Runnable, Closeable { } public Set metadataPartitionsAssigned() { - return Collections.unmodifiableSet(assignedMetaPartitions); + return Collections.unmodifiableSet(assignedMetadataPartitions); } -} + + private void fetchStartAndEndOffsets() { + try { + final Set uninitializedPartitions = assignedUserTopicIdPartitions.values().stream() + .filter(utp -> utp.isAssigned && !utp.isInitialized) + .map(utp -> toRemoteLogPartition(utp.metadataPartition)) + .collect(Collectors.toSet()); + // Removing the previous offset holder if it exists. 
During reassignment, if the list-offset + // call to `earliest` and `latest` offset fails, then we should not use the previous values. + uninitializedPartitions.forEach(tp -> offsetHolderByMetadataPartition.remove(tp)); + if (!uninitializedPartitions.isEmpty()) { + Map endOffsets = consumer.endOffsets(uninitializedPartitions); + Map startOffsets = consumer.beginningOffsets(uninitializedPartitions); + offsetHolderByMetadataPartition = endOffsets.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> new StartAndEndOffsetHolder(startOffsets.get(e.getKey()), e.getValue()))); + + } + hasLastOffsetsFetchFailed = false; + } catch (final RetriableException ex) { + // ignore LEADER_NOT_AVAILABLE error, this can happen when the partition leader is not yet assigned. + hasLastOffsetsFetchFailed = true; + lastFailedFetchOffsetsTimestamp = time.milliseconds(); + } + } + + private void maybeFetchStartAndEndOffsets() { + // If the leader for a `__remote_log_metadata` partition is not available, then the call to `ListOffsets` + // will fail after the default timeout of 1 min. Added a delay between the retries to prevent the thread from + // aggressively fetching the list offsets. During this time, the recently reassigned user-topic-partitions + // won't be marked as initialized. + if (hasLastOffsetsFetchFailed && lastFailedFetchOffsetsTimestamp + offsetFetchRetryIntervalMs < time.milliseconds()) { + fetchStartAndEndOffsets(); + } + } + + private UserTopicIdPartition newUserTopicIdPartition(final TopicIdPartition tpId) { + return new UserTopicIdPartition(tpId, topicPartitioner.metadataPartition(tpId)); + } + + private void markInitialized(final UserTopicIdPartition utp) { + // Silently not initialize the utp + if (!utp.isAssigned) { + log.warn("Tried to initialize a UTP: {} that was not yet assigned!", utp); + return; + } + if (!utp.isInitialized) { + remotePartitionMetadataEventHandler.markInitialized(utp.topicIdPartition); + utp.isInitialized = true; + } + } + + static Set toRemoteLogPartitions(final Set partitions) { + return partitions.stream() + .map(ConsumerTask::toRemoteLogPartition) + .collect(Collectors.toSet()); + } + + static TopicPartition toRemoteLogPartition(int partition) { + return new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partition); + } + + static class UserTopicIdPartition { + private final TopicIdPartition topicIdPartition; + private final Integer metadataPartition; + // The `utp` will be initialized once it reads all the existing events from the remote log metadata topic. + boolean isInitialized; + // denotes whether this `utp` is assigned to the consumer + boolean isAssigned; + + /** + * UserTopicIdPartition denotes the user topic-partitions for which this broker acts as a leader/follower of. + * + * @param tpId the unique topic partition identifier + * @param metadataPartition the remote log metadata partition mapped for this user-topic-partition. 
+ */ + public UserTopicIdPartition(final TopicIdPartition tpId, final Integer metadataPartition) { + this.topicIdPartition = Objects.requireNonNull(tpId); + this.metadataPartition = Objects.requireNonNull(metadataPartition); + this.isInitialized = false; + this.isAssigned = false; + } + + @Override + public String toString() { + return "UserTopicIdPartition{" + + "topicIdPartition=" + topicIdPartition + + ", metadataPartition=" + metadataPartition + + ", isInitialized=" + isInitialized + + ", isAssigned=" + isAssigned + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + UserTopicIdPartition that = (UserTopicIdPartition) o; + return topicIdPartition.equals(that.topicIdPartition) && metadataPartition.equals(that.metadataPartition); + } + + @Override + public int hashCode() { + return Objects.hash(topicIdPartition, metadataPartition); + } + } + + static class StartAndEndOffsetHolder { + Long startOffset; + Long endOffset; + + public StartAndEndOffsetHolder(Long startOffset, Long endOffset) { + this.startOffset = startOffset; + this.endOffset = endOffset; + } + + @Override + public String toString() { + return "StartAndEndOffsetHolder{" + + "startOffset=" + startOffset + + ", endOffset=" + endOffset + + '}'; + } + } +} \ No newline at end of file diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java index 8c89df3df2c..758a024e25c 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java @@ -32,6 +32,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; /** * This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments @@ -104,6 +105,16 @@ public class RemoteLogMetadataCache { // https://issues.apache.org/jira/browse/KAFKA-12641 protected final ConcurrentMap leaderEpochEntries = new ConcurrentHashMap<>(); + private final CountDownLatch initializedLatch = new CountDownLatch(1); + + public void markInitialized() { + initializedLatch.countDown(); + } + + public boolean isInitialized() { + return initializedLatch.getCount() == 0; + } + /** * Returns {@link RemoteLogSegmentMetadata} if it exists for the given leader-epoch containing the offset and with * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns {@link Optional#empty()}. 
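Note (outside the patch): the RemoteLogMetadataCache hunk above introduces a CountDownLatch-based initialization gate — the consumer task marks a cache as initialized once it has caught up with the end offset of its metadata partition, and isInitialized() is checked before lookups are served. Below is a minimal, self-contained Java sketch of that latch pattern in isolation; it is illustrative only, and the awaitInitialized helper plus the demo main method are assumptions added for the example, not part of the patch.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class InitializationGateSketch {
    // Latch opened exactly once; repeated markInitialized() calls are harmless no-ops.
    private final CountDownLatch initializedLatch = new CountDownLatch(1);

    // Called once the consumer has caught up with the backing metadata partition.
    void markInitialized() {
        initializedLatch.countDown();
    }

    // Non-blocking check used before serving metadata lookups.
    boolean isInitialized() {
        return initializedLatch.getCount() == 0;
    }

    // Illustrative blocking variant: wait up to timeoutMs for initialization.
    boolean awaitInitialized(long timeoutMs) throws InterruptedException {
        return initializedLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        InitializationGateSketch gate = new InitializationGateSketch();
        System.out.println(gate.isInitialized());       // false
        gate.markInitialized();
        gate.markInitialized();                          // idempotent
        System.out.println(gate.awaitInitialized(100L)); // true
    }
}

Until the gate opens, the RemotePartitionMetadataStore changes in the next hunks respond with a retriable ReplicaNotAvailableException, so callers retry rather than read from a cache that is still being rebuilt from the metadata topic.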
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java index c92a51ecaca..f4f43b0d883 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java @@ -50,4 +50,9 @@ public abstract class RemotePartitionMetadataEventHandler { public abstract void clearTopicPartition(TopicIdPartition topicIdPartition); + public abstract void markInitialized(TopicIdPartition partition); + + public abstract boolean isInitialized(TopicIdPartition partition); + + public abstract void maybeLoadPartition(TopicIdPartition partition); } \ No newline at end of file diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java index 7051d184aad..f9394eee99f 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.server.log.remote.metadata.storage; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; @@ -151,6 +152,12 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition); } + if (!remoteLogMetadataCache.isInitialized()) { + // Throwing a retriable ReplicaNotAvailableException here for clients retry. We can introduce a new more + // appropriate exception with a KIP in the future. 
+ throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition); + } + return remoteLogMetadataCache; } @@ -180,9 +187,21 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa idToRemoteLogMetadataCache = Collections.emptyMap(); } + @Override public void maybeLoadPartition(TopicIdPartition partition) { idToRemoteLogMetadataCache.computeIfAbsent(partition, topicIdPartition -> new FileBasedRemoteLogMetadataCache(topicIdPartition, partitionLogDirectory(topicIdPartition.topicPartition()))); } + @Override + public void markInitialized(TopicIdPartition partition) { + idToRemoteLogMetadataCache.get(partition).markInitialized(); + log.trace("Remote log components are initialized for user-partition: {}", partition); + } + + @Override + public boolean isInitialized(TopicIdPartition topicIdPartition) { + RemoteLogMetadataCache metadataCache = idToRemoteLogMetadataCache.get(topicIdPartition); + return metadataCache != null && metadataCache.isInitialized(); + } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java index 4b02b9b6763..e1bf145bbd8 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java @@ -84,7 +84,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana private RemotePartitionMetadataStore remotePartitionMetadataStore; private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig; - private volatile RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner; + private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner; private final Set pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>()); private volatile boolean initializationFailed; @@ -260,12 +260,12 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana } public int metadataPartition(TopicIdPartition topicIdPartition) { - return rlmmTopicPartitioner.metadataPartition(topicIdPartition); + return rlmTopicPartitioner.metadataPartition(topicIdPartition); } // Visible For Testing - public Optional receivedOffsetForPartition(int metadataPartition) { - return consumerManager.receivedOffsetForPartition(metadataPartition); + public Optional readOffsetForPartition(int metadataPartition) { + return consumerManager.readOffsetForPartition(metadataPartition); } @Override @@ -357,7 +357,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana log.info("Started configuring topic-based RLMM with configs: {}", configs); rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs); - rlmmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount()); + rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount()); remotePartitionMetadataStore = new RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath()); configured = true; log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig); @@ -416,8 +416,8 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana // Create producer and consumer managers. 
lock.writeLock().lock(); try { - producerManager = new ProducerManager(rlmmConfig, rlmmTopicPartitioner); - consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmmTopicPartitioner, time); + producerManager = new ProducerManager(rlmmConfig, rlmTopicPartitioner); + consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmTopicPartitioner, time); if (startConsumerThread) { consumerManager.startConsumerThread(); } else { @@ -509,10 +509,8 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana } // Visible for testing. - public void startConsumerThread() { - if (consumerManager != null) { - consumerManager.startConsumerThread(); - } + void setRlmTopicPartitioner(RemoteLogMetadataTopicPartitioner rlmTopicPartitioner) { + this.rlmTopicPartitioner = Objects.requireNonNull(rlmTopicPartitioner); } @Override diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java new file mode 100644 index 00000000000..2b36c4bb039 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + + private final int numMetadataTopicPartitions = 5; + private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); + private final DummyEventHandler handler = new DummyEventHandler(); + private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() + .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); + private final Uuid topicId = Uuid.randomUuid(); + private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + + private ConsumerTask consumerTask; + private MockConsumer consumer; + private Thread thread; + + @BeforeEach + public void beforeEach() { + final Map offsets = remoteLogPartitions.stream() + .collect(Collectors.toMap(Function.identity(), e -> 0L)); + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.updateBeginningOffsets(offsets); + consumerTask = new ConsumerTask(handler, partitioner, 
consumer, 10L, 300_000L, new SystemTime()); + thread = new Thread(consumerTask); + } + + @AfterEach + public void afterEach() throws InterruptedException { + if (thread != null) { + assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception"); + thread.join(10_000); + assertFalse(thread.isAlive(), "Consumer task thread is still alive"); + } + } + + /** + * Tests that the consumer task shuts down gracefully when there were no assignments. + */ + @Test + public void testCloseOnNoAssignment() throws InterruptedException { + thread.start(); + Thread.sleep(10); + assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception"); + } + + @Test + public void testIdempotentClose() { + thread.start(); + consumerTask.close(); + consumerTask.close(); + } + + @Test + public void testUserTopicIdPartitionEquals() { + final TopicIdPartition tpId = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); + final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId)); + final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId)); + utp1.isInitialized = true; + utp1.isAssigned = true; + + assertFalse(utp2.isInitialized); + assertFalse(utp2.isAssigned); + assertEquals(utp1, utp2); + } + + @Test + public void testAddAssignmentsForPartitions() throws InterruptedException { + final List idPartitions = getIdPartitions("sample", 3); + final Map endOffsets = idPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); + consumer.updateEndOffsets(endOffsets); + consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions)); + thread.start(); + for (final TopicIdPartition idPartition : idPartitions) { + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned"); + assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition))); + assertTrue(handler.isPartitionLoaded.get(idPartition)); + } + } + + @Test + public void testRemoveAssignmentsForPartitions() throws InterruptedException { + final List allPartitions = getIdPartitions("sample", 3); + final Map endOffsets = allPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); + consumer.updateEndOffsets(endOffsets); + consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions)); + thread.start(); + + final TopicIdPartition tpId = allPartitions.get(0); + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " to be assigned"); + addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(), + "Couldn't read record"); + + final Set removePartitions = Collections.singleton(tpId); + consumerTask.removeAssignmentsForPartitions(removePartitions); + for (final TopicIdPartition idPartition : allPartitions) { + final TestCondition condition = () -> removePartitions.contains(idPartition) == !consumerTask.isUserPartitionAssigned(idPartition); + TestUtils.waitForCondition(condition, "Timed out waiting for " + idPartition + " to be removed"); + } + for (TopicIdPartition removePartition : removePartitions) { + 
TestUtils.waitForCondition(() -> handler.isPartitionCleared.containsKey(removePartition), + "Timed out waiting for " + removePartition + " to be cleared"); + } + } + + @Test + public void testConcurrentPartitionAssignments() throws InterruptedException, ExecutionException { + final List allPartitions = getIdPartitions("sample", 100); + final Map endOffsets = allPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); + consumer.updateEndOffsets(endOffsets); + + final AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + Thread assignor = new Thread(() -> { + int partitionsAssigned = 0; + for (TopicIdPartition partition : allPartitions) { + if (partitionsAssigned == 50) { + // Once half the topic partitions are assigned, wait for the consumer to catch up. This ensures + // that the consumer is already running when the rest of the partitions are assigned. + try { + latch.await(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + } + consumerTask.addAssignmentsForPartitions(Collections.singleton(partition)); + partitionsAssigned++; + } + isAllPartitionsAssigned.set(true); + }); + Runnable consumerRunnable = () -> { + try { + while (!isAllPartitionsAssigned.get()) { + consumerTask.maybeWaitForPartitionAssignments(); + latch.countDown(); + } + } catch (Exception e) { + fail(e.getMessage()); + } + }; + + ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(); + Future future = consumerExecutor.submit(consumerRunnable); + assignor.start(); + + assignor.join(); + future.get(); + } + + @Test + public void testCanProcessRecord() throws InterruptedException { + final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg"); + final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); + final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1)); + final TopicIdPartition tpId2 = new TopicIdPartition(topicId, new TopicPartition("sample", 2)); + assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1)); + assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId2)); + + final int metadataPartition = partitioner.metadataPartition(tpId0); + consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L)); + final Set assignments = Collections.singleton(tpId0); + consumerTask.addAssignmentsForPartitions(assignments); + thread.start(); + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + " to be assigned"); + + addRecord(consumer, metadataPartition, tpId0, 0); + addRecord(consumer, metadataPartition, tpId0, 1); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record"); + assertEquals(2, handler.metadataCounter); + + // should only read the tpId1 records + consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1)); + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned"); + addRecord(consumer, metadataPartition, tpId1, 2); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)), "Couldn't read record"); + assertEquals(3, 
handler.metadataCounter); + + // shouldn't read tpId2 records because it's not assigned + addRecord(consumer, metadataPartition, tpId2, 3); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)), "Couldn't read record"); + assertEquals(3, handler.metadataCounter); + } + + @Test + public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException { + final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0); + final int metadataPartition = partitioner.metadataPartition(tpId); + consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 2L)); + consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); + thread.start(); + + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); + assertFalse(handler.isPartitionInitialized.containsKey(tpId)); + IntStream.range(0, 5).forEach(offset -> addRecord(consumer, metadataPartition, tpId, offset)); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)), "Couldn't read record"); + assertTrue(handler.isPartitionInitialized.get(tpId)); + } + + @ParameterizedTest + @CsvSource(value = {"0, 0", "500, 500"}) + public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset, + long endOffset) throws InterruptedException { + final TopicIdPartition tpId = getIdPartitions("world", 1).get(0); + final int metadataPartition = partitioner.metadataPartition(tpId); + consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), beginOffset)); + consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), endOffset)); + consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); + thread.start(); + + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); + TestUtils.waitForCondition(() -> handler.isPartitionInitialized.containsKey(tpId), + "should have initialized the partition"); + assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent()); + } + + @Test + public void testConcurrentAccess() throws InterruptedException { + thread.start(); + final CountDownLatch latch = new CountDownLatch(1); + final TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0); + consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(partitioner.metadataPartition(tpId)), 0L)); + final Thread assignmentThread = new Thread(() -> { + try { + latch.await(); + consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); + } catch (final InterruptedException e) { + fail("Shouldn't have thrown an exception"); + } + }); + final Thread closeThread = new Thread(() -> { + try { + latch.await(); + consumerTask.close(); + } catch (final InterruptedException e) { + fail("Shouldn't have thrown an exception"); + } + }); + assignmentThread.start(); + closeThread.start(); + + latch.countDown(); + assignmentThread.join(); + closeThread.join(); + } + + @Test + public void testConsumerShouldNotCloseOnRetriableError() throws InterruptedException { + final TopicIdPartition tpId = getIdPartitions("world", 1).get(0); + final int metadataPartition = partitioner.metadataPartition(tpId); + 
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 1L)); + consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); + thread.start(); + + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); + + consumer.setPollException(new LeaderNotAvailableException("leader not available!")); + addRecord(consumer, metadataPartition, tpId, 0); + consumer.setPollException(new TimeoutException("Not able to complete the operation within the timeout")); + addRecord(consumer, metadataPartition, tpId, 1); + + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record"); + assertEquals(2, handler.metadataCounter); + } + + @Test + public void testConsumerShouldCloseOnNonRetriableError() throws InterruptedException { + final TopicIdPartition tpId = getIdPartitions("world", 1).get(0); + final int metadataPartition = partitioner.metadataPartition(tpId); + consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 1L)); + consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); + thread.start(); + + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); + + consumer.setPollException(new AuthorizationException("Unauthorized to read the topic!")); + TestUtils.waitForCondition(() -> consumer.closed(), "Should close the consumer on non-retriable error"); + } + + private void addRecord(final MockConsumer consumer, + final int metadataPartition, + final TopicIdPartition idPartition, + final long recordOffset) { + final RemoteLogSegmentId segmentId = new RemoteLogSegmentId(idPartition, Uuid.randomUuid()); + final RemoteLogMetadata metadata = new RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, Collections.singletonMap(0, 0L)); + final ConsumerRecord record = new ConsumerRecord<>(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME, metadataPartition, recordOffset, null, serde.serialize(metadata)); + consumer.addRecord(record); + } + + private List getIdPartitions(final String topic, final int partitionCount) { + final List idPartitions = new ArrayList<>(); + for (int partition = 0; partition < partitionCount; partition++) { + idPartitions.add(new TopicIdPartition(topicId, new TopicPartition(topic, partition))); + } + return idPartitions; + } + + private static class DummyEventHandler extends RemotePartitionMetadataEventHandler { + private int metadataCounter = 0; + private final Map isPartitionInitialized = new HashMap<>(); + private final Map isPartitionLoaded = new HashMap<>(); + private final Map isPartitionCleared = new HashMap<>(); + + @Override + protected void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { + metadataCounter++; + } + + @Override + protected void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) { + } + + @Override + protected void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) { + } + + @Override + public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition, int metadataPartition, Long metadataPartitionOffset) { + } + + @Override + public void 
clearTopicPartition(TopicIdPartition topicIdPartition) { + isPartitionCleared.put(topicIdPartition, true); + } + + @Override + public void markInitialized(TopicIdPartition partition) { + isPartitionInitialized.put(partition, true); + } + + @Override + public boolean isInitialized(TopicIdPartition partition) { + return true; + } + + @Override + public void maybeLoadPartition(TopicIdPartition partition) { + isPartitionLoaded.put(partition, true); + } + } +} diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java index e39d872744a..abad6ea7676 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java @@ -63,11 +63,12 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa // Call setup to start the cluster. super.setUp(new EmptyTestInfo()); - initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread); + initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null); } public void initializeRemoteLogMetadataManager(Set topicIdPartitions, - boolean startConsumerThread) { + boolean startConsumerThread, + RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) { String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread) { @Override @@ -104,6 +105,9 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs); topicBasedRemoteLogMetadataManager.configure(configs); + if (remoteLogMetadataTopicPartitioner != null) { + topicBasedRemoteLogMetadataManager.setRlmTopicPartitioner(remoteLogMetadataTopicPartitioner); + } try { waitUntilInitialized(60_000); } catch (TimeoutException e) { @@ -145,4 +149,4 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa public void closeRemoteLogMetadataManager() { Utils.closeQuietly(topicBasedRemoteLogMetadataManager, "TopicBasedRemoteLogMetadataManager"); } -} \ No newline at end of file +} diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java new file mode 100644 index 00000000000..3386b94f895 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + + +import kafka.utils.EmptyTestInfo; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ExecutionException; + +@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { + private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class); + + private static final int SEG_SIZE = 1024 * 1024; + + private final Time time = new MockTime(1); + private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness(); + + private TopicBasedRemoteLogMetadataManager rlmm() { + return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); + } + + @BeforeEach + public void setup() { + // Start the cluster only. + remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo()); + } + + @AfterEach + public void teardown() throws IOException { + remoteLogMetadataManagerHarness.close(); + } + + @Test + public void testMultiplePartitionSubscriptions() throws Exception { + // Create topics. + String leaderTopic = "leader"; + HashMap> assignedLeaderTopicReplicas = new HashMap<>(); + List leaderTopicReplicas = new ArrayList<>(); + // Set broker id 0 as the first entry which is taken as the leader. + leaderTopicReplicas.add(0); + leaderTopicReplicas.add(1); + leaderTopicReplicas.add(2); + assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic, + JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + String followerTopic = "follower"; + HashMap> assignedFollowerTopicReplicas = new HashMap<>(); + List followerTopicReplicas = new ArrayList<>(); + // Set broker id 1 as the first entry which is taken as the leader. 
+ followerTopicReplicas.add(1); + followerTopicReplicas.add(2); + followerTopicReplicas.add(0); + assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment( + followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + String topicWithNoMessages = "no-messages-topic"; + HashMap> assignedTopicReplicas = new HashMap<>(); + List noMessagesTopicReplicas = new ArrayList<>(); + // Set broker id 1 as the first entry which is taken as the leader. + noMessagesTopicReplicas.add(1); + noMessagesTopicReplicas.add(2); + noMessagesTopicReplicas.add(0); + assignedTopicReplicas.put(0, JavaConverters.asScalaBuffer(noMessagesTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment( + topicWithNoMessages, JavaConverters.mapAsScalaMap(assignedTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); + final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); + final TopicIdPartition emptyTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0)); + + RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(10) { + @Override + public int metadataPartition(TopicIdPartition topicIdPartition) { + // Always return partition 0 except for emptyTopicIdPartition, so that any other user partition + // added to RLMM will use the same metadata partition. That results in a subsequent assignment to + // a metadata partition that is already being consumed. + if (emptyTopicIdPartition.equals(topicIdPartition)) { + return 1; + } else { + return 0; + } + } + }; + + remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner); + + // Add segments for these partitions, but an exception is expected as they have not yet been subscribed. + // These messages would have been published to the respective metadata topic partitions, but the ConsumerManager + // has not yet subscribed to them as the user partitions are not yet registered. + RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), + 0, 100, -1L, 0, + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); + Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Partitions currently assigned: []", + exception.getMessage()); + + RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), + 0, 100, -1L, 0, + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); + Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. 
Partitions currently assigned: []", + exception.getMessage()); + + // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered. + Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(leaderTopicIdPartition)); + Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition)); + + rlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), + Collections.emptySet()); + + // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start + // fetching those events and build the cache. + waitUntilConsumerCatchesUp(30_000L); + // leader partitions would have received as it is registered, but follower partition is not yet registered, + // hence it throws an exception. + Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext()); + Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition)); + + // Register follower partition + rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition), + Collections.singleton(followerTopicIdPartition)); + + // In this state, all the metadata should be available in RLMM for both leader and follower partitions. + TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found"); + TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found"); + } + + private void waitUntilConsumerCatchesUp(long timeoutMs) throws TimeoutException, InterruptedException { + TestUtils.waitForCondition(() -> { + // If both the leader and follower partitions are mapped to the same metadata partition which is 0, it + // should have at least 2 messages. That means, read offset should be >= 1 (including duplicate messages if any). 
+ return rlmm().readOffsetForPartition(0).orElse(-1L) >= 1; + }, timeoutMs, "Consumer did not catch up"); + } +} diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java index 714405a30e9..46ffeba2142 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java @@ -31,18 +31,14 @@ import org.junit.jupiter.api.Test; import scala.collection.JavaConverters; import scala.collection.Seq; -import java.io.File; import java.io.IOException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.COMMITTED_OFFSETS_FILE_NAME; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters @@ -69,7 +65,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest { } private void startTopicBasedRemoteLogMetadataManagerHarness(boolean startConsumerThread) { - remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread); + remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, null); } @AfterEach @@ -136,9 +132,8 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest { // Stop TopicBasedRemoteLogMetadataManager only. stopTopicBasedRemoteLogMetadataManagerHarness(); - // Start TopicBasedRemoteLogMetadataManager but do not start consumer thread to check whether the stored metadata is - // loaded successfully or not. - startTopicBasedRemoteLogMetadataManagerHarness(false); + // Start TopicBasedRemoteLogMetadataManager + startTopicBasedRemoteLogMetadataManagerHarness(true); // Register these partitions to RLMM, which loads the respective metadata snapshots. topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition)); @@ -148,29 +143,6 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest { topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition))); Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(), topicBasedRlmm().listRemoteLogSegments(followerTopicIdPartition))); - // Check whether the check-pointed consumer offsets are stored or not. 
- Path committedOffsetsPath = new File(logDir, COMMITTED_OFFSETS_FILE_NAME).toPath(); - Assertions.assertTrue(committedOffsetsPath.toFile().exists()); - CommittedOffsetsFile committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); - - int metadataPartition1 = topicBasedRlmm().metadataPartition(leaderTopicIdPartition); - int metadataPartition2 = topicBasedRlmm().metadataPartition(followerTopicIdPartition); - Optional receivedOffsetForPartition1 = topicBasedRlmm().receivedOffsetForPartition(metadataPartition1); - Optional receivedOffsetForPartition2 = topicBasedRlmm().receivedOffsetForPartition(metadataPartition2); - Assertions.assertTrue(receivedOffsetForPartition1.isPresent()); - Assertions.assertTrue(receivedOffsetForPartition2.isPresent()); - - // Make sure these offsets are at least 0. - Assertions.assertTrue(receivedOffsetForPartition1.get() >= 0); - Assertions.assertTrue(receivedOffsetForPartition2.get() >= 0); - - // Check the stored entries and the offsets that were set on consumer are the same. - Map partitionToOffset = committedOffsetsFile.readEntries(); - Assertions.assertEquals(partitionToOffset.get(metadataPartition1), receivedOffsetForPartition1.get()); - Assertions.assertEquals(partitionToOffset.get(metadataPartition2), receivedOffsetForPartition2.get()); - - // Start Consumer thread - topicBasedRlmm().startConsumerThread(); // Add one more segment RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata( diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index eaf62edfea7..96e48de8a73 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -149,17 +149,17 @@ public class TopicBasedRemoteLogMetadataManagerTest { } // If both the leader and follower partitions are mapped to the same metadata partition then it should have at least - // 2 messages. That means, received offset should be >= 1 (including duplicate messages if any). + // 2 messages. That means, read offset should be >= 1 (including duplicate messages if any). if (leaderMetadataPartition == followerMetadataPartition) { - if (topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 1) { + if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 1) { break; } } else { // If the leader partition and the follower partition are mapped to different metadata partitions then - // each of those metadata partitions will have at least 1 message. That means, received offset should + // each of those metadata partitions will have at least 1 message. That means, read offset should // be >= 0 (including duplicate messages if any). - if (topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 0 || - topicBasedRlmm().receivedOffsetForPartition(followerMetadataPartition).orElse(-1L) >= 0) { + if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 0 || + topicBasedRlmm().readOffsetForPartition(followerMetadataPartition).orElse(-1L) >= 0) { break; } }