KAFKA-17742: Move DelayedShareFetchPurgatory declaration to ReplicaManager (#17437)

Declare the delayed share fetch purgatory inside ReplicaManager along with the existing purgatories. 

Check the share fetch purgatory, via ReplicaManager, when a replica becomes a follower or when a replica is deleted from a broker.

Perform a checkAndComplete for share fetch when the HWM is updated.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
Abhinav Dixit, 2024-10-18 02:28:10 +05:30, committed by GitHub
parent 6d39031958
commit cb3b03377d
17 changed files with 248 additions and 113 deletions
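
At a high level, the patch moves ownership of the ShareFetch purgatory out of SharePartitionManager and into ReplicaManager, which constructs it next to the existing produce/fetch/delete-records/elect-leader purgatories, shuts it down on close, and exposes two package-scoped helpers, completeDelayedShareFetchRequest and addDelayedShareFetchRequest. A minimal sketch of the resulting caller pattern, assuming the package-scoped API from this diff (the helper class itself is illustrative, not part of the patch):

    package kafka.server.share; // same package, so the group key's package-private constructor is visible

    import java.util.Set;

    import kafka.server.ReplicaManager;

    import org.apache.kafka.common.TopicIdPartition;

    // Illustrative only: callers hand completion checks to ReplicaManager rather
    // than calling checkAndComplete on a locally owned purgatory.
    final class ShareFetchCompletionSketch {
        static void completePending(ReplicaManager replicaManager, String groupId,
                                    Set<TopicIdPartition> topicIdPartitions) {
            topicIdPartitions.forEach(tp ->
                // One purgatory key per (group, topicId, partition) triple.
                replicaManager.completeDelayedShareFetchRequest(
                    new DelayedShareFetchGroupKey(groupId, tp.topicId(), tp.partition())));
        }
    }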

ReplicaManagerBuilder.java

@@ -33,6 +33,7 @@ import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory.QuotaManagers;
 import kafka.server.ReplicaManager;
+import kafka.server.share.DelayedShareFetch;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.metrics.Metrics;
@@ -70,6 +71,7 @@ public class ReplicaManagerBuilder {
     private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty();
+    private Optional<DelayedOperationPurgatory<DelayedShareFetch>> delayedShareFetchPurgatory = Optional.empty();
     private Optional<String> threadNamePrefix = Optional.empty();
     private Long brokerEpoch = -1L;
     private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
@@ -215,6 +217,7 @@ public class ReplicaManagerBuilder {
                              OptionConverters.toScala(delayedElectLeaderPurgatory),
                              OptionConverters.toScala(delayedRemoteFetchPurgatory),
                              OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
+                             OptionConverters.toScala(delayedShareFetchPurgatory),
                              OptionConverters.toScala(threadNamePrefix),
                              () -> brokerEpoch,
                              OptionConverters.toScala(addPartitionsToTxnManager),

DelayedShareFetch.java

@@ -135,8 +135,12 @@ public class DelayedShareFetch extends DelayedOperation {
             // then we should check if there is a pending share fetch request for the topic-partition and complete it.
             // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
             // we directly call delayedShareFetchPurgatory.checkAndComplete
-            sharePartitionManager.addPurgatoryCheckAndCompleteDelayedActionToActionQueue(
-                topicPartitionData.keySet(), shareFetchData.groupId());
+            replicaManager.addToActionQueue(() -> {
+                topicPartitionData.keySet().forEach(topicIdPartition ->
+                    replicaManager.completeDelayedShareFetchRequest(
+                        new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())));
+                return BoxedUnit.UNIT;
+            });
         }
     }
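
Two details in the replacement block above are easy to miss. Completion is deferred through ReplicaManager's action queue instead of calling checkAndComplete directly because completing one DelayedShareFetch can release records that make another watched operation on the same key completable, so a direct call from inside a completion path could re-enter the purgatory and recurse without bound. And since addToActionQueue is declared in Scala as taking () => Unit, Java sees it as scala.Function0<BoxedUnit>, which is why the lambda must end with return BoxedUnit.UNIT. A sketch of the shape (the class, defer, and work are illustrative names, not from the patch):

    package kafka.server.share;

    import kafka.server.ReplicaManager;

    import scala.runtime.BoxedUnit;

    // Illustrative only: a Java caller enqueueing deferred work on
    // ReplicaManager's action queue, a Scala API taking () => Unit.
    final class ActionQueueSketch {
        static void defer(ReplicaManager replicaManager, Runnable work) {
            replicaManager.addToActionQueue(() -> {
                work.run(); // runs later, outside the purgatory completion path
                return BoxedUnit.UNIT; // Scala's Unit value, required by Function0<BoxedUnit>
            });
        }
    }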

DelayedShareFetchPartitionKey.java

@@ -29,7 +29,7 @@ public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey, Del
     private final Uuid topicId;
     private final int partition;

-    DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
+    public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
         this.topicId = topicId;
         this.partition = partition;
     }

SharePartition.java

@@ -16,7 +16,7 @@
  */
 package kafka.server.share;

-import kafka.server.DelayedOperationPurgatory;
+import kafka.server.ReplicaManager;

 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
@@ -271,9 +271,10 @@ public class SharePartition {
     private SharePartitionState partitionState;

     /**
-     * The delayed share fetch purgatory is used to store the share fetch requests that could not be processed immediately.
+     * The replica manager is used to check to see if any delayed share fetch request can be completed because of data
+     * availability due to acquisition lock timeout.
      */
-    private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;
+    private final ReplicaManager replicaManager;

     SharePartition(
         String groupId,
@@ -284,7 +285,7 @@ public class SharePartition {
         Timer timer,
         Time time,
         Persister persister,
-        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory,
+        ReplicaManager replicaManager,
         GroupConfigManager groupConfigManager
     ) {
         this.groupId = groupId;
@@ -300,7 +301,7 @@ public class SharePartition {
         this.time = time;
         this.persister = persister;
         this.partitionState = SharePartitionState.EMPTY;
-        this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
+        this.replicaManager = replicaManager;
         this.groupConfigManager = groupConfigManager;
     }
@@ -1810,7 +1811,7 @@ public class SharePartition {
                 // If we have an acquisition lock timeout for a share-partition, then we should check if
                 // there is a pending share fetch request for the share-partition and complete it.
                 DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
-                delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey);
+                replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
             });
         }
     } finally {

SharePartitionManager.java

@@ -16,8 +16,6 @@
  */
 package kafka.server.share;

-import kafka.server.ActionQueue;
-import kafka.server.DelayedOperationPurgatory;
 import kafka.server.ReplicaManager;

 import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -75,7 +73,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;

 import scala.jdk.javaapi.CollectionConverters;
-import scala.runtime.BoxedUnit;

 /**
  * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
@@ -151,16 +148,6 @@ public class SharePartitionManager implements AutoCloseable {
      */
     private final ShareGroupMetrics shareGroupMetrics;

-    /**
-     * The delayed share fetch purgatory is used to store the share fetch requests that could not be processed immediately.
-     */
-    private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;
-
-    /**
-     * The delayed actions queue is used to complete any pending delayed share fetch actions.
-     */
-    private final ActionQueue delayedActionsQueue;
-
     public SharePartitionManager(
         ReplicaManager replicaManager,
         Time time,
@@ -168,9 +155,7 @@ public class SharePartitionManager implements AutoCloseable {
         int defaultRecordLockDurationMs,
         int maxDeliveryCount,
         int maxInFlightMessages,
-        int shareFetchPurgatoryPurgeIntervalRequests,
         Persister persister,
-        ActionQueue delayedActionsQueue,
         GroupConfigManager groupConfigManager,
         Metrics metrics
     ) {
@@ -181,9 +166,7 @@ public class SharePartitionManager implements AutoCloseable {
             defaultRecordLockDurationMs,
             maxDeliveryCount,
             maxInFlightMessages,
-            shareFetchPurgatoryPurgeIntervalRequests,
             persister,
-            delayedActionsQueue,
             groupConfigManager,
             metrics
         );
@@ -197,9 +180,7 @@ public class SharePartitionManager implements AutoCloseable {
         int defaultRecordLockDurationMs,
         int maxDeliveryCount,
         int maxInFlightMessages,
-        int shareFetchPurgatoryPurgeIntervalRequests,
         Persister persister,
-        ActionQueue delayedActionsQueue,
         GroupConfigManager groupConfigManager,
         Metrics metrics
     ) {
@@ -215,8 +196,6 @@ public class SharePartitionManager implements AutoCloseable {
         this.maxDeliveryCount = maxDeliveryCount;
         this.maxInFlightMessages = maxInFlightMessages;
         this.persister = persister;
-        this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true);
-        this.delayedActionsQueue = delayedActionsQueue;
         this.groupConfigManager = groupConfigManager;
         this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
     }
@@ -234,8 +213,6 @@ public class SharePartitionManager implements AutoCloseable {
         int maxDeliveryCount,
         int maxInFlightMessages,
         Persister persister,
-        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory,
-        ActionQueue delayedActionsQueue,
         GroupConfigManager groupConfigManager,
         Metrics metrics
     ) {
@@ -250,8 +227,6 @@ public class SharePartitionManager implements AutoCloseable {
         this.maxDeliveryCount = maxDeliveryCount;
         this.maxInFlightMessages = maxInFlightMessages;
         this.persister = persister;
-        this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
-        this.delayedActionsQueue = delayedActionsQueue;
         this.groupConfigManager = groupConfigManager;
         this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
     }
@@ -319,7 +294,7 @@ public class SharePartitionManager implements AutoCloseable {
                     // If we have an acknowledgement completed for a topic-partition, then we should check if
                     // there is a pending share fetch request for the topic-partition and complete it.
                     DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
-                    delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey);
+                    replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);

                     futures.put(topicIdPartition, future);
                 } else {
@@ -338,15 +313,6 @@ public class SharePartitionManager implements AutoCloseable {
         });
     }

-    void addPurgatoryCheckAndCompleteDelayedActionToActionQueue(Set<TopicIdPartition> topicIdPartitions, String groupId) {
-        delayedActionsQueue.add(() -> {
-            topicIdPartitions.forEach(topicIdPartition ->
-                delayedShareFetchPurgatory.checkAndComplete(
-                    new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
-            return BoxedUnit.UNIT;
-        });
-    }
-
     /**
      * The release session method is used to release the session for the memberId of respective group.
      * The method post removing session also releases acquired records for the respective member.
@@ -397,7 +363,7 @@ public class SharePartitionManager implements AutoCloseable {
                 // If we have a release acquired request completed for a topic-partition, then we should check if
                 // there is a pending share fetch request for the topic-partition and complete it.
                 DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
-                delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey);
+                replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);

                 futuresMap.put(topicIdPartition, future);
             }
@@ -557,13 +523,12 @@ public class SharePartitionManager implements AutoCloseable {
     // Add the share fetch request to the delayed share fetch purgatory to process the fetch request if it can be
     // completed else watch until it can be completed/timeout.
     private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<DelayedShareFetchKey> keys) {
-        delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch,
-            CollectionConverters.asScala(keys).toSeq().indices());
+        replicaManager.addDelayedShareFetchRequest(delayedShareFetch,
+            CollectionConverters.asScala(keys).toSeq());
     }

     @Override
     public void close() throws Exception {
-        this.delayedShareFetchPurgatory.shutdown();
         this.timer.close();
         this.persister.stop();
         if (!fetchQueue.isEmpty()) {
@@ -676,7 +641,7 @@ public class SharePartitionManager implements AutoCloseable {
                 timer,
                 time,
                 persister,
-                delayedShareFetchPurgatory,
+                replicaManager,
                 groupConfigManager
             );
             this.shareGroupMetrics.partitionLoadTime(start);
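
With the purgatory and action queue fields gone, SharePartitionManager's registration path reduces to building group-scoped keys and delegating to ReplicaManager, including the Java-to-Scala Seq conversion kept in addDelayedShareFetch above. A self-contained sketch of that flow (the class and method are illustrative; the API calls are the ones in this diff):

    package kafka.server.share; // sketch; the key constructors are package-scoped

    import java.util.Set;
    import java.util.stream.Collectors;

    import kafka.server.ReplicaManager;

    import org.apache.kafka.common.TopicIdPartition;

    import scala.jdk.javaapi.CollectionConverters;

    final class AddDelayedShareFetchSketch {
        // Mirrors SharePartitionManager.addDelayedShareFetch after the patch: build one
        // group key per partition, then let ReplicaManager run tryCompleteElseWatch.
        static void register(ReplicaManager replicaManager, DelayedShareFetch operation,
                             String groupId, Set<TopicIdPartition> partitions) {
            Set<DelayedShareFetchKey> keys = partitions.stream()
                .map(tp -> new DelayedShareFetchGroupKey(groupId, tp.topicId(), tp.partition()))
                .collect(Collectors.toSet());
            // ReplicaManager's API takes a Scala Seq[DelayedShareFetchKey].
            replicaManager.addDelayedShareFetchRequest(operation,
                CollectionConverters.asScala(keys).toSeq());
        }
    }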

Partition.scala

@@ -25,6 +25,7 @@ import kafka.log._
 import kafka.log.remote.RemoteLogManager
 import kafka.server._
 import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.share.{DelayedShareFetch, DelayedShareFetchPartitionKey}
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zookeeper.ZooKeeperClientException
@@ -87,16 +88,20 @@ trait AlterPartitionListener {
   def markFailed(): Unit
 }

-class DelayedOperations(topicPartition: TopicPartition,
+class DelayedOperations(topicId: Option[Uuid],
+                        topicPartition: TopicPartition,
                         produce: DelayedOperationPurgatory[DelayedProduce],
                         fetch: DelayedOperationPurgatory[DelayedFetch],
-                        deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords]) {
+                        deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords],
+                        shareFetch: DelayedOperationPurgatory[DelayedShareFetch]) {

   def checkAndCompleteAll(): Unit = {
     val requestKey = TopicPartitionOperationKey(topicPartition)
     CoreUtils.swallow(() => fetch.checkAndComplete(requestKey), fetch, Level.ERROR)
     CoreUtils.swallow(() => produce.checkAndComplete(requestKey), produce, Level.ERROR)
     CoreUtils.swallow(() => deleteRecords.checkAndComplete(requestKey), deleteRecords, Level.ERROR)
+    if (topicId.isDefined) CoreUtils.swallow(() => shareFetch.checkAndComplete(new DelayedShareFetchPartitionKey(
+      topicId.get, topicPartition.partition())), shareFetch, Level.ERROR)
   }

   def numDelayedDelete: Int = deleteRecords.numDelayed
@@ -132,10 +137,12 @@ object Partition {
   }

   val delayedOperations = new DelayedOperations(
+    topicId,
     topicPartition,
     replicaManager.delayedProducePurgatory,
     replicaManager.delayedFetchPurgatory,
-    replicaManager.delayedDeleteRecordsPurgatory)
+    replicaManager.delayedDeleteRecordsPurgatory,
+    replicaManager.delayedShareFetchPurgatory)

   new Partition(topicPartition,
     _topicId = topicId,
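
The topicId.isDefined guard reflects that DelayedShareFetchPartitionKey is addressed by topic ID, and a partition's topic ID is an Option[Uuid] that can be empty (for example, ZK-era topics before an ID has been assigned). With no ID there can be no share fetch watching the partition, so skipping the check is safe. The same logic in Java terms, a sketch rather than patch code (topicId here stands in for Scala's Option[Uuid] as a java.util.Optional<Uuid>):

    // Sketch: without a topic ID there is no addressable purgatory key, hence
    // nothing to complete.
    topicId.ifPresent(id -> shareFetch.checkAndComplete(
        new DelayedShareFetchPartitionKey(id, topicPartition.partition())));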

BrokerServer.scala

@@ -433,9 +433,7 @@ class BrokerServer(
       config.shareGroupConfig.shareGroupRecordLockDurationMs,
       config.shareGroupConfig.shareGroupDeliveryCountLimit,
       config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
-      config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests,
       persister,
-      defaultActionQueue,
       groupConfigManager,
       new Metrics()
     )

ReplicaManager.scala

@@ -25,6 +25,7 @@ import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
 import kafka.server.metadata.ZkMetadataCache
+import kafka.server.share.{DelayedShareFetch, DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors._
@@ -55,7 +56,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.server.common
-import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal}
+import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, TopicOptionalIdPartition}
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
@@ -286,6 +287,7 @@ class ReplicaManager(val config: KafkaConfig,
                      delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
                      delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
                      delayedRemoteListOffsetsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None,
+                     delayedShareFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedShareFetch]] = None,
                      threadNamePrefix: Option[String] = None,
                      val brokerEpochSupplier: () => Long = () => -1,
                      addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
@@ -315,6 +317,10 @@ class ReplicaManager(val config: KafkaConfig,
   val delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatoryParam.getOrElse(
     DelayedOperationPurgatory[DelayedRemoteListOffsets](
       purgatoryName = "RemoteListOffsets", brokerId = config.brokerId))
+  val delayedShareFetchPurgatory = delayedShareFetchPurgatoryParam.getOrElse(
+    DelayedOperationPurgatory[DelayedShareFetch](
+      purgatoryName = "ShareFetch", brokerId = config.brokerId,
+      purgeInterval = config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests))

   /* epoch of the controller that last changed the leader */
   @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
@@ -463,12 +469,14 @@ class ReplicaManager(val config: KafkaConfig,
     })
   }

-  private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition): Unit = {
+  private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition, topicId: Option[Uuid]): Unit = {
     val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
     delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
     delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
     delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
     delayedRemoteListOffsetsPurgatory.checkAndComplete(topicPartitionOperationKey)
+    if (topicId.isDefined) delayedShareFetchPurgatory.checkAndComplete(
+      new DelayedShareFetchPartitionKey(topicId.get, topicPartition.partition()))
   }

   /**
@@ -480,6 +488,27 @@ class ReplicaManager(val config: KafkaConfig,
     topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(TopicPartitionOperationKey(tp)))
   }

+  /**
+   * Complete any delayed share fetch requests that have been unblocked since new data is available from the leader
+   * for one of the partitions. This could happen due to acknowledgements, acquisition lock timeout of records, partition
+   * locks getting freed and release of acquired records due to share session close.
+   * @param delayedShareFetchKey The key corresponding to which the share fetch request has been stored in the purgatory
+   */
+  private[server] def completeDelayedShareFetchRequest(delayedShareFetchKey: DelayedShareFetchKey): Unit = {
+    delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey)
+  }
+
+  /**
+   * Add and watch a share fetch request in the delayed share fetch purgatory corresponding to a set of keys in case it cannot be
+   * completed instantaneously, otherwise complete it.
+   * @param delayedShareFetch Refers to the DelayedOperation over share fetch request
+   * @param delayedShareFetchKeys The keys corresponding to which the delayed share fetch request will be stored in the purgatory
+   */
+  private[server] def addDelayedShareFetchRequest(delayedShareFetch: DelayedShareFetch,
+                                                  delayedShareFetchKeys: Seq[DelayedShareFetchKey]): Unit = {
+    delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, delayedShareFetchKeys)
+  }
+
   /**
    * Registers the provided listener to the partition iff the partition is online.
    */
@@ -610,6 +639,7 @@ class ReplicaManager(val config: KafkaConfig,
     val partitionsToDelete = mutable.Set.empty[TopicPartition]
     partitionsToStop.foreach { stopPartition =>
       val topicPartition = stopPartition.topicPartition
+      var topicId: Option[Uuid] = None
       if (stopPartition.deleteLocalLog) {
         getPartition(topicPartition) match {
           case hostedPartition: HostedPartition.Online =>
@@ -618,6 +648,7 @@ class ReplicaManager(val config: KafkaConfig,
               // Logs are not deleted here. They are deleted in a single batch later on.
               // This is done to avoid having to checkpoint for every deletions.
               hostedPartition.partition.delete()
+              topicId = hostedPartition.partition.topicId
             }
           case _ =>
@@ -626,7 +657,7 @@ class ReplicaManager(val config: KafkaConfig,
       }
       // If we were the leader, we may have some operations still waiting for completion.
       // We force completion to prevent them from timing out.
-      completeDelayedOperationsWhenNotPartitionLeader(topicPartition)
+      completeDelayedOperationsWhenNotPartitionLeader(topicPartition, topicId)
     }

     // Third delete the logs and checkpoint.
@@ -744,6 +775,8 @@ class ReplicaManager(val config: KafkaConfig,
   def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()

+  def addToActionQueue(action: () => Unit): Unit = defaultActionQueue.add(action)
+
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
@@ -783,13 +816,15 @@ class ReplicaManager(val config: KafkaConfig,
     }

     val sTime = time.milliseconds
-    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
+    val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
       origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
     debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
+    val localProduceResults: Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
+      case (k, v) => (k.topicPartition, v)}

     val produceStatus = buildProducePartitionStatus(localProduceResults)

-    addCompletePurgatoryAction(actionQueue, localProduceResults)
+    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
     recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
       k -> v.info.recordValidationStats
     })
@@ -940,17 +975,19 @@ class ReplicaManager(val config: KafkaConfig,
   private def addCompletePurgatoryAction(
     actionQueue: ActionQueue,
-    appendResults: Map[TopicPartition, LogAppendResult]
+    appendResults: Map[TopicOptionalIdPartition, LogAppendResult]
   ): Unit = {
     actionQueue.add {
-      () => appendResults.foreach { case (topicPartition, result) =>
-        val requestKey = TopicPartitionOperationKey(topicPartition)
+      () => appendResults.foreach { case (topicOptionalIdPartition, result) =>
+        val requestKey = TopicPartitionOperationKey(topicOptionalIdPartition.topicPartition)
         result.info.leaderHwChange match {
           case LeaderHwChange.INCREASED =>
             // some delayed operations may be unblocked after HW changed
             delayedProducePurgatory.checkAndComplete(requestKey)
             delayedFetchPurgatory.checkAndComplete(requestKey)
             delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+            if (topicOptionalIdPartition.topicId.isPresent) delayedShareFetchPurgatory.checkAndComplete(new DelayedShareFetchPartitionKey(
+              topicOptionalIdPartition.topicId.get, topicOptionalIdPartition.partition))
           case LeaderHwChange.SAME =>
             // probably unblock some follower fetch requests since log end offset has been updated
             delayedFetchPurgatory.checkAndComplete(requestKey)
@@ -1392,7 +1429,8 @@ class ReplicaManager(val config: KafkaConfig,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short,
                                requestLocal: RequestLocal,
-                               verificationGuards: Map[TopicPartition, VerificationGuard]): Map[TopicPartition, LogAppendResult] = {
+                               verificationGuards: Map[TopicPartition, VerificationGuard]):
+  Map[TopicOptionalIdPartition, LogAppendResult] = {
     val traceEnabled = isTraceEnabled
     def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
       val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
@@ -1417,7 +1455,7 @@ class ReplicaManager(val config: KafkaConfig,
       // reject appending to internal topics if it is not allowed
       if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
-        (topicPartition, LogAppendResult(
+        (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(
           LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
           Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")),
           hasCustomErrorMessage = false))
@@ -1438,7 +1476,10 @@ class ReplicaManager(val config: KafkaConfig,
           trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
             s"${info.firstOffset} and ending at offset ${info.lastOffset}")

-          (topicPartition, LogAppendResult(info, exception = None, hasCustomErrorMessage = false))
+          var topicId: Optional[Uuid] = Optional.empty()
+          if (partition.topicId.isDefined) topicId = Optional.of(partition.topicId.get)
+          (new TopicOptionalIdPartition(topicId, topicPartition), LogAppendResult(info, exception = None, hasCustomErrorMessage = false))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
           // it is supposed to indicate un-expected failures of a broker in handling a produce request
@@ -1448,15 +1489,15 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
                    _: KafkaStorageException) =>
-            (topicPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))
+            (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))
           case rve: RecordValidationException =>
             val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
             val recordErrors = rve.recordErrors
-            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, recordErrors),
+            (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, recordErrors),
               Some(rve.invalidException), hasCustomErrorMessage = true))
           case t: Throwable =>
             val logStartOffset = processFailedRecord(topicPartition, t)
-            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
+            (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
               Some(t), hasCustomErrorMessage = false))
         }
       }
@@ -2452,7 +2493,7 @@ class ReplicaManager(val config: KafkaConfig,
       s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")

     partitionsToMakeFollower.foreach { partition =>
-      completeDelayedOperationsWhenNotPartitionLeader(partition.topicPartition)
+      completeDelayedOperationsWhenNotPartitionLeader(partition.topicPartition, partition.topicId)
     }

     if (isShuttingDown.get()) {
@@ -2676,6 +2717,7 @@ class ReplicaManager(val config: KafkaConfig,
     delayedProducePurgatory.shutdown()
     delayedDeleteRecordsPurgatory.shutdown()
     delayedElectLeaderPurgatory.shutdown()
+    delayedShareFetchPurgatory.shutdown()
     if (checkpointHW)
       checkpointHighWatermarks()
     replicaSelectorOpt.foreach(_.close)
@@ -3029,7 +3071,8 @@ class ReplicaManager(val config: KafkaConfig,
       replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
       stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions")

-      partitionsToStartFetching.keySet.foreach(completeDelayedOperationsWhenNotPartitionLeader)
+      partitionsToStartFetching.foreach { case (topicPartition, partition) =>
+        completeDelayedOperationsWhenNotPartitionLeader(topicPartition, partition.topicId)}

       updateLeaderAndFollowerMetrics(followerTopicSet)
     }
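
Taken together, ReplicaManager now reaches the share fetch purgatory through two key shapes: a group-scoped key for events raised by the share machinery itself (acknowledgements, acquisition lock timeouts, release of acquired records), where the group ID is in scope, and a partition-scoped key for replica-level events (HWM advance on produce, become-follower, partition deletion), where it is not. The latter is why DelayedShareFetchPartitionKey's constructor becomes public: it is now built outside kafka.server.share. For a partition-scoped check to complete anything, a pending operation must also be watched under that key shape; the registration site is outside this excerpt. A sketch of the two shapes (illustrative class, not patch code):

    package kafka.server.share;

    import org.apache.kafka.common.TopicIdPartition;

    final class ShareFetchKeyShapesSketch {
        static void show(String groupId, TopicIdPartition tp) {
            // Share-level events know the group.
            DelayedShareFetchKey groupKey =
                new DelayedShareFetchGroupKey(groupId, tp.topicId(), tp.partition());
            // Replica-level events do not, so they address the partition itself.
            DelayedShareFetchKey partitionKey =
                new DelayedShareFetchPartitionKey(tp.topicId(), tp.partition());
        }
    }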

DelayedShareFetchTest.java

@@ -16,7 +16,6 @@
  */
 package kafka.server.share;

-import kafka.server.DelayedActionQueue;
 import kafka.server.DelayedOperationPurgatory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
@@ -54,6 +53,7 @@ import scala.jdk.javaapi.CollectionConverters;
 import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
 import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
 import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
+import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -330,6 +330,7 @@ public class DelayedShareFetchTest {
         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
             DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
         partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
@@ -357,16 +358,12 @@ public class DelayedShareFetchTest {
         doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

-        DelayedActionQueue delayedActionQueue = spy(new DelayedActionQueue());
-
         Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
         partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
         partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
         partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);

         SharePartitionManager sharePartitionManager2 = SharePartitionManagerTest.SharePartitionManagerBuilder
             .builder()
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
-            .withDelayedActionsQueue(delayedActionQueue)
             .withPartitionCacheMap(partitionCacheMap)
             .build();
@@ -390,8 +387,8 @@ public class DelayedShareFetchTest {
         Mockito.verify(replicaManager, times(1)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
         assertFalse(delayedShareFetch1.isCompleted());
-        Mockito.verify(delayedActionQueue, times(1)).add(any());
-        Mockito.verify(delayedActionQueue, times(0)).tryCompleteActions();
+        Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
+        Mockito.verify(replicaManager, times(0)).tryCompleteActions();
     }

     static class DelayedShareFetchBuilder {

View File

@ -16,7 +16,6 @@
*/ */
package kafka.server.share; package kafka.server.share;
import kafka.server.DelayedActionQueue;
import kafka.server.DelayedOperationPurgatory; import kafka.server.DelayedOperationPurgatory;
import kafka.server.LogReadResult; import kafka.server.LogReadResult;
import kafka.server.ReplicaManager; import kafka.server.ReplicaManager;
@ -1034,13 +1033,13 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(), "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withTime(time) .withTime(time)
.withMetrics(metrics) .withMetrics(metrics)
.withTimer(mockTimer) .withTimer(mockTimer)
.withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
.build(); .build();
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@ -1096,12 +1095,12 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(), "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withTime(time) .withTime(time)
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
.build(); .build();
SharePartition sp0 = mock(SharePartition.class); SharePartition sp0 = mock(SharePartition.class);
@ -1192,12 +1191,13 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(), "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withDelayedShareFetchPurgatory(delayedShareFetchPurgatory).build(); .build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes);
@ -1223,11 +1223,11 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(), "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
.build(); .build();
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@ -1667,12 +1667,12 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(), "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withTime(time) .withTime(time)
.withTimer(mockTimer) .withTimer(mockTimer)
.withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
.withFetchQueue(fetchQueue).build(); .withFetchQueue(fetchQueue).build();
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@ -1724,6 +1724,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(), "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
// Initially you cannot acquire records for both sp1 and sp2. // Initially you cannot acquire records for both sp1 and sp2.
when(sp1.maybeAcquireFetchLock()).thenReturn(true); when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@ -1736,7 +1737,6 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.build(); .build();
@ -1820,6 +1820,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(), "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
// Initially you cannot acquire records for both all 3 share partitions. // Initially you cannot acquire records for both all 3 share partitions.
when(sp1.maybeAcquireFetchLock()).thenReturn(true); when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@ -1834,7 +1835,6 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.build(); .build();
@ -1913,6 +1913,7 @@ public class SharePartitionManagerTest {
        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+       mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

        // Initially you cannot acquire records for both sp1 and sp2.
        when(sp1.maybeAcquireFetchLock()).thenReturn(true);

@@ -1926,7 +1927,6 @@ public class SharePartitionManagerTest {
        SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
            .withPartitionCacheMap(partitionCacheMap)
            .withCache(cache)
-           .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
            .withReplicaManager(replicaManager)
            .withTimer(mockTimer)
            .build());

@@ -2013,6 +2013,7 @@ public class SharePartitionManagerTest {
        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+       mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

        // Initially you cannot acquire records for all 3 share partitions.
        when(sp1.maybeAcquireFetchLock()).thenReturn(true);

@@ -2028,7 +2029,6 @@ public class SharePartitionManagerTest {
        SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
            .withPartitionCacheMap(partitionCacheMap)
            .withCache(cache)
-           .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
            .withReplicaManager(replicaManager)
            .withTimer(mockTimer)
            .build());

@@ -2084,10 +2084,11 @@ public class SharePartitionManagerTest {
        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+       mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
            .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer)
-           .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory).build();
+           .build();

        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
            sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes);

@@ -2122,10 +2123,11 @@ public class SharePartitionManagerTest {
        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+       mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
            .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer)
-           .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory).build();
+           .build();

        // Return LeaderNotAvailableException to simulate initialization failure.
        when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new LeaderNotAvailableException("Leader not available")));

@@ -2289,6 +2291,21 @@ public class SharePartitionManagerTest {
        return CollectionConverters.asScala(logReadResults).toSeq();
    }

+   static void mockReplicaManagerDelayedShareFetch(ReplicaManager replicaManager,
+                                                   DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
+       doAnswer(invocationOnMock -> {
+           Object[] args = invocationOnMock.getArguments();
+           delayedShareFetchPurgatory.checkAndComplete(args[0]);
+           return null;
+       }).when(replicaManager).completeDelayedShareFetchRequest(any(DelayedShareFetchKey.class));
+       doAnswer(invocationOnMock -> {
+           Object[] args = invocationOnMock.getArguments();
+           delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedShareFetch) args[0], (Seq<Object>) args[1]);
+           return null;
+       }).when(replicaManager).addDelayedShareFetchRequest(any(), any());
+   }

    static class SharePartitionManagerBuilder {
        private ReplicaManager replicaManager = mock(ReplicaManager.class);
        private Time time = new MockTime();
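The helper above is the crux of this test change: instead of handing the purgatory to SharePartitionManager directly, the mocked ReplicaManager forwards completion and watch calls to a real DelayedOperationPurgatory, so purgatory behavior is still exercised end to end. Below is a minimal, self-contained sketch of that delegation pattern; the Purgatory and Manager interfaces are simplified stand-ins for illustration, not Kafka classes.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

public class DelegatingMockSketch {
    // Simplified stand-ins for DelayedOperationPurgatory and ReplicaManager.
    interface Purgatory { void checkAndComplete(Object key); }
    interface Manager { void completeDelayedShareFetchRequest(Object key); }

    public static void main(String[] args) {
        // A real component the test wants to keep exercising.
        Purgatory realPurgatory = key -> System.out.println("checkAndComplete(" + key + ")");

        // The mocked collaborator forwards the call to the real component,
        // mirroring mockReplicaManagerDelayedShareFetch above.
        Manager manager = mock(Manager.class);
        doAnswer(invocation -> {
            realPurgatory.checkAndComplete(invocation.getArgument(0));
            return null;
        }).when(manager).completeDelayedShareFetchRequest(any());

        manager.completeDelayedShareFetchRequest("share-fetch-key"); // routed to realPurgatory
    }
}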
@@ -2298,8 +2315,6 @@ public class SharePartitionManagerTest {
        private Timer timer = new MockTimer();
        private Metrics metrics = new Metrics();
        private ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new ConcurrentLinkedQueue<>();
-       private DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);
-       private DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class);

        private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
            this.replicaManager = replicaManager;

@@ -2341,16 +2356,6 @@ public class SharePartitionManagerTest {
            return this;
        }

-       SharePartitionManagerBuilder withDelayedShareFetchPurgatory(DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
-           this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
-           return this;
-       }
-       SharePartitionManagerBuilder withDelayedActionsQueue(DelayedActionQueue delayedActionsQueue) {
-           this.delayedActionsQueue = delayedActionsQueue;
-           return this;
-       }

        public static SharePartitionManagerBuilder builder() {
            return new SharePartitionManagerBuilder();
        }

@@ -2366,8 +2371,6 @@ public class SharePartitionManagerTest {
                MAX_DELIVERY_COUNT,
                MAX_IN_FLIGHT_MESSAGES,
                persister,
-               delayedShareFetchPurgatory,
-               delayedActionsQueue,
                mock(GroupConfigManager.class),
                metrics);
        }
View File
@@ -16,7 +16,7 @@
 */
package kafka.server.share;

-import kafka.server.DelayedOperationPurgatory;
+import kafka.server.ReplicaManager;
import kafka.server.share.SharePartition.InFlightState;
import kafka.server.share.SharePartition.RecordState;
import kafka.server.share.SharePartition.SharePartitionState;

@@ -4949,7 +4949,7 @@ public class SharePartitionTest {
        private int maxDeliveryCount = MAX_DELIVERY_COUNT;
        private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
        private Persister persister = NoOpShareStatePersister.getInstance();
-       private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = Mockito.mock(DelayedOperationPurgatory.class);
+       private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);

        private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) {

@@ -4982,8 +4982,8 @@
        }

        public SharePartition build() {
-           return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, maxInflightMessages, maxDeliveryCount, defaultAcquisitionLockTimeoutMs,
-               mockTimer, MOCK_TIME, persister, delayedShareFetchPurgatory, groupConfigManager);
+           return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, maxInflightMessages, maxDeliveryCount,
+               defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager);
        }
    }
}
View File
@@ -21,7 +21,7 @@ import kafka.server.MetadataCache
import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils
import kafka.utils.TestUtils.MockAlterPartitionManager
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.utils.Utils

@@ -49,6 +49,7 @@ class AbstractPartitionTest {
  val brokerId = AbstractPartitionTest.brokerId
  val remoteReplicaId = brokerId + 1
+ val topicId : Option[Uuid] = Option(Uuid.randomUuid())
  val topicPartition = new TopicPartition("test-topic", 0)
  val time = new MockTime()
  var tmpDir: File = _
View File
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.share.{DelayedShareFetch, DelayedShareFetchPartitionKey}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig

@@ -4110,7 +4111,11 @@ class PartitionTest extends AbstractPartitionTest {
    val deleteRecords = mock(classOf[DelayedOperationPurgatory[DelayedDeleteRecords]])
    when(deleteRecords.checkAndComplete(requestKey)).thenThrow(new RuntimeException("uh oh"))
-   val delayedOperations = new DelayedOperations(topicPartition, produce, fetch, deleteRecords)
+   val shareFetch = mock(classOf[DelayedOperationPurgatory[DelayedShareFetch]])
+   when(shareFetch.checkAndComplete(new DelayedShareFetchPartitionKey(topicId.get, topicPartition.partition())))
+     .thenThrow(new RuntimeException("uh oh"))
+   val delayedOperations = new DelayedOperations(topicId, topicPartition, produce, fetch, deleteRecords, shareFetch)

    val spyLogManager = spy(logManager)
    val partition = new Partition(topicPartition,
      replicaLagTimeMaxMs = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT,
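For context on the wiring above: DelayedOperations now also carries a share-fetch purgatory, checked with a DelayedShareFetchPartitionKey built from the topic id and partition number. A simplified stand-in sketch of that shape follows; these are not Kafka's real classes, and gating the check on a present topic id is an assumption drawn from the topicId.get call in the test above.

import java.util.Objects;
import java.util.Optional;

public class ShareFetchCheckSketch {
    interface Purgatory<K> { void checkAndComplete(K key); }

    // Stand-in for DelayedShareFetchPartitionKey: (topicId, partition).
    record PartitionKey(String topicId, int partition) { }

    // Stand-in for a DelayedOperations-style holder that also checks share fetches.
    static class DelayedOps {
        private final Optional<String> topicId;
        private final int partition;
        private final Purgatory<PartitionKey> shareFetch;

        DelayedOps(Optional<String> topicId, int partition, Purgatory<PartitionKey> shareFetch) {
            this.topicId = topicId;
            this.partition = partition;
            this.shareFetch = Objects.requireNonNull(shareFetch);
        }

        void checkShareFetch() {
            // Assumption: only partitions with a known topic id are watched by share fetches.
            topicId.ifPresent(id -> shareFetch.checkAndComplete(new PartitionKey(id, partition)));
        }
    }

    public static void main(String[] args) {
        DelayedOps ops = new DelayedOps(Optional.of("topic-id"), 0,
            key -> System.out.println("checkAndComplete(" + key + ")"));
        ops.checkShareFetch();
    }
}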
View File
@@ -26,6 +26,7 @@ import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_S
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.epoch.util.MockBlockingSender
+import kafka.server.share.DelayedShareFetch
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{Pool, TestUtils}
import kafka.zk.KafkaZkClient

@@ -3004,6 +3005,8 @@ class ReplicaManagerTest {
      purgatoryName = "RemoteFetch", timer, reaperEnabled = false)
    val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
      purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
+   val mockDelayedShareFetchPurgatory = new DelayedOperationPurgatory[DelayedShareFetch](
+     purgatoryName = "ShareFetch", timer, reaperEnabled = false)

    // Mock network client to show leader offset of 5
    val blockingSend = new MockBlockingSender(

@@ -3030,6 +3033,7 @@
      delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory),
      delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
      delayedRemoteListOffsetsPurgatoryParam = Some(mockRemoteListOffsetsPurgatory),
+     delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
      threadNamePrefix = Option(this.getClass.getName)) {

      override protected def createReplicaFetcherManager(metrics: Metrics,

@@ -3428,6 +3432,8 @@ class ReplicaManagerTest {
      purgatoryName = "DelayedRemoteFetch", timer, reaperEnabled = false)
    val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
      purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
+   val mockDelayedShareFetchPurgatory = new DelayedOperationPurgatory[DelayedShareFetch](
+     purgatoryName = "ShareFetch", timer, reaperEnabled = false)

    when(metadataCache.contains(new TopicPartition(topic, 0))).thenReturn(true)

@@ -3461,6 +3467,7 @@
      delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory),
      delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
      delayedRemoteListOffsetsPurgatoryParam = Some(mockDelayedRemoteListOffsetsPurgatory),
+     delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
      threadNamePrefix = Option(this.getClass.getName),
      addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
      directoryEventHandler = directoryEventHandler,
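These tests rely on the optional purgatory parameters now exposed by ReplicaManager: when a test supplies a pre-built purgatory (reaper disabled, test-controlled timer), ReplicaManager uses it; otherwise it constructs its own. A minimal stand-in sketch of that constructor pattern follows; the types are illustrative, not Kafka's.

import java.util.Optional;

public class OptionalPurgatoryParamSketch {
    static class Purgatory {
        final String name;
        Purgatory(String name) { this.name = name; }
    }

    // Stand-in for ReplicaManager: an optional test hook that defaults to a
    // self-owned instance when the caller supplies none.
    static class Manager {
        final Purgatory shareFetchPurgatory;

        Manager(Optional<Purgatory> shareFetchPurgatoryParam) {
            this.shareFetchPurgatory = shareFetchPurgatoryParam
                .orElseGet(() -> new Purgatory("ShareFetch"));
        }
    }

    public static void main(String[] args) {
        Manager production = new Manager(Optional.empty()); // builds its own purgatory
        Manager underTest = new Manager(Optional.of(new Purgatory("TestShareFetch")));
        System.out.println(production.shareFetchPurgatory.name); // ShareFetch
        System.out.println(underTest.shareFetchPurgatory.name);  // TestShareFetch
    }
}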
View File
@@ -181,7 +181,7 @@ public class ReplicaFetcherThreadBenchmark {
        Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Optional.of(0L));
        AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class);
        Partition partition = new Partition(tp, 100, MetadataVersion.latestTesting(),
-               0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp),
+               0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(topicId, tp),
                Mockito.mock(MetadataCache.class), logManager, isrChannelManager, topicId);

        partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty());

@@ -277,8 +277,8 @@
    // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
    private static class DelayedOperationsMock extends DelayedOperations {
-       DelayedOperationsMock(TopicPartition topicPartition) {
-           super(topicPartition, null, null, null);
+       DelayedOperationsMock(Option<Uuid> topicId, TopicPartition topicPartition) {
+           super(topicId, topicPartition, null, null, null, null);
        }

        @Override
View File
@@ -139,7 +139,7 @@ public class UpdateFollowerFetchStateBenchmark {
    // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
    private class DelayedOperationsMock extends DelayedOperations {
        DelayedOperationsMock() {
-           super(topicPartition, null, null, null);
+           super(topicId, topicPartition, null, null, null, null);
        }

        @Override
View File
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A wrapper that pairs a topic partition with its topic id. The topic id is optional, alongside a
+ * non-null topic partition, to support code paths that do not have the topic id available yet.
+ */
+public class TopicOptionalIdPartition {
+
+    private final Optional<Uuid> topicId;
+    private final TopicPartition topicPartition;
+
+    /**
+     * Create an instance with the provided parameters.
+     *
+     * @param topicId the topic id
+     * @param topicPartition the topic partition
+     */
+    public TopicOptionalIdPartition(Optional<Uuid> topicId, TopicPartition topicPartition) {
+        this.topicId = topicId;
+        this.topicPartition = Objects.requireNonNull(topicPartition, "topicPartition can not be null");
+    }
+
+    /**
+     * @return the topic id, if present.
+     */
+    public Optional<Uuid> topicId() {
+        return topicId;
+    }
+
+    /**
+     * @return the topic name.
+     */
+    public String topic() {
+        return topicPartition.topic();
+    }
+
+    /**
+     * @return the partition id.
+     */
+    public int partition() {
+        return topicPartition.partition();
+    }
+
+    /**
+     * @return the topic partition.
+     */
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TopicOptionalIdPartition that = (TopicOptionalIdPartition) o;
+        return topicId.equals(that.topicId) &&
+            topicPartition.equals(that.topicPartition);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 0;
+        if (topicId.isPresent()) {
+            result = prime + topicId.get().hashCode();
+        }
+        result = prime * result + topicPartition.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return topicId.map(uuid -> uuid + ":" + topic() + "-" + partition())
+            .orElseGet(() -> "none" + ":" + topic() + "-" + partition());
+    }
+}
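A short usage sketch for the new class above, assuming the kafka-clients and server-common modules on the classpath (the topic name is illustrative). Note that the optional topic id participates in equals and hashCode, so the same partition with and without an id is treated as two distinct keys.

import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.common.TopicOptionalIdPartition;

public class TopicOptionalIdPartitionExample {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("payments", 0);
        Uuid id = Uuid.randomUuid();

        TopicOptionalIdPartition withId = new TopicOptionalIdPartition(Optional.of(id), tp);
        TopicOptionalIdPartition withoutId = new TopicOptionalIdPartition(Optional.empty(), tp);

        System.out.println(withId);    // <topic id>:payments-0
        System.out.println(withoutId); // none:payments-0

        // The topic id is part of equality, so these are distinct keys.
        System.out.println(withId.equals(withoutId)); // false
    }
}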