mirror of https://github.com/apache/kafka.git
KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory. (#13535)
This change includes:
- Recognize fetch requests whose offsets are out of range of the local log.
- Add a fetch implementation for data lying in the range [logStartOffset, localLogStartOffset].
- Add a new purgatory for async remote read requests, which are served through a dedicated thread pool.

We have an extended version of remote fetch that can read from multiple remote partitions in parallel; it will be raised as a follow-up PR. A few tests for the newly introduced changes are added in this PR. Some tests for these scenarios already exist in 2.8.x; they will be refactored against trunk and added in follow-up PRs.

Other contributors:
Kamal Chandraprakash <kchandraprakash@uber.com> - further improvements and a few tests
Luke Chen <showuon@gmail.com> - added a few test cases for these changes

PS: This functionality is pulled out of internal 2.8.x branches that also contain other parts of the feature. Not all of those changes are pulled in here because that would make the PR huge and burdensome to review, and they also need other metrics, minor enhancements (including perf), and minor test changes. Follow-up PRs will cover the rest.

Reviewers: Jun Rao <junrao@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, Divij Vaidya <diviv@amazon.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
parent d944ef1efb
commit 6f19730164
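Before the diffs, a brief aside on the shape of the new read path described above: RemoteLogManager#asyncRead hands a RemoteLogReader to a bounded remote-storage reader thread pool, returns a Future immediately, and delivers the result through a callback so that ReplicaManager can complete the DelayedRemoteFetch operation waiting in the new purgatory. The sketch below is illustrative only (plain JDK, Java 17, all class and method names hypothetical) and is not code from this commit.

import java.util.concurrent.*;
import java.util.function.Consumer;

// Hypothetical stand-ins for the commit's RemoteStorageFetchInfo and RemoteLogReadResult.
record FetchInfo(String topicPartition, long fetchOffset) {}
record ReadResult(String data, Exception error) {}

public class AsyncRemoteReadSketch {
    // Bounded pool standing in for RemoteStorageThreadPool ("remote-log-reader" threads).
    private final ExecutorService readerPool = new ThreadPoolExecutor(
            2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100));

    // Mirrors the shape of RemoteLogManager#asyncRead: returns immediately, runs the read on
    // the pool, and invokes the callback once the task is done.
    public Future<Void> asyncRead(FetchInfo fetchInfo, Consumer<ReadResult> callback) {
        return readerPool.submit(() -> {
            ReadResult result;
            try {
                // A real reader would fetch the segment from remote storage here.
                result = new ReadResult("records for " + fetchInfo.topicPartition(), null);
            } catch (Exception e) {
                result = new ReadResult(null, e);
            }
            callback.accept(result); // the caller completes its delayed operation here
            return null;
        });
    }

    public static void main(String[] args) {
        AsyncRemoteReadSketch rlm = new AsyncRemoteReadSketch();
        CompletableFuture<ReadResult> pending = new CompletableFuture<>();
        // The callback plays the role of remoteFetchResult.complete(...) followed by
        // delayedRemoteFetchPurgatory.checkAndComplete(key) in the commit.
        rlm.asyncRead(new FetchInfo("test-0", 150L), pending::complete);
        System.out.println(pending.join().data());
        rlm.readerPool.shutdown();
    }
}

If the pool's task queue is full, submit throws RejectedExecutionException; the commit turns that into an error LogReadResult for the affected topic-partition instead of failing the whole fetch.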
@@ -39,6 +39,7 @@
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity" files="RemoteLogManager.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
@@ -24,10 +24,14 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RemoteLogInputStream;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.ChildFirstClassLoader;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
@@ -46,7 +50,18 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.OffsetPosition;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -65,6 +80,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
@@ -76,6 +92,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -97,6 +114,7 @@ import java.util.stream.Stream;
public class RemoteLogManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
@@ -109,7 +127,7 @@ public class RemoteLogManager implements Closeable {
private final RemoteLogMetadataManager remoteLogMetadataManager;
private final RemoteIndexCache indexCache;
private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
private final RLMScheduledThreadPool rlmScheduledThreadPool;
private final long delayInMs;
@@ -147,6 +165,11 @@ public class RemoteLogManager implements Closeable {
indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
rlmConfig.remoteLogReaderThreads(),
rlmConfig.remoteLogReaderMaxPendingTasks()
);
}
private <T> T createDelegate(ClassLoader classLoader, String className) {
@@ -447,7 +470,7 @@ public class RemoteLogManager implements Closeable {
leaderEpoch = -1;
}
private void maybeUpdateReadOffset() throws RemoteStorageException {
private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException {
if (!copiedOffsetOption.isPresent()) {
logger.info("Find the highest remote offset for partition: {} after becoming leader, leaderEpoch: {}", topicIdPartition, leaderEpoch);
@@ -455,23 +478,17 @@ public class RemoteLogManager implements Closeable {
// of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
// previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
// epoch cache then it starts copying the segments from the earliest epoch entry's offset.
copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition));
copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
}
}
public void copyLogSegmentsToRemote() throws InterruptedException {
public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException {
if (isCancelled())
return;
try {
maybeUpdateReadOffset();
maybeUpdateReadOffset(log);
long copiedOffset = copiedOffsetOption.getAsLong();
Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
if (!maybeLog.isPresent()) {
return;
}
UnifiedLog log = maybeLog.get();
// LSO indicates the offset below are ready to be consumed (high-watermark or committed)
long lso = log.lastStableOffset();
@@ -578,9 +595,15 @@ public class RemoteLogManager implements Closeable {
return;
try {
Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());
if (!unifiedLogOptional.isPresent()) {
return;
}
if (isLeader()) {
// Copy log segments to remote storage
copyLogSegmentsToRemote();
copyLogSegmentsToRemote(unifiedLogOptional.get());
}
} catch (InterruptedException ex) {
if (!isCancelled()) {
@@ -600,11 +623,187 @@ public class RemoteLogManager implements Closeable {
}
}
long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
TopicPartition tp = remoteStorageFetchInfo.topicPartition;
FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
long offset = fetchInfo.fetchOffset;
int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
OptionalInt epoch = OptionalInt.empty();
if (logOptional.isPresent()) {
Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
if (leaderEpochCache.isDefined()) {
epoch = leaderEpochCache.get().epochForOffset(offset);
}
}
Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
: Optional.empty();
if (!rlsMetadataOptional.isPresent()) {
String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+ epochStr + " and partition " + tp + " which does not exist in remote tier.");
}
RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
InputStream remoteSegInputStream = null;
try {
// Search forward for the position of the last offset that is greater than or equal to the target offset
remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
if (firstBatch == null)
return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
int firstBatchSize = firstBatch.sizeInBytes();
// An empty record is sent instead of an incomplete batch when
// - there is no minimum-one-message constraint and
// - the first batch size is more than maximum bytes that can be sent and
// - for FetchRequest version 3 or above.
if (!remoteStorageFetchInfo.minOneMessage &&
!remoteStorageFetchInfo.hardMaxBytesLimit &&
firstBatchSize > maxBytes) {
return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
}
int updatedFetchSize =
remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
int remainingBytes = updatedFetchSize;
firstBatch.writeTo(buffer);
remainingBytes -= firstBatchSize;
if (remainingBytes > 0) {
// read the input stream until min of (EOF stream or buffer's remaining capacity).
Utils.readFully(remoteSegInputStream, buffer);
}
buffer.flip();
FetchDataInfo fetchDataInfo = new FetchDataInfo(
new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
MemoryRecords.readableRecords(buffer));
if (includeAbortedTxns) {
fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get());
}
return fetchDataInfo;
} finally {
Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
}
}
private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
}
private FetchDataInfo addAbortedTransactions(long startOffset,
RemoteLogSegmentMetadata segmentMetadata,
FetchDataInfo fetchInfo,
UnifiedLog log) throws RemoteStorageException {
int fetchSize = fetchInfo.records.sizeInBytes();
OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
.map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
final Set<FetchResponseData.AbortedTransaction> abortedTransactions = new HashSet<>();
Consumer<List<AbortedTxn>> accumulator =
abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);
return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
fetchInfo.records,
fetchInfo.firstEntryIncomplete,
Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
}
private void collectAbortedTransactions(long startOffset,
long upperBoundOffset,
RemoteLogSegmentMetadata segmentMetadata,
Consumer<List<AbortedTxn>> accumulator,
UnifiedLog log) throws RemoteStorageException {
// Search in remote segments first.
Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
while (nextSegmentMetadataOpt.isPresent()) {
Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
if (txnIndexOpt.isPresent()) {
TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
accumulator.accept(searchResult.abortedTransactions);
if (searchResult.isComplete) {
// Return immediately when the search result is complete, it does not need to go through local log segments.
return;
}
}
nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
}
// Search in local segments
collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, JavaConverters.asJavaIterator(log.logSegments().iterator()));
}
private void collectAbortedTransactionInLocalSegments(long startOffset,
long upperBoundOffset,
Consumer<List<AbortedTxn>> accumulator,
Iterator<LogSegment> localLogSegments) {
while (localLogSegments.hasNext()) {
TransactionIndex txnIndex = localLogSegments.next().txnIndex();
if (txnIndex != null) {
TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
accumulator.accept(searchResult.abortedTransactions);
if (searchResult.isComplete) {
return;
}
}
}
}
private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {
if (leaderEpochFileCacheOption.isEmpty()) {
return Optional.empty();
}
long nextSegmentBaseOffset = segmentMetadata.endOffset() + 1;
OptionalInt epoch = leaderEpochFileCacheOption.get().epochForOffset(nextSegmentBaseOffset);
return epoch.isPresent()
? fetchRemoteLogSegmentMetadata(segmentMetadata.topicIdPartition().topicPartition(), epoch.getAsInt(), nextSegmentBaseOffset)
: Optional.empty();
}
private RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
RecordBatch nextBatch = null;
// Look for the batch which has the desired offset
// We will always have a batch in that segment as it is a non-compacted topic.
do {
nextBatch = remoteLogInputStream.nextBatch();
} while (nextBatch != null && nextBatch.lastOffset() < offset);
return nextBatch;
}
long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
Optional<Long> offset = Optional.empty();
Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
if (maybeLog.isPresent()) {
UnifiedLog log = maybeLog.get();
Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
if (maybeLeaderEpochFileCache.isDefined()) {
LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
@@ -614,11 +813,22 @@ public class RemoteLogManager implements Closeable {
epoch = cache.previousEpoch(epoch.getAsInt());
}
}
}
return offset.orElse(-1L);
}
/**
* Submit a remote log read task.
*
* This method returns immediately. The read operation is executed in a thread pool.
* The callback will be called when the task is done.
*
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be accepted for execution (task queue is full)
*/
public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteLogReadResult> callback) {
return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback));
}
void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
Consumer<RLMTask> convertToLeaderOrFollower) {
RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
@@ -665,17 +875,40 @@ public class RemoteLogManager implements Closeable {
Utils.closeQuietly(remoteLogStorageManager, "RemoteLogStorageManager");
Utils.closeQuietly(remoteLogMetadataManager, "RemoteLogMetadataManager");
Utils.closeQuietly(indexCache, "RemoteIndexCache");
try {
rlmScheduledThreadPool.shutdown();
} catch (InterruptedException e) {
// ignore
}
rlmScheduledThreadPool.close();
shutdownAndAwaitTermination(remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS);
leaderOrFollowerTasks.clear();
closed = true;
}
}
}
private static void shutdownAndAwaitTermination(ExecutorService pool, String poolName, long timeout, TimeUnit timeUnit) {
// This pattern of shutting down thread pool is adopted from here: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html
LOGGER.info("Shutting down of thread pool {} is started", poolName);
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(timeout, timeUnit)) {
LOGGER.info("Shutting down of thread pool {} could not be completed. It will retry cancelling the tasks using shutdownNow.", poolName);
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(timeout, timeUnit))
LOGGER.warn("Shutting down of thread pool {} could not be completed even after retrying cancellation of the tasks using shutdownNow.", poolName);
}
} catch (InterruptedException ex) {
// (Re-)Cancel if current thread also interrupted
LOGGER.warn("Encountered InterruptedException while shutting down thread pool {}. It will retry cancelling the tasks using shutdownNow.", poolName);
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
LOGGER.info("Shutting down of thread pool {} is completed", poolName);
}
static class RLMScheduledThreadPool {
private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
@@ -708,11 +941,8 @@ public class RemoteLogManager implements Closeable {
return scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit);
}
public boolean shutdown() throws InterruptedException {
LOGGER.info("Shutting down scheduled thread pool");
scheduledThreadPool.shutdownNow();
//waits for 2 mins to terminate the current tasks
return scheduledThreadPool.awaitTermination(2, TimeUnit.MINUTES);
public void close() {
shutdownAndAwaitTermination(scheduledThreadPool, "RLMScheduledThreadPool", 10, TimeUnit.SECONDS);
}
}
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.slf4j.Logger;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
public class RemoteLogReader implements Callable<Void> {
private final Logger logger;
private final RemoteStorageFetchInfo fetchInfo;
private final RemoteLogManager rlm;
private final Consumer<RemoteLogReadResult> callback;
public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
RemoteLogManager rlm,
Consumer<RemoteLogReadResult> callback) {
this.fetchInfo = fetchInfo;
this.rlm = rlm;
this.callback = callback;
logger = new LogContext() {
@Override
public String logPrefix() {
return "[" + Thread.currentThread().getName() + "]";
}
}.logger(RemoteLogReader.class);
}
@Override
public Void call() {
RemoteLogReadResult result;
try {
logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
} catch (OffsetOutOfRangeException e) {
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
} catch (Exception e) {
logger.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
}
logger.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
callback.accept(result);
return null;
}
}
@@ -26,6 +26,7 @@ import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedProduce;
import kafka.server.DelayedRemoteFetch;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
@@ -61,6 +62,7 @@ public class ReplicaManagerBuilder {
private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
private Optional<String> threadNamePrefix = Optional.empty();
private Long brokerEpoch = -1L;
private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
@@ -140,6 +142,11 @@ public class ReplicaManagerBuilder {
return this;
}
public ReplicaManagerBuilder setDelayedRemoteFetchPurgatory(DelayedOperationPurgatory<DelayedRemoteFetch> delayedRemoteFetchPurgatory) {
this.delayedRemoteFetchPurgatory = Optional.of(delayedRemoteFetchPurgatory);
return this;
}
public ReplicaManagerBuilder setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory<DelayedDeleteRecords> delayedDeleteRecordsPurgatory) {
this.delayedDeleteRecordsPurgatory = Optional.of(delayedDeleteRecordsPurgatory);
return this;
@@ -189,6 +196,7 @@ public class ReplicaManagerBuilder {
OptionConverters.toScala(delayedFetchPurgatory),
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
OptionConverters.toScala(delayedElectLeaderPurgatory),
OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(threadNamePrefix),
() -> brokerEpoch,
OptionConverters.toScala(addPartitionsToTxnManager));
@@ -526,7 +526,7 @@ class Partition(val topicPartition: TopicPartition,
leaderReplicaIdOpt.filter(_ == localBrokerId)
}
private def localLogWithEpochOrThrow(
def localLogWithEpochOrThrow(
currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean
): UnifiedLog = {
@@ -270,6 +270,7 @@ class BrokerServer(
isShuttingDown = isShuttingDown,
zkClient = None,
threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
delayedRemoteFetchPurgatoryParam = None,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
addPartitionsToTxnManager = Some(addPartitionsToTxnManager)
)
@@ -163,7 +163,7 @@ class DelayedFetch(
tp -> status.fetchInfo
}
val logReadResults = replicaManager.readFromLocalLog(
val logReadResults = replicaManager.readFromLog(
params,
fetchInfos,
quota,
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.storage.internals.log.{FetchParams, FetchPartitionData, LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
import java.util.concurrent.{CompletableFuture, Future}
import java.util.{Optional, OptionalInt, OptionalLong}
import scala.collection._
/**
* A remote fetch operation that can be created by the replica manager and watched
* in the remote fetch operation purgatory
*/
class DelayedRemoteFetch(remoteFetchTask: Future[Void],
remoteFetchResult: CompletableFuture[RemoteLogReadResult],
remoteFetchInfo: RemoteStorageFetchInfo,
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
fetchParams: FetchParams,
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
replicaManager: ReplicaManager,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(fetchParams.maxWaitMs) {
/**
* The operation can be completed if:
*
* Case a: This broker is no longer the leader of the partition it tries to fetch
* Case b: This broker does not know the partition it tries to fetch
* Case c: The remote storage read request completed (succeeded or failed)
* Case d: The partition is in an offline log directory on this broker
*
* Upon completion, should return whatever data is available for each valid partition
*/
override def tryComplete(): Boolean = {
fetchPartitionStatus.foreach {
case (topicPartition, fetchStatus) =>
val fetchOffset = fetchStatus.startOffsetMetadata
try {
if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
replicaManager.getPartitionOrException(topicPartition.topicPartition())
}
} catch {
case _: KafkaStorageException => // Case d
debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchParams immediately")
return forceComplete()
case _: UnknownTopicOrPartitionException => // Case b
debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchParams immediately")
return forceComplete()
case _: NotLeaderOrFollowerException => // Case a
debug("Broker is no longer the leader or follower of %s, satisfy %s immediately".format(topicPartition, fetchParams))
return forceComplete()
}
}
if (remoteFetchResult.isDone) // Case c
forceComplete()
else
false
}
override def onExpiration(): Unit = {
// cancel the remote storage read task, if it has not been executed yet
val cancelled = remoteFetchTask.cancel(true)
if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
}
/**
* Upon completion, read whatever data is available and pass to the complete callback
*/
override def onComplete(): Unit = {
val fetchPartitionData = localReadResults.map { case (tp, result) =>
if (tp.topicPartition().equals(remoteFetchInfo.topicPartition)
&& remoteFetchResult.isDone
&& result.error == Errors.NONE
&& result.info.delayedRemoteStorageFetch.isPresent) {
if (remoteFetchResult.get.error.isPresent) {
tp -> ReplicaManager.createLogReadResult(remoteFetchResult.get.error.get).toFetchPartitionData(false)
} else {
val info = remoteFetchResult.get.fetchDataInfo.get
tp -> new FetchPartitionData(
result.error,
result.highWatermark,
result.leaderLogStartOffset,
info.records,
Optional.empty(),
if (result.lastStableOffset.isDefined) OptionalLong.of(result.lastStableOffset.get) else OptionalLong.empty(),
info.abortedTransactions,
if (result.preferredReadReplica.isDefined) OptionalInt.of(result.preferredReadReplica.get) else OptionalInt.empty(),
false)
}
} else {
tp -> result.toFetchPartitionData(false)
}
}
responseCallback(fetchPartitionData)
}
}
@@ -629,6 +629,7 @@ class KafkaServer(
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
zkClient = Some(zkClient),
delayedRemoteFetchPurgatoryParam = None,
threadNamePrefix = threadNamePrefix,
brokerEpochSupplier = brokerEpochSupplier,
addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
@@ -24,6 +24,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.createLogReadResult
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import kafka.server.metadata.ZkMetadataCache
import kafka.utils.Implicits._
@@ -55,16 +56,16 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdParti
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo}
import java.io.File
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit}
import java.util.{Optional, OptionalInt, OptionalLong}
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
@@ -175,6 +176,33 @@ object HostedPartition {
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
def createLogReadResult(highWatermark: Long,
leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
e: Throwable) = {
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark,
leaderLogStartOffset,
leaderLogEndOffset,
followerLogStartOffset = -1L,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e))
}
def createLogReadResult(e: Throwable): LogReadResult = {
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = UnifiedLog.UnknownOffset,
leaderLogStartOffset = UnifiedLog.UnknownOffset,
leaderLogEndOffset = UnifiedLog.UnknownOffset,
followerLogStartOffset = UnifiedLog.UnknownOffset,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e))
}
}
class ReplicaManager(val config: KafkaConfig,
@@ -194,6 +222,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None,
delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
threadNamePrefix: Option[String] = None,
val brokerEpochSupplier: () => Long = () => -1,
addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None
@@ -215,6 +244,9 @@ class ReplicaManager(val config: KafkaConfig,
val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "ElectLeader", brokerId = config.brokerId))
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "RemoteFetch", brokerId = config.brokerId))
/* epoch of the controller that last changed the leader */
@volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
@@ -330,6 +362,7 @@ class ReplicaManager(val config: KafkaConfig,
val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
}
/**
@@ -1121,21 +1154,69 @@ class ReplicaManager(val config: KafkaConfig,
partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader)
}
/**
* Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
* else returns [[None]].
*/
private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo,
params: FetchParams,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
logReadResults: Seq[(TopicIdPartition, LogReadResult)],
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[LogReadResult] = {
val key = new TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), remoteFetchInfo.topicPartition.partition())
val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
var remoteFetchTask: Future[Void] = null
try {
remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo, (result: RemoteLogReadResult) => {
remoteFetchResult.complete(result)
delayedRemoteFetchPurgatory.checkAndComplete(key)
})
} catch {
case e: RejectedExecutionException =>
// Return the error if any in scheduling the remote fetch task
return Some(createLogReadResult(e))
}
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo,
fetchPartitionStatus, params, logReadResults, this, responseCallback)
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
None
}
private def buildPartitionToFetchPartitionData(logReadResults: Seq[(TopicIdPartition, LogReadResult)],
remoteFetchTopicPartition: TopicPartition,
error: LogReadResult): Seq[(TopicIdPartition, FetchPartitionData)] = {
logReadResults.map { case (tp, result) =>
val fetchPartitionData = {
if (tp.topicPartition().equals(remoteFetchTopicPartition))
error
else
result
}.toFetchPartitionData(false)
tp -> fetchPartitionData
}
}
/**
* Fetch messages from a replica, and wait until enough data can be fetched and return;
* the callback function will be triggered either when timeout or required fetch info is satisfied.
* Consumers may fetch from any replica, but followers can only fetch from the leader.
*/
def fetchMessages(
params: FetchParams,
def fetchMessages(params: FetchParams,
fetchInfos: Seq[(TopicIdPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
): Unit = {
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit): Unit = {
// check if this fetch request can be satisfied right away
val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false)
var bytesReadable: Long = 0
var errorReadingData = false
// The 1st topic-partition that has to be read from remote storage
var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
var hasDivergingEpoch = false
var hasPreferredReadReplica = false
val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
@@ -1145,6 +1226,9 @@ class ReplicaManager(val config: KafkaConfig,
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
if (logReadResult.error != Errors.NONE)
errorReadingData = true
if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {
remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
}
if (logReadResult.divergingEpoch.nonEmpty)
hasDivergingEpoch = true
if (logReadResult.preferredReadReplica.nonEmpty)
@@ -1153,14 +1237,15 @@ class ReplicaManager(val config: KafkaConfig,
logReadResultMap.put(topicIdPartition, logReadResult)
}
// respond immediately if 1) fetch request does not want to wait
// Respond immediately if no remote fetches are required and any of the below conditions is true
// 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
// 5) we found a diverging epoch
// 6) has a preferred read replica
if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
hasDivergingEpoch || hasPreferredReadReplica) {
if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
hasDivergingEpoch || hasPreferredReadReplica)) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
tp -> result.toFetchPartitionData(isReassignmentFetch)
@@ -1175,6 +1260,19 @@ class ReplicaManager(val config: KafkaConfig,
fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
})
}
if (remoteFetchInfo.isPresent) {
val maybeLogReadResultWithError = processRemoteFetch(remoteFetchInfo.get(), params, responseCallback, logReadResults, fetchPartitionStatus)
if (maybeLogReadResultWithError.isDefined) {
// If there is an error in scheduling the remote fetch task, return what we currently have
// (the data read from local log segment for the other topic-partitions) and an error for the topic-partition
// that we couldn't read from remote storage
val partitionToFetchPartitionData = buildPartitionToFetchPartitionData(logReadResults, remoteFetchInfo.get().topicPartition, maybeLogReadResultWithError.get)
responseCallback(partitionToFetchPartitionData)
}
} else {
// If there is not enough data to respond and there is no remote data, we will let the fetch request
// wait for new data.
val delayedFetch = new DelayedFetch(
params = params,
fetchPartitionStatus = fetchPartitionStatus,
@@ -1192,32 +1290,47 @@ class ReplicaManager(val config: KafkaConfig,
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
}
}
}
/**
* Read from multiple topic partitions at the given offset up to maxSize bytes
*/
def readFromLocalLog(
def readFromLog(
params: FetchParams,
readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
quota: ReplicaQuota,
readFromPurgatory: Boolean
): Seq[(TopicIdPartition, LogReadResult)] = {
readFromPurgatory: Boolean): Seq[(TopicIdPartition, LogReadResult)] = {
val traceEnabled = isTraceEnabled
def checkFetchDataInfo(partition: Partition, givenFetchedDataInfo: FetchDataInfo) = {
if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
// If the partition is being throttled, simply return an empty set.
new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else if (!params.hardMaxBytesLimit && givenFetchedDataInfo.firstEntryIncomplete) {
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else {
givenFetchedDataInfo
}
}
def read(tp: TopicIdPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.fetchOffset
val partitionFetchSize = fetchInfo.maxBytes
val followerLogStartOffset = fetchInfo.logStartOffset
val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
var log: UnifiedLog = null
var partition : Partition = null
val fetchTimeMs = time.milliseconds
try {
if (traceEnabled)
trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
s"remaining response limit $limitBytes" +
(if (minOneMessage) s", ignoring response/partition size limits" else ""))
val partition = getPartitionOrException(tp.topicPartition)
val fetchTimeMs = time.milliseconds
partition = getPartitionOrException(tp.topicPartition)
// Check if topic ID from the fetch request/session matches the ID in the log
val topicId = if (tp.topicId == Uuid.ZERO_UUID) None else Some(tp.topicId)
@@ -1246,6 +1359,8 @@ class ReplicaManager(val config: KafkaConfig,
preferredReadReplica = preferredReadReplica,
exception = None)
} else {
log = partition.localLogWithEpochOrThrow(fetchInfo.currentLeaderEpoch, params.fetchOnlyLeader())
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
val readInfo: LogReadInfo = partition.fetchRecords(
fetchParams = params,
@@ -1253,19 +1368,9 @@ class ReplicaManager(val config: KafkaConfig,
fetchTimeMs = fetchTimeMs,
maxBytes = adjustedMaxBytes,
minOneMessage = minOneMessage,
updateFetchState = !readFromPurgatory
)
updateFetchState = !readFromPurgatory)
val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
// If the partition is being throttled, simply return an empty set.
new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else {
readInfo.fetchedData
}
val fetchDataInfo = checkFetchDataInfo(partition, readInfo.fetchedData)
LogReadResult(info = fetchDataInfo,
divergingEpoch = readInfo.divergingEpoch.asScala,
@@ -1288,17 +1393,10 @@ class ReplicaManager(val config: KafkaConfig,
_: FencedLeaderEpochException |
_: ReplicaNotAvailableException |
_: KafkaStorageException |
_: OffsetOutOfRangeException |
_: InconsistentTopicIdException) =>
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = UnifiedLog.UnknownOffset,
leaderLogStartOffset = UnifiedLog.UnknownOffset,
leaderLogEndOffset = UnifiedLog.UnknownOffset,
followerLogStartOffset = UnifiedLog.UnknownOffset,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e))
createLogReadResult(e)
case e: OffsetOutOfRangeException =>
handleOffsetOutOfRangeError(tp, params, fetchInfo, adjustedMaxBytes, minOneMessage, log, fetchTimeMs, e)
case e: Throwable =>
brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
@@ -1335,6 +1433,50 @@ class ReplicaManager(val config: KafkaConfig,
result
}
private def handleOffsetOutOfRangeError(tp: TopicIdPartition, params: FetchParams, fetchInfo: PartitionData,
adjustedMaxBytes: Int, minOneMessage:
Boolean, log: UnifiedLog, fetchTimeMs: Long,
exception: OffsetOutOfRangeException): LogReadResult = {
val offset = fetchInfo.fetchOffset
// In case of offset out of range errors, handle it for tiered storage only if all the below conditions are true.
// 1) remote log manager is enabled and it is available
// 2) `log` instance should not be null here as that would have been caught earlier with NotLeaderForPartitionException or ReplicaNotAvailableException.
// 3) fetch offset is within the offset range of the remote storage layer
if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() &&
log.logStartOffset <= offset && offset < log.localLogStartOffset())
{
val highWatermark = log.highWatermark
val leaderLogStartOffset = log.logStartOffset
val leaderLogEndOffset = log.logEndOffset
if (params.isFromFollower) {
// If it is from a follower then send the offset metadata only as the data is already available in remote
// storage and throw an error saying that this offset is moved to tiered storage.
createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
} else {
// For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
// For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
val fetchDataInfo =
new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
fetchInfo, params.isolation, params.hardMaxBytesLimit())))
LogReadResult(fetchDataInfo,
divergingEpoch = None,
highWatermark,
leaderLogStartOffset,
leaderLogEndOffset,
fetchInfo.logStartOffset,
fetchTimeMs,
Some(log.lastStableOffset),
exception = None)
}
} else {
createLogReadResult(exception)
}
}
/**
* Using the configured [[ReplicaSelector]], determine the preferred read replica for a partition given the
* client metadata, the requested offset, and the current set of replicas. If the preferred read replica is the
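As a plain-language restatement of the check that drives this new path (a hedged sketch in Java, illustrative names only, not code from this commit): an out-of-range fetch offset is routed to tiered storage only when it falls in [logStartOffset, localLogStartOffset); followers on that path get an OffsetMovedToTieredStorageException, while consumer fetches get a dummy FetchDataInfo carrying the RemoteStorageFetchInfo that the purgatory path reads asynchronously.

// Minimal sketch of the tiering decision made in handleOffsetOutOfRangeError above:
// offsets in [logStartOffset, localLogStartOffset) exist only in the remote tier,
// while offsets at or beyond localLogStartOffset are still served from local segments.
final class TieredOffsetCheck {
    static boolean shouldReadFromRemoteTier(long fetchOffset, long logStartOffset, long localLogStartOffset) {
        return logStartOffset <= fetchOffset && fetchOffset < localLogStartOffset;
    }

    public static void main(String[] args) {
        // Example: the log starts at offset 0 and local segments start at offset 500.
        System.out.println(shouldReadFromRemoteTier(150, 0, 500)); // true  -> remote fetch path
        System.out.println(shouldReadFromRemoteTier(700, 0, 500)); // false -> local read as before
    }
}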
@@ -2045,6 +2187,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.shutdown()
replicaAlterLogDirsManager.shutdown()
delayedFetchPurgatory.shutdown()
delayedRemoteFetchPurgatory.shutdown()
delayedProducePurgatory.shutdown()
delayedDeleteRecordsPurgatory.shutdown()
delayedElectLeaderPurgatory.shutdown()
@@ -167,11 +167,11 @@ public class RemoteLogManagerTest {
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
long offset = remoteLogManager.findHighestRemoteOffset(tpId);
long offset = remoteLogManager.findHighestRemoteOffset(tpId, mockLog);
assertEquals(-1, offset);
when(remoteLogMetadataManager.highestOffsetForEpoch(tpId, 2)).thenReturn(Optional.of(200L));
long offset2 = remoteLogManager.findHighestRemoteOffset(tpId);
long offset2 = remoteLogManager.findHighestRemoteOffset(tpId, mockLog);
assertEquals(200, offset2);
}
@@ -261,7 +261,7 @@ public class RemoteLogManagerTest {
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
task.convertToLeader(2);
task.copyLogSegmentsToRemote();
task.copyLogSegmentsToRemote(mockLog);
// verify remoteLogMetadataManager did add the expected RemoteLogSegmentMetadata
ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
@@ -318,7 +318,7 @@ public class RemoteLogManagerTest {
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
task.convertToFollower();
task.copyLogSegmentsToRemote();
task.copyLogSegmentsToRemote(mockLog);
// verify the remoteLogMetadataManager never add any metadata and remoteStorageManager never copy log segments
verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.log.remote;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class RemoteLogReaderTest {
    RemoteLogManager mockRLM = mock(RemoteLogManager.class);
    LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
    Records records = mock(Records.class);

    @Test
    public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOException {
        FetchDataInfo fetchDataInfo = new FetchDataInfo(logOffsetMetadata, records);
        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);

        Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
        RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
        remoteLogReader.call();

        // verify the callback did get invoked with the expected remoteLogReadResult
        ArgumentCaptor<RemoteLogReadResult> remoteLogReadResultArg = ArgumentCaptor.forClass(RemoteLogReadResult.class);
        verify(callback, times(1)).accept(remoteLogReadResultArg.capture());
        RemoteLogReadResult actualRemoteLogReadResult = remoteLogReadResultArg.getValue();
        assertFalse(actualRemoteLogReadResult.error.isPresent());
        assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
        assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());
    }

    @Test
    public void testRemoteLogReaderWithError() throws RemoteStorageException, IOException {
        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new OffsetOutOfRangeException("error"));

        Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
        RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
        remoteLogReader.call();

        // verify the callback did get invoked with the expected remoteLogReadResult
        ArgumentCaptor<RemoteLogReadResult> remoteLogReadResultArg = ArgumentCaptor.forClass(RemoteLogReadResult.class);
        verify(callback, times(1)).accept(remoteLogReadResultArg.capture());
        RemoteLogReadResult actualRemoteLogReadResult = remoteLogReadResultArg.getValue();
        assertTrue(actualRemoteLogReadResult.error.isPresent());
        assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent());
    }
}
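The two tests above fix RemoteLogReader's observable contract: call() asks the RemoteLogManager to read, then invokes the callback exactly once with a RemoteLogReadResult carrying either the FetchDataInfo or the error. Below is a minimal sketch of a task satisfying that contract; the class name and field layout are illustrative and may differ from the PR's RemoteLogReader, while RemoteLogManager.read(RemoteStorageFetchInfo) and RemoteLogReadResult are taken from the tests.

package kafka.log.remote; // placed here in the sketch only so RemoteLogManager resolves

import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

// Hypothetical sketch of a remote read task; details of the real class may differ.
class RemoteLogReaderSketch implements Callable<Void> {
    private final RemoteStorageFetchInfo fetchInfo;
    private final RemoteLogManager rlm;
    private final Consumer<RemoteLogReadResult> callback;

    RemoteLogReaderSketch(RemoteStorageFetchInfo fetchInfo, RemoteLogManager rlm,
                          Consumer<RemoteLogReadResult> callback) {
        this.fetchInfo = fetchInfo;
        this.rlm = rlm;
        this.callback = callback;
    }

    @Override
    public Void call() {
        RemoteLogReadResult result;
        try {
            FetchDataInfo fetchDataInfo = rlm.read(fetchInfo); // may throw RemoteStorageException/IOException
            result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
        } catch (Exception e) {
            result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
        }
        callback.accept(result); // invoked exactly once, as the tests verify
        return null;
    }
}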
@@ -188,7 +188,7 @@ class DelayedFetchTest {
    fetchPartitionData: FetchRequest.PartitionData,
    error: Errors
  ): Unit = {
    when(replicaManager.readFromLocalLog(
    when(replicaManager.readFromLog(
      fetchParams,
      readPartitionInfo = Seq((topicIdPartition, fetchPartitionData)),
      quota = replicaQuota,
@@ -0,0 +1,175 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.server

import kafka.cluster.Partition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.{TopicIdPartition, Uuid}
import org.apache.kafka.storage.internals.log._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito.{mock, when}

import java.util.Optional
import java.util.concurrent.CompletableFuture

import scala.collection._

class DelayedRemoteFetchTest {
  private val maxBytes = 1024
  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
  private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
  private val fetchOffset = 500L
  private val logStartOffset = 0L
  private val currentLeaderEpoch = Optional.of[Integer](10)
  private val replicaId = 1

  private val fetchStatus = FetchPartitionStatus(
    startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
    fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
  private val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)

  @Test
  def testFetch(): Unit = {
    var actualTopicPartition: Option[TopicIdPartition] = None
    var fetchResultOpt: Option[FetchPartitionData] = None

    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
      assertEquals(1, responses.size)
      actualTopicPartition = Some(responses.head._1)
      fetchResultOpt = Some(responses.head._2)
    }

    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
    future.complete(null)
    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)
    val highWatermark = 100
    val leaderLogStartOffset = 10
    val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)

    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
      .thenReturn(mock(classOf[Partition]))

    assertTrue(delayedRemoteFetch.tryComplete())
    assertTrue(delayedRemoteFetch.isCompleted)
    assertTrue(actualTopicPartition.isDefined)
    assertEquals(topicIdPartition, actualTopicPartition.get)
    assertTrue(fetchResultOpt.isDefined)

    val fetchResult = fetchResultOpt.get
    assertEquals(Errors.NONE, fetchResult.error)
    assertEquals(highWatermark, fetchResult.highWatermark)
    assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
  }

  @Test
  def testNotLeaderOrFollower(): Unit = {
    var actualTopicPartition: Option[TopicIdPartition] = None
    var fetchResultOpt: Option[FetchPartitionData] = None

    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
      assertEquals(1, responses.size)
      actualTopicPartition = Some(responses.head._1)
      fetchResultOpt = Some(responses.head._2)
    }

    // throw exception while getPartition
    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
      .thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available"))

    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)

    val logReadInfo = buildReadResult(Errors.NONE)

    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

    // delayed remote fetch should still be able to complete
    assertTrue(delayedRemoteFetch.tryComplete())
    assertTrue(delayedRemoteFetch.isCompleted)
    assertEquals(topicIdPartition, actualTopicPartition.get)
    assertTrue(fetchResultOpt.isDefined)
  }

  @Test
  def testErrorLogReadInfo(): Unit = {
    var actualTopicPartition: Option[TopicIdPartition] = None
    var fetchResultOpt: Option[FetchPartitionData] = None

    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
      assertEquals(1, responses.size)
      actualTopicPartition = Some(responses.head._1)
      fetchResultOpt = Some(responses.head._2)
    }

    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
      .thenReturn(mock(classOf[Partition]))

    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
    future.complete(null)
    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)

    // build a read result with error
    val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)

    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

    assertTrue(delayedRemoteFetch.tryComplete())
    assertTrue(delayedRemoteFetch.isCompleted)
    assertEquals(topicIdPartition, actualTopicPartition.get)
    assertTrue(fetchResultOpt.isDefined)
    assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get.error)
  }

  private def buildFollowerFetchParams(replicaId: Int,
                                       maxWaitMs: Int): FetchParams = {
    new FetchParams(
      ApiKeys.FETCH.latestVersion,
      replicaId,
      1,
      maxWaitMs,
      1,
      maxBytes,
      FetchIsolation.LOG_END,
      Optional.empty()
    )
  }

  private def buildReadResult(error: Errors,
                              highWatermark: Int = 0,
                              leaderLogStartOffset: Int = 0): LogReadResult = {
    LogReadResult(
      exception = if (error != Errors.NONE) Some(error.exception) else None,
      info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
      divergingEpoch = None,
      highWatermark = highWatermark,
      leaderLogStartOffset = leaderLogStartOffset,
      leaderLogEndOffset = -1L,
      followerLogStartOffset = -1L,
      fetchTimeMs = -1L,
      lastStableOffset = None)
  }

}
@@ -158,15 +158,20 @@ class ListOffsetsRequestTest extends BaseRequestTest {
  private[this] def fetchOffsetAndEpoch(serverId: Int,
                                        timestamp: Long,
                                        version: Short): (Long, Int) = {
    val (offset, leaderEpoch, _) = fetchOffsetAndEpochWithError(serverId, timestamp, version)
    (offset, leaderEpoch)
  }

  private[this] def fetchOffsetAndEpochWithError(serverId: Int, timestamp: Long, version: Short): (Long, Int, Short) = {
    val partitionData = sendRequest(serverId, timestamp, version)

    if (version == 0) {
      if (partitionData.oldStyleOffsets().isEmpty)
        (-1, partitionData.leaderEpoch)
        (-1, partitionData.leaderEpoch, partitionData.errorCode())
      else
        (partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch)
        (partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch, partitionData.errorCode())
    } else
      (partitionData.offset, partitionData.leaderEpoch)
      (partitionData.offset, partitionData.leaderEpoch, partitionData.errorCode())
  }

  @Test
@@ -202,8 +207,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1))

    // The latest offset reflects the updated epoch
    assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
    assertEquals((9L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
    assertEquals((10L, secondLeaderEpoch, Errors.NONE.code()), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
    assertEquals((9L, secondLeaderEpoch, Errors.NONE.code()), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
  }

  @Test
@@ -67,7 +67,7 @@ class ReplicaManagerQuotasTest {
      .thenReturn(true)

    val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
    assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size,
      "Given two partitions, with only one throttled, we should get the first")
    assertEquals(0, fetch.find(_._1 == topicIdPartition2).get._2.info.records.batches.asScala.size,
@@ -85,7 +85,7 @@ class ReplicaManagerQuotasTest {
      .thenReturn(true)

    val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
    assertEquals(0, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size,
      "Given two partitions, with both throttled, we should get no messages")
    assertEquals(0, fetch.find(_._1 == topicIdPartition2).get._2.info.records.batches.asScala.size,
@@ -103,7 +103,7 @@ class ReplicaManagerQuotasTest {
      .thenReturn(false)

    val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
    assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size,
      "Given two partitions, with both non-throttled, we should get both messages")
    assertEquals(1, fetch.find(_._1 == topicIdPartition2).get._2.info.records.batches.asScala.size,
@@ -121,7 +121,7 @@ class ReplicaManagerQuotasTest {
      .thenReturn(true)

    val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
    assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size,
      "Given two partitions, with only one throttled, we should get the first")

@@ -137,7 +137,7 @@ class ReplicaManagerQuotasTest {
    when(quota.isQuotaExceeded).thenReturn(true)

    val fetchParams = PartitionTest.consumerFetchParams()
    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false).toMap
    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false).toMap
    assertEquals(1, fetch(topicIdPartition1).info.records.batches.asScala.size,
      "Replication throttled partitions should return data for consumer fetch")
    assertEquals(1, fetch(topicIdPartition2).info.records.batches.asScala.size,
@@ -28,6 +28,7 @@ public class FetchDataInfo {
    public final Records records;
    public final boolean firstEntryIncomplete;
    public final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;
    public final Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch;

    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                         Records records) {
@@ -38,10 +39,19 @@ public class FetchDataInfo {
                         Records records,
                         boolean firstEntryIncomplete,
                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
        this(fetchOffsetMetadata, records, firstEntryIncomplete, abortedTransactions, Optional.empty());
    }

    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                         Records records,
                         boolean firstEntryIncomplete,
                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions,
                         Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch) {
        this.fetchOffsetMetadata = fetchOffsetMetadata;
        this.records = records;
        this.firstEntryIncomplete = firstEntryIncomplete;
        this.abortedTransactions = abortedTransactions;
        this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
    }

    public static FetchDataInfo empty(long fetchOffset) {
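With the new field and constructor above, a read that falls below the local log start offset can return a FetchDataInfo that carries no records but does carry the RemoteStorageFetchInfo, which the fetch path then uses to hand the request to the remote-fetch purgatory (mirroring the ReplicaManager snippet earlier in this diff). A hedged sketch of constructing and inspecting such a value; the class name, variable names, and the chosen max-bytes/offset values are illustrative, while the constructors and the delayedRemoteStorageFetch field come from this hunk.

import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

public class DelayedRemoteStorageFetchExample {
    public static void main(String[] args) {
        RemoteStorageFetchInfo remoteFetchInfo = new RemoteStorageFetchInfo(
            1048576, true, new TopicPartition("topic", 0),
            null /* FetchRequest.PartitionData omitted in this sketch */, FetchIsolation.LOG_END, false);

        // No local records; the presence of delayedRemoteStorageFetch signals the remote path.
        FetchDataInfo fetchDataInfo = new FetchDataInfo(
            new LogOffsetMetadata(100L), MemoryRecords.EMPTY, false,
            Optional.empty(), Optional.of(remoteFetchInfo));

        if (fetchDataInfo.delayedRemoteStorageFetch.isPresent()) {
            System.out.println("would be handed to the remote-fetch purgatory: "
                + fetchDataInfo.delayedRemoteStorageFetch.get());
        }
    }
}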
@@ -0,0 +1,30 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.storage.internals.log;

import java.util.Optional;

public class RemoteLogReadResult {
    public final Optional<FetchDataInfo> fetchDataInfo;
    public final Optional<Throwable> error;

    public RemoteLogReadResult(Optional<FetchDataInfo> fetchDataInfo, Optional<Throwable> error) {
        this.fetchDataInfo = fetchDataInfo;
        this.error = error;
    }
}
@@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest;

public class RemoteStorageFetchInfo {

    public final int fetchMaxBytes;
    public final boolean minOneMessage;
    public final TopicPartition topicPartition;
    public final FetchRequest.PartitionData fetchInfo;
    public final FetchIsolation fetchIsolation;
    public final boolean hardMaxBytesLimit;

    public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicPartition topicPartition,
                                  FetchRequest.PartitionData fetchInfo, FetchIsolation fetchIsolation,
                                  boolean hardMaxBytesLimit) {
        this.fetchMaxBytes = fetchMaxBytes;
        this.minOneMessage = minOneMessage;
        this.topicPartition = topicPartition;
        this.fetchInfo = fetchInfo;
        this.fetchIsolation = fetchIsolation;
        this.hardMaxBytesLimit = hardMaxBytesLimit;
    }

    @Override
    public String toString() {
        return "RemoteStorageFetchInfo{" +
                "fetchMaxBytes=" + fetchMaxBytes +
                ", minOneMessage=" + minOneMessage +
                ", topicPartition=" + topicPartition +
                ", fetchInfo=" + fetchInfo +
                ", fetchIsolation=" + fetchIsolation +
                ", hardMaxBytesLimit=" + hardMaxBytesLimit +
                '}';
    }
}
@@ -0,0 +1,73 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RemoteStorageThreadPool extends ThreadPoolExecutor {
    private final Logger logger;

    public RemoteStorageThreadPool(String threadNamePrefix,
                                   int numThreads,
                                   int maxPendingTasks) {
        super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxPendingTasks),
            new RemoteStorageThreadFactory(threadNamePrefix));
        logger = new LogContext() {
            @Override
            public String logPrefix() {
                return "[" + Thread.currentThread().getName() + "]";
            }
        }.logger(RemoteStorageThreadPool.class);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable th) {
        if (th != null) {
            if (th instanceof FatalExitError) {
                logger.error("Stopping the server as it encountered a fatal error.");
                Exit.exit(((FatalExitError) th).statusCode());
            } else {
                if (!isShutdown())
                    logger.error("Error occurred while executing task: {}", runnable, th);
            }
        }
    }

    private static class RemoteStorageThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(0);

        RemoteStorageThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, namePrefix + threadNumber.getAndIncrement());
        }

    }
}
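A hedged usage sketch of the pool above: the thread name prefix, pool and queue sizes, and the submitted task are illustrative values, not taken from this PR; in the feature itself the submitted tasks are the remote log reads whose callbacks complete the delayed remote fetch.

import java.util.concurrent.Future;

import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;

public class RemoteStorageThreadPoolExample {
    public static void main(String[] args) throws Exception {
        // Bounded pool: 4 reader threads, at most 1024 queued tasks (illustrative values).
        RemoteStorageThreadPool pool = new RemoteStorageThreadPool("remote-log-reader-", 4, 1024);
        try {
            // Any Callable can be submitted; this trivial task stands in for a remote log read.
            Future<String> result = pool.submit(() -> "read on " + Thread.currentThread().getName());
            System.out.println(result.get());
        } finally {
            pool.shutdown();
        }
    }
}

Note that tasks submitted via submit() have their exceptions captured in the returned Future, so afterExecute's Throwable parameter is only non-null for tasks run via execute(); error propagation for remote reads presumably flows through the RemoteLogReadResult callback instead.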