KAFKA-17890: Move DelayedOperationPurgatory to server-common (#17636)

Reviewers: Jun Rao <jun@confluent.io>, Apoorv Mittal <amittal@confluent.io>
Mickael Maison 2024-11-08 09:55:09 +01:00 committed by GitHub
parent 1792b19a05
commit 0049b967e5
48 changed files with 1501 additions and 1111 deletions

View File

@ -93,6 +93,11 @@
<allow pkg="org.apache.kafka.server.authorizer" />
</subpackage>
<subpackage name="purgatory">
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
</subpackage>
<subpackage name="util">
<!-- InterBrokerSendThread uses some clients classes that are not part of the public -->
<!-- API but are still relatively common -->

View File

@ -19,10 +19,8 @@ package kafka.log.remote;
import kafka.cluster.Partition;
import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.log.UnifiedLog;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedRemoteListOffsets;
import kafka.server.StopPartition;
import kafka.server.TopicPartitionOperationKey;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
@ -68,6 +66,8 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.purgatory.TopicPartitionOperationKey;
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;

View File

@ -24,7 +24,6 @@ import kafka.server.AlterPartitionManager;
import kafka.server.DelayedDeleteRecords;
import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedProduce;
import kafka.server.DelayedRemoteFetch;
import kafka.server.DelayedRemoteListOffsets;
@ -39,6 +38,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.DelayedActionQueue;
import org.apache.kafka.server.common.DirectoryEventHandler;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

View File

@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.DelayedOperation;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
@ -25,6 +24,7 @@ import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
@ -42,7 +42,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
@ -70,7 +69,7 @@ public class DelayedShareFetch extends DelayedOperation {
ReplicaManager replicaManager,
SharePartitionManager sharePartitionManager,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
super(shareFetchData.fetchParams().maxWaitMs, Optional.empty());
this.shareFetchData = shareFetchData;
this.replicaManager = replicaManager;
this.partitionsAcquired = new LinkedHashMap<>();

View File

@ -16,8 +16,6 @@
*/
package kafka.server.share;
import kafka.server.DelayedOperationKey;
import org.apache.kafka.common.Uuid;
import java.util.Objects;
@ -25,7 +23,7 @@ import java.util.Objects;
/**
* A key for delayed share fetch purgatory that refers to the share partition.
*/
public class DelayedShareFetchGroupKey implements DelayedShareFetchKey, DelayedOperationKey {
public class DelayedShareFetchGroupKey implements DelayedShareFetchKey {
private final String groupId;
private final Uuid topicId;
private final int partition;

View File

@ -16,8 +16,10 @@
*/
package kafka.server.share;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
/**
* A key for delayed operations that fetch data for share consumers.
*/
public interface DelayedShareFetchKey {
public interface DelayedShareFetchKey extends DelayedOperationKey {
}
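
Since DelayedShareFetchKey now extends the relocated org.apache.kafka.server.purgatory.DelayedOperationKey, any share-fetch key can be handed straight to a DelayedOperationPurgatory as a watch key. A minimal, hypothetical implementation (the real ones, DelayedShareFetchGroupKey and DelayedShareFetchPartitionKey, follow in this commit) only has to provide keyLabel(), which per the anonymous implementation in DelayedFuturePurgatory later in this commit appears to be the interface's only required method:

// Hypothetical key, for illustration only; assumed to live alongside DelayedShareFetchKey
// in kafka.server.share. Identifies a watch entry by group id and partition index.
public class ExampleShareFetchKey implements DelayedShareFetchKey {
    private final String groupId;
    private final int partition;

    public ExampleShareFetchKey(String groupId, int partition) {
        this.groupId = groupId;
        this.partition = partition;
    }

    // keyLabel() is what the purgatory records for metrics and debug logging.
    @Override
    public String keyLabel() {
        return groupId + "-" + partition;
    }
}

A real key should also override equals() and hashCode(), as the two key classes below do, since the purgatory looks watch lists up by key.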

View File

@ -16,8 +16,6 @@
*/
package kafka.server.share;
import kafka.server.DelayedOperationKey;
import org.apache.kafka.common.Uuid;
import java.util.Objects;
@ -25,7 +23,7 @@ import java.util.Objects;
/**
* A key for delayed share fetch purgatory that refers to the topic partition.
*/
public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey, DelayedOperationKey {
public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey {
private final Uuid topicId;
private final int partition;

View File

@ -64,7 +64,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -74,8 +73,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import scala.jdk.javaapi.CollectionConverters;
/**
* The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
* It is responsible for fetching messages from the log and acknowledging the messages.
@ -546,9 +543,8 @@ public class SharePartitionManager implements AutoCloseable {
// Add the share fetch request to the delayed share fetch purgatory to process the fetch request if it can be
// completed else watch until it can be completed/timeout.
private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<DelayedShareFetchKey> keys) {
replicaManager.addDelayedShareFetchRequest(delayedShareFetch,
CollectionConverters.asScala(keys).toSeq());
private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, List<DelayedShareFetchKey> keys) {
replicaManager.addDelayedShareFetchRequest(delayedShareFetch, keys);
}
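
For reference, the watch keys are now collected in a plain java.util.List and flow straight through to the purgatory (ReplicaManager.addDelayedShareFetchRequest takes a util.List[DelayedShareFetchKey] later in this commit). A hedged sketch of how a caller could assemble the keys for one partition; the constructor shapes are assumed from the fields shown in the key classes earlier in this diff:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.TopicIdPartition;

// Illustrative helper, not part of the commit: one group-scoped key plus one partition-scoped key
// per fetched partition, matching the two key classes changed above.
static List<DelayedShareFetchKey> watchKeysFor(String groupId, TopicIdPartition tp) {
    List<DelayedShareFetchKey> keys = new ArrayList<>();
    keys.add(new DelayedShareFetchGroupKey(groupId, tp.topicId(), tp.partition()));
    keys.add(new DelayedShareFetchPartitionKey(tp.topicId(), tp.partition()));
    return keys;
}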
@Override
@ -574,7 +570,7 @@ public class SharePartitionManager implements AutoCloseable {
// Initialize lazily, if required.
Map<TopicIdPartition, Throwable> erroneous = null;
Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
List<DelayedShareFetchKey> delayedShareFetchWatchKeys = new ArrayList<>();
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) {
SharePartitionKey sharePartitionKey = sharePartitionKey(

View File

@ -45,6 +45,7 @@ import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.server.common.{MetadataVersion, RequestLocal}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.slf4j.event.Level
@ -93,18 +94,18 @@ class DelayedOperations(topicId: Option[Uuid],
produce: DelayedOperationPurgatory[DelayedProduce],
fetch: DelayedOperationPurgatory[DelayedFetch],
deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords],
shareFetch: DelayedOperationPurgatory[DelayedShareFetch]) {
shareFetch: DelayedOperationPurgatory[DelayedShareFetch]) extends Logging {
def checkAndCompleteAll(): Unit = {
val requestKey = TopicPartitionOperationKey(topicPartition)
CoreUtils.swallow(() -> fetch.checkAndComplete(requestKey), fetch, Level.ERROR)
CoreUtils.swallow(() -> produce.checkAndComplete(requestKey), produce, Level.ERROR)
CoreUtils.swallow(() -> deleteRecords.checkAndComplete(requestKey), deleteRecords, Level.ERROR)
val requestKey = new TopicPartitionOperationKey(topicPartition)
CoreUtils.swallow(() -> fetch.checkAndComplete(requestKey), this, Level.ERROR)
CoreUtils.swallow(() -> produce.checkAndComplete(requestKey), this, Level.ERROR)
CoreUtils.swallow(() -> deleteRecords.checkAndComplete(requestKey), this, Level.ERROR)
if (topicId.isDefined) CoreUtils.swallow(() -> shareFetch.checkAndComplete(new DelayedShareFetchPartitionKey(
topicId.get, topicPartition.partition())), shareFetch, Level.ERROR)
topicId.get, topicPartition.partition())), this, Level.ERROR)
}
def numDelayedDelete: Int = deleteRecords.numDelayed
def numDelayedDelete: Int = deleteRecords.numDelayed()
}
object Partition {

View File

@ -17,7 +17,7 @@
package kafka.coordinator.group
import kafka.server.DelayedOperation
import org.apache.kafka.server.purgatory.DelayedOperation
/**
* Delayed heartbeat operations that are added to the purgatory for session timeout checking.
@ -28,7 +28,7 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
memberId: String,
isPending: Boolean,
timeoutMs: Long)
extends DelayedOperation(timeoutMs, Some(group.lock)) {
extends DelayedOperation(timeoutMs, group.lock) {
override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, memberId, isPending, forceComplete _)
override def onExpiration(): Unit = coordinator.onExpireHeartbeat(group, memberId, isPending)

View File

@ -17,8 +17,8 @@
package kafka.coordinator.group
import kafka.server.{DelayedOperationPurgatory, GroupJoinKey}
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, GroupJoinKey}
import java.util
import scala.math.{max, min}
/**
@ -85,7 +85,7 @@ private[group] class InitialDelayedJoin(
configuredRebalanceDelay,
delay,
remaining
), Seq(GroupJoinKey(group.groupId)))
), util.List.of(new GroupJoinKey(group.groupId)))
} else
super.onComplete()
}

View File

@ -17,8 +17,7 @@
package kafka.coordinator.group
import kafka.server.DelayedOperation
import org.apache.kafka.server.purgatory.DelayedOperation
import java.util.concurrent.locks.Lock
/**
@ -30,5 +29,5 @@ private[group] abstract class DelayedRebalance(
groupLock: Lock
) extends DelayedOperation(
rebalanceTimeoutMs,
Some(groupLock)
groupLock
)

View File

@ -16,6 +16,7 @@
*/
package kafka.coordinator.group
import java.util
import java.util.{OptionalInt, Properties}
import java.util.concurrent.atomic.AtomicBoolean
import kafka.server._
@ -33,6 +34,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.{Group, OffsetAndMetadata, OffsetConfig}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, GroupJoinKey, GroupSyncKey, MemberKey}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard
@ -226,7 +228,7 @@ private[group] class GroupCoordinator(
// attempt to complete JoinGroup
if (group.is(PreparingRebalance)) {
rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
rebalancePurgatory.checkAndComplete(new GroupJoinKey(group.groupId))
}
}
}
@ -690,7 +692,7 @@ private[group] class GroupCoordinator(
}
} else if (group.isPendingMember(memberId)) {
removePendingMemberAndUpdateGroup(group, memberId)
heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
heartbeatPurgatory.checkAndComplete(new MemberKey(group.groupId, memberId))
info(s"Pending member with memberId=$memberId has left group ${group.groupId} " +
s"through explicit `LeaveGroup` request")
memberLeaveError(leavingMember, Errors.NONE)
@ -1197,12 +1199,12 @@ private[group] class GroupCoordinator(
group.maybeInvokeJoinCallback(member, JoinGroupResult(member.memberId, Errors.NOT_COORDINATOR))
}
rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
rebalancePurgatory.checkAndComplete(new GroupJoinKey(group.groupId))
case Stable | CompletingRebalance =>
for (member <- group.allMemberMetadata) {
group.maybeInvokeSyncCallback(member, SyncGroupResult(Errors.NOT_COORDINATOR))
heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, member.memberId))
heartbeatPurgatory.checkAndComplete(new MemberKey(group.groupId, member.memberId))
}
}
@ -1283,7 +1285,7 @@ private[group] class GroupCoordinator(
}
private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
val memberKey = MemberKey(group.groupId, member.memberId)
val memberKey = new MemberKey(group.groupId, member.memberId)
// complete current heartbeat expectation
member.heartbeatSatisfied = true
@ -1292,20 +1294,20 @@ private[group] class GroupCoordinator(
// reschedule the next heartbeat expiration deadline
member.heartbeatSatisfied = false
val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, util.Collections.singletonList(memberKey))
}
/**
* Add pending member expiration to heartbeat purgatory
*/
private def addPendingMemberExpiration(group: GroupMetadata, pendingMemberId: String, timeoutMs: Long): Unit = {
val pendingMemberKey = MemberKey(group.groupId, pendingMemberId)
val pendingMemberKey = new MemberKey(group.groupId, pendingMemberId)
val delayedHeartbeat = new DelayedHeartbeat(this, group, pendingMemberId, isPending = true, timeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(pendingMemberKey))
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, util.Collections.singletonList(pendingMemberKey))
}
private def removeHeartbeatForLeavingMember(group: GroupMetadata, memberId: String): Unit = {
val memberKey = MemberKey(group.groupId, memberId)
val memberKey = new MemberKey(group.groupId, memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
}
@ -1498,8 +1500,8 @@ private[group] class GroupCoordinator(
info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +
s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")
val groupKey = GroupJoinKey(group.groupId)
rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
val groupKey = new GroupJoinKey(group.groupId)
rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, util.Collections.singletonList(groupKey))
}
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Unit = {
@ -1512,7 +1514,7 @@ private[group] class GroupCoordinator(
group.currentState match {
case Dead | Empty =>
case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
case PreparingRebalance => rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
case PreparingRebalance => rebalancePurgatory.checkAndComplete(new GroupJoinKey(group.groupId))
}
}
@ -1520,7 +1522,7 @@ private[group] class GroupCoordinator(
group.remove(memberId)
if (group.is(PreparingRebalance)) {
rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
rebalancePurgatory.checkAndComplete(new GroupJoinKey(group.groupId))
}
}
@ -1554,7 +1556,7 @@ private[group] class GroupCoordinator(
error(s"Group ${group.groupId} could not complete rebalance because no members rejoined")
rebalancePurgatory.tryCompleteElseWatch(
new DelayedJoin(this, group, group.rebalanceTimeoutMs),
Seq(GroupJoinKey(group.groupId)))
util.Collections.singletonList(new GroupJoinKey(group.groupId)))
} else {
group.initNextGeneration()
if (group.is(Empty)) {
@ -1620,7 +1622,7 @@ private[group] class GroupCoordinator(
private def maybeCompleteSyncExpiration(
group: GroupMetadata
): Unit = {
val groupKey = GroupSyncKey(group.groupId)
val groupKey = new GroupSyncKey(group.groupId)
rebalancePurgatory.checkAndComplete(groupKey)
}
@ -1628,8 +1630,8 @@ private[group] class GroupCoordinator(
group: GroupMetadata
): Unit = {
val delayedSync = new DelayedSync(this, group, group.generationId, group.rebalanceTimeoutMs)
val groupKey = GroupSyncKey(group.groupId)
rebalancePurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
val groupKey = new GroupSyncKey(group.groupId)
rebalancePurgatory.tryCompleteElseWatch(delayedSync, util.Collections.singletonList(groupKey))
}
def tryCompletePendingSync(
@ -1763,8 +1765,8 @@ object GroupCoordinator {
time: Time,
metrics: Metrics
): GroupCoordinator = {
val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
val rebalancePurgatory = DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
}

View File

@ -17,9 +17,11 @@
package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection._
@ -49,7 +51,7 @@ class DelayedCreatePartitions(delayMs: Long,
createMetadata: Seq[CreatePartitionsMetadata],
adminManager: ZkAdminManager,
responseCallback: Map[String, ApiError] => Unit)
extends DelayedOperation(delayMs) {
extends DelayedOperation(delayMs) with Logging {
/**
* The operation can be completed if all of the topics that do not have an error exist and every partition has a

View File

@ -18,13 +18,15 @@
package kafka.server
import java.util.concurrent.TimeUnit
import kafka.utils.Logging
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.DeleteRecordsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.DeleteRecordsResponse
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection._
@ -45,7 +47,7 @@ class DelayedDeleteRecords(delayMs: Long,
deleteRecordsStatus: Map[TopicPartition, DeleteRecordsPartitionStatus],
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult] => Unit)
extends DelayedOperation(delayMs) {
extends DelayedOperation(delayMs) with Logging {
// first update the acks pending variable according to the error code
deleteRecordsStatus.foreachEntry { (topicPartition, status) =>

View File

@ -17,7 +17,9 @@
package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection._
@ -40,7 +42,7 @@ class DelayedDeleteTopics(delayMs: Long,
deleteMetadata: Seq[DeleteTopicMetadata],
adminManager: ZkAdminManager,
responseCallback: Map[String, Errors] => Unit)
extends DelayedOperation(delayMs) {
extends DelayedOperation(delayMs) with Logging {
/**
* The operation can be completed if all of the topics not in error have been removed

View File

@ -17,9 +17,11 @@
package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection.{Map, mutable}
@ -32,7 +34,7 @@ class DelayedElectLeader(
results: Map[TopicPartition, ApiError],
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, ApiError] => Unit
) extends DelayedOperation(delayMs) {
) extends DelayedOperation(delayMs) with Logging {
private val waitingPartitions = mutable.Map() ++= expectedLeaders
private val fullResults = mutable.Map() ++= results

View File

@ -18,6 +18,7 @@
package kafka.server
import com.yammer.metrics.core.Meter
import kafka.utils.Logging
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.TopicIdPartition
@ -26,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.LogOffsetMetadata
@ -51,7 +53,7 @@ class DelayedFetch(
replicaManager: ReplicaManager,
quota: ReplicaQuota,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
) extends DelayedOperation(params.maxWaitMs) {
) extends DelayedOperation(params.maxWaitMs) with Logging {
override def toString: String = {
s"DelayedFetch(params=$params" +

View File

@ -17,11 +17,14 @@
package kafka.server
import kafka.utils.Logging
import java.util
import java.util.concurrent._
import java.util.function.BiConsumer
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.KafkaThread
import org.apache.kafka.server.purgatory.{DelayedOperation, DelayedOperationKey, DelayedOperationPurgatory}
import scala.collection.Seq
@ -32,7 +35,7 @@ import scala.collection.Seq
class DelayedFuture[T](timeoutMs: Long,
futures: Seq[CompletableFuture[T]],
responseCallback: () => Unit)
extends DelayedOperation(timeoutMs) {
extends DelayedOperation(timeoutMs) with Logging {
/**
* The operation can be completed if all the futures have completed successfully
@ -70,19 +73,21 @@ class DelayedFuture[T](timeoutMs: Long,
}
class DelayedFuturePurgatory(purgatoryName: String, brokerId: Int) {
private val purgatory = DelayedOperationPurgatory[DelayedFuture[_]](purgatoryName, brokerId)
private val purgatory = new DelayedOperationPurgatory[DelayedFuture[_]](purgatoryName, brokerId)
private val executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable](),
new ThreadFactory {
override def newThread(r: Runnable): Thread = new KafkaThread(s"DelayedExecutor-$purgatoryName", r, true)
})
private val purgatoryKey = new Object
private val purgatoryKey = new DelayedOperationKey() {
override def keyLabel(): String = "delayed-future-key"
}
def tryCompleteElseWatch[T](timeoutMs: Long,
futures: Seq[CompletableFuture[T]],
responseCallback: () => Unit): DelayedFuture[T] = {
val delayedFuture = new DelayedFuture[T](timeoutMs, futures, responseCallback)
val done = purgatory.tryCompleteElseWatch(delayedFuture, Seq(purgatoryKey))
val done = purgatory.tryCompleteElseWatch(delayedFuture, util.Collections.singletonList(purgatoryKey))
if (!done) {
val callbackAction = new BiConsumer[Void, Throwable]() {
override def accept(result: Void, exception: Throwable): Unit = delayedFuture.forceComplete()

View File

@ -1,452 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.util.concurrent.locks.{Lock, ReentrantLock}
import kafka.utils.CoreUtils.inLock
import kafka.utils._
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.server.util.timer.{SystemTimer, Timer, TimerTask}
import scala.collection._
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
/**
* An operation whose processing needs to be delayed for at most the given delayMs. For example
* a delayed produce operation could be waiting for specified number of acks; or
* a delayed fetch operation could be waiting for a given number of bytes to accumulate.
*
* The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once.
* Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either
* forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,
* or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
* forceComplete().
*
* A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
*
* Note that if you add a new delayed operation that calls ReplicaManager.appendRecords() in onComplete()
* like DelayedJoin, you must be aware that this operation's onExpiration() needs to call actionQueue.tryCompleteAction().
*/
abstract class DelayedOperation(delayMs: Long,
lockOpt: Option[Lock] = None)
extends TimerTask(delayMs) with Logging {
private val completed = new AtomicBoolean(false)
// Visible for testing
private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
/*
* Force completing the delayed operation, if not already completed.
* This function can be triggered when
*
* 1. The operation has been verified to be completable inside tryComplete()
* 2. The operation has expired and hence needs to be completed right now
*
* Return true iff the operation is completed by the caller: note that
* concurrent threads can try to complete the same operation, but only
* the first thread will succeed in completing the operation and return
* true, others will still return false
*/
def forceComplete(): Boolean = {
if (completed.compareAndSet(false, true)) {
// cancel the timeout timer
cancel()
onComplete()
true
} else {
false
}
}
/**
* Check if the delayed operation is already completed
*/
def isCompleted: Boolean = completed.get()
/**
* Call-back to execute when a delayed operation gets expired and hence forced to complete.
*/
def onExpiration(): Unit
/**
* Process for completing an operation; This function needs to be defined
* in subclasses and will be called exactly once in forceComplete()
*/
def onComplete(): Unit
/**
* Try to complete the delayed operation by first checking if the operation
* can be completed by now. If yes execute the completion logic by calling
* forceComplete() and return true iff forceComplete returns true; otherwise return false
*
* This function needs to be defined in subclasses
*/
def tryComplete(): Boolean
/**
* Thread-safe variant of tryComplete() that calls an extra function if the first tryComplete() returns false
* @param f function to be executed after the first tryComplete() returns false
* @return result of tryComplete
*/
private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
if (tryComplete()) true
else {
f
// last completion check
tryComplete()
}
}
/**
* Thread-safe variant of tryComplete()
*/
private[server] def safeTryComplete(): Boolean = inLock(lock) {
if (isCompleted)
false
else
tryComplete()
}
/*
* run() method defines a task that is executed on timeout
*/
override def run(): Unit = {
if (forceComplete())
onExpiration()
}
}
object DelayedOperationPurgatory {
private val Shards = 512 // Shard the watcher list to reduce lock contention
def apply[T <: DelayedOperation](purgatoryName: String,
brokerId: Int = 0,
purgeInterval: Int = 1000,
reaperEnabled: Boolean = true,
timerEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
val timer = new SystemTimer(purgatoryName)
new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled, timerEnabled)
}
}
/**
* A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
*/
final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
timeoutTimer: Timer,
brokerId: Int = 0,
purgeInterval: Int = 1000,
reaperEnabled: Boolean = true,
timerEnabled: Boolean = true)
extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
/* a list of operation watching keys */
private class WatcherList {
val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
val watchersLock = new ReentrantLock()
/*
* Return all the current watcher lists,
* note that the returned watchers may be removed from the list by other threads
*/
def allWatchers: Iterable[Watchers] = {
watchersByKey.values
}
}
private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)
private def watcherList(key: Any): WatcherList = {
watcherLists(Math.abs(key.hashCode() % watcherLists.length))
}
// the number of estimated total operations in the purgatory
private[this] val estimatedTotalOperations = new AtomicInteger(0)
/* background thread expiring operations that have timed out */
private val expirationReaper = new ExpiredOperationReaper()
private val metricsTags = Map("delayedOperation" -> purgatoryName).asJava
metricsGroup.newGauge("PurgatorySize", () => watched, metricsTags)
metricsGroup.newGauge("NumDelayedOperations", () => numDelayed, metricsTags)
if (reaperEnabled)
expirationReaper.start()
/**
* Check if the operation can be completed, if not watch it based on the given watch keys
*
* Note that a delayed operation can be watched on multiple keys. It is possible that
* an operation is completed after it has been added to the watch list for some, but
* not all of the keys. In this case, the operation is considered completed and won't
* be added to the watch list of the remaining keys. The expiration reaper thread will
* remove this operation from any watcher list in which the operation exists.
*
* @param operation the delayed operation to be checked
* @param watchKeys keys for bookkeeping the operation
* @return true iff the delayed operations can be completed by the caller
*/
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
assert(watchKeys.nonEmpty, "The watch key list can't be empty")
// The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is
// going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse().
// If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At
// this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering
// event since the operation is already on the watcher list for all keys.
//
// ==============[story about lock]==============
// Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing
// the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and
// checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete()
// 1) thread_a holds readlock of stateLock from TransactionStateManager
// 2) thread_a is executing tryCompleteElseWatch()
// 3) thread_a adds op to watch list
// 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a)
// 5) thread_c calls checkAndComplete() and holds lock of op
// 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b)
// 7) thread_a is waiting lock of op to call the final tryComplete() (blocked by thread_c)
//
// Note that even with the current approach, deadlocks could still be introduced. For example,
// 1) thread_a calls tryCompleteElseWatch() and gets lock of op
// 2) thread_a adds op to watch list
// 3) thread_a calls op#tryComplete and tries to require lock_b
// 4) thread_b holds lock_b and calls checkAndComplete()
// 5) thread_b sees op from watch list
// 6) thread_b needs lock of op
// To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding
// any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
// holding an exclusive lock to make the call is often unnecessary.
if (operation.safeTryCompleteOrElse {
watchKeys.foreach(key => watchForOperation(key, operation))
if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
}) return true
// if it cannot be completed by now and hence is watched, add to the expire queue also
if (!operation.isCompleted) {
if (timerEnabled)
timeoutTimer.add(operation)
if (operation.isCompleted) {
// cancel the timer task
operation.cancel()
}
}
false
}
/**
* Check if some delayed operations can be completed with the given watch key,
* and if yes complete them.
*
* @return the number of completed operations during this process
*/
def checkAndComplete(key: Any): Int = {
val wl = watcherList(key)
val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
val numCompleted = if (watchers == null)
0
else
watchers.tryCompleteWatched()
if (numCompleted > 0) {
debug(s"Request key $key unblocked $numCompleted $purgatoryName operations")
}
numCompleted
}
/**
* Return the total size of the watch lists in the purgatory. Since an operation may be watched
* on multiple lists, and some of its watched entries may still be in the watch lists
* even when it has been completed, this number may be larger than the number of real operations watched
*/
def watched: Int = {
watcherLists.foldLeft(0) { case (sum, watcherList) => sum + watcherList.allWatchers.map(_.countWatched).sum }
}
/**
* Return the number of delayed operations in the expiry queue
*/
def numDelayed: Int = timeoutTimer.size
/**
* Cancel watching on any delayed operations for the given key. Note the operation will not be completed
*/
def cancelForKey(key: Any): List[T] = {
val wl = watcherList(key)
inLock(wl.watchersLock) {
val watchers = wl.watchersByKey.remove(key)
if (watchers != null)
watchers.cancel()
else
Nil
}
}
/*
* Return the watch list of the given key, note that we need to
* grab the removeWatchersLock to avoid the operation being added to a removed watcher list
*/
private def watchForOperation(key: Any, operation: T): Unit = {
val wl = watcherList(key)
inLock(wl.watchersLock) {
val watcher = wl.watchersByKey.getAndMaybePut(key)
watcher.watch(operation)
}
}
/*
* Remove the key from watcher lists if its list is empty
*/
private def removeKeyIfEmpty(key: Any, watchers: Watchers): Unit = {
val wl = watcherList(key)
inLock(wl.watchersLock) {
// if the current key is no longer correlated to the watchers to remove, skip
if (wl.watchersByKey.get(key) != watchers)
return
if (watchers != null && watchers.isEmpty) {
wl.watchersByKey.remove(key)
}
}
}
/**
* Shutdown the expire reaper thread
*/
def shutdown(): Unit = {
if (reaperEnabled) {
expirationReaper.initiateShutdown()
// improve shutdown time by waking up any ShutdownableThread(s) blocked on poll by sending a no-op
timeoutTimer.add(new TimerTask(0) {
override def run(): Unit = {}
})
expirationReaper.awaitShutdown()
}
timeoutTimer.close()
metricsGroup.removeMetric("PurgatorySize", metricsTags)
metricsGroup.removeMetric("NumDelayedOperations", metricsTags)
}
/**
* A linked list of watched delayed operations based on some key
*/
private class Watchers(val key: Any) {
private[this] val operations = new ConcurrentLinkedQueue[T]()
// count the current number of watched operations. This is O(n), so use isEmpty() if possible
def countWatched: Int = operations.size
def isEmpty: Boolean = operations.isEmpty
// add the element to watch
def watch(t: T): Unit = {
operations.add(t)
}
// traverse the list and try to complete some watched elements
def tryCompleteWatched(): Int = {
var completed = 0
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) {
// another thread has completed this operation, just remove it
iter.remove()
} else if (curr.safeTryComplete()) {
iter.remove()
completed += 1
}
}
if (operations.isEmpty)
removeKeyIfEmpty(key, this)
completed
}
def cancel(): List[T] = {
val iter = operations.iterator()
val cancelled = new ListBuffer[T]()
while (iter.hasNext) {
val curr = iter.next()
curr.cancel()
iter.remove()
cancelled += curr
}
cancelled.toList
}
// traverse the list and purge elements that are already completed by others
def purgeCompleted(): Int = {
var purged = 0
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) {
iter.remove()
purged += 1
}
}
if (operations.isEmpty)
removeKeyIfEmpty(key, this)
purged
}
}
def advanceClock(timeoutMs: Long): Unit = {
timeoutTimer.advanceClock(timeoutMs)
// Trigger a purge if the number of completed but still being watched operations is larger than
// the purge threshold. That number is computed by the difference between the estimated total number of
// operations and the number of pending delayed operations.
if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
// now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
// clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
// a little overestimated total number of operations.
estimatedTotalOperations.getAndSet(numDelayed)
debug("Begin purging watch lists")
val purged = watcherLists.foldLeft(0) {
case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
}
debug("Purged %d elements from watch lists.".format(purged))
}
}
/**
* A background reaper to expire delayed operations that have timed out
*/
private class ExpiredOperationReaper extends ShutdownableThread(
"ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
false) {
override def doWork(): Unit = {
advanceClock(200L)
}
}
}
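
The class removed above is replaced by a Java DelayedOperation in server-common, under org.apache.kafka.server.purgatory (the updated imports throughout this commit confirm the package). A minimal sketch of a subclass against the relocated API follows; it is hedged, with constructor and method shapes inferred from the call sites in this diff rather than from the new file itself.

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;

// Sketch only: a delayed operation that completes as soon as an external flag is set.
public class DelayedFlag extends DelayedOperation {
    private final AtomicBoolean flag;

    public DelayedFlag(long delayMs, AtomicBoolean flag) {
        super(delayMs);   // single-argument form, as DelayedFetch and DelayedElectLeader use in this commit
        this.flag = flag;
    }

    @Override
    public boolean tryComplete() {
        // If the condition holds, forceComplete() runs onComplete() exactly once and returns true.
        return flag.get() && forceComplete();
    }

    @Override
    public void onComplete() {
        // Completion logic; invoked at most once, always from forceComplete().
    }

    @Override
    public void onExpiration() {
        // Invoked when the timeout fires and the expiration reaper force-completes the operation.
    }
}

Usage follows the same pattern as the call sites in this commit: register the operation with tryCompleteElseWatch() and a java.util.List of keys, then poke it later with checkAndComplete(). The lambda below assumes DelayedOperationKey keeps keyLabel() as its only abstract method, as the anonymous implementation in DelayedFuturePurgatory above suggests.

AtomicBoolean ready = new AtomicBoolean(false);
DelayedOperationPurgatory<DelayedFlag> purgatory = new DelayedOperationPurgatory<>("Flag", 0);
DelayedOperationKey key = () -> "flag-key";
purgatory.tryCompleteElseWatch(new DelayedFlag(500L, ready), List.of(key));
ready.set(true);
purgatory.checkAndComplete(key);   // completes the watched operation now that the flag is set
purgatory.shutdown();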

View File

@ -1,61 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
/**
* Keys used for delayed operation metrics recording
*/
trait DelayedOperationKey {
def keyLabel: String
}
/* used by delayed-produce and delayed-fetch operations */
case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
override def keyLabel: String = "%s-%d".format(topic, partition)
}
object TopicPartitionOperationKey {
def apply(topicPartition: TopicPartition): TopicPartitionOperationKey = {
apply(topicPartition.topic, topicPartition.partition)
}
def apply(topicIdPartition: TopicIdPartition): TopicPartitionOperationKey = {
apply(topicIdPartition.topic, topicIdPartition.partition)
}
}
/* used by delayed-join-group operations */
case class MemberKey(groupId: String, consumerId: String) extends DelayedOperationKey {
override def keyLabel: String = "%s-%s".format(groupId, consumerId)
}
/* used by delayed-join operations */
case class GroupJoinKey(groupId: String) extends DelayedOperationKey {
override def keyLabel: String = "join-%s".format(groupId)
}
/* used by delayed-sync operations */
case class GroupSyncKey(groupId: String) extends DelayedOperationKey {
override def keyLabel: String = "sync-%s".format(groupId)
}
/* used by delayed-topic operations */
case class TopicKey(topic: String) extends DelayedOperationKey {
override def keyLabel: String = topic
}
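
The Scala case classes above now exist as plain Java classes in org.apache.kafka.server.purgatory, which is why every call site in this commit switches from case-class apply() syntax to new. A brief sketch of the equivalent constructions (topic and group names are illustrative):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.purgatory.GroupJoinKey;
import org.apache.kafka.server.purgatory.GroupSyncKey;
import org.apache.kafka.server.purgatory.MemberKey;
import org.apache.kafka.server.purgatory.TopicPartitionOperationKey;

public class PurgatoryKeyExamples {
    public static void main(String[] args) {
        // Key used by the produce/fetch/delete-records purgatories, built from a TopicPartition.
        TopicPartitionOperationKey tpKey = new TopicPartitionOperationKey(new TopicPartition("orders", 0));

        // Keys used by the group coordinator's heartbeat and rebalance purgatories.
        MemberKey memberKey = new MemberKey("group-1", "member-1");
        GroupJoinKey joinKey = new GroupJoinKey("group-1");
        GroupSyncKey syncKey = new GroupSyncKey("group-1");
    }
}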

View File

@ -21,14 +21,16 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter
import kafka.utils.Pool
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
@volatile var acksPending = false
@ -58,8 +60,8 @@ class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
lockOpt: Option[Lock] = None)
extends DelayedOperation(delayMs, lockOpt) {
lockOpt: Option[Lock])
extends DelayedOperation(delayMs, lockOpt.toJava) with Logging {
override lazy val logger: Logger = DelayedProduce.logger

View File

@ -18,10 +18,12 @@
package kafka.server
import com.yammer.metrics.core.Meter
import kafka.utils.Logging
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
@ -42,7 +44,7 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
replicaManager: ReplicaManager,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(remoteFetchMaxWaitMs) {
extends DelayedOperation(remoteFetchMaxWaitMs) with Logging {
if (fetchParams.isFromFollower) {
throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams")

View File

@ -17,13 +17,14 @@
package kafka.server
import com.yammer.metrics.core.Meter
import kafka.utils.Pool
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import java.util.concurrent.TimeUnit
import scala.collection.{Map, mutable}
@ -33,7 +34,8 @@ class DelayedRemoteListOffsets(delayMs: Long,
version: Int,
statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus],
replicaManager: ReplicaManager,
responseCallback: List[ListOffsetsTopicResponse] => Unit) extends DelayedOperation(delayMs) {
responseCallback: List[ListOffsetsTopicResponse] => Unit)
extends DelayedOperation(delayMs) with Logging {
// Mark the status as completed, if there is no async task to track.
// If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
statusByPartition.foreachEntry { (topicPartition, status) =>

View File

@ -73,6 +73,7 @@ import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
import org.apache.kafka.server.purgatory.TopicPartitionOperationKey
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.ErroneousAndValidPartitionData
@ -417,7 +418,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (replicaManager.hasDelayedElectionOperations) {
updateMetadataRequest.partitionStates.forEach { partitionState =>
val tp = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
replicaManager.tryCompleteElection(TopicPartitionOperationKey(tp))
replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp))
}
}
requestHelper.sendResponseExemptThrottle(request, new UpdateMetadataResponse(

View File

@ -60,6 +60,7 @@ import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, Topi
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedOperationKey, DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
@ -298,30 +299,30 @@ class ReplicaManager(val config: KafkaConfig,
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
val delayedProducePurgatory = delayedProducePurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", brokerId = config.brokerId,
purgeInterval = config.producerPurgatoryPurgeIntervalRequests))
new DelayedOperationPurgatory[DelayedProduce](
"Produce", config.brokerId,
config.producerPurgatoryPurgeIntervalRequests))
val delayedFetchPurgatory = delayedFetchPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", brokerId = config.brokerId,
purgeInterval = config.fetchPurgatoryPurgeIntervalRequests))
new DelayedOperationPurgatory[DelayedFetch](
"Fetch", config.brokerId,
config.fetchPurgatoryPurgeIntervalRequests))
val delayedDeleteRecordsPurgatory = delayedDeleteRecordsPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedDeleteRecords](
purgatoryName = "DeleteRecords", brokerId = config.brokerId,
purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests))
new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", config.brokerId,
config.deleteRecordsPurgatoryPurgeIntervalRequests))
val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "ElectLeader", brokerId = config.brokerId))
new DelayedOperationPurgatory[DelayedElectLeader](
"ElectLeader", config.brokerId))
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "RemoteFetch", brokerId = config.brokerId))
new DelayedOperationPurgatory[DelayedRemoteFetch](
"RemoteFetch", config.brokerId))
val delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedRemoteListOffsets](
purgatoryName = "RemoteListOffsets", brokerId = config.brokerId))
new DelayedOperationPurgatory[DelayedRemoteListOffsets](
"RemoteListOffsets", config.brokerId))
val delayedShareFetchPurgatory = delayedShareFetchPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedShareFetch](
purgatoryName = "ShareFetch", brokerId = config.brokerId,
purgeInterval = config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests))
new DelayedOperationPurgatory[DelayedShareFetch](
"ShareFetch", config.brokerId,
config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests))
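
The Scala companion object's apply(purgatoryName = ..., brokerId = ..., purgeInterval = ...) factory is gone, so the purgatories above are built with the Java constructors directly; this commit uses two shapes, (name, brokerId) and (name, brokerId, purgeInterval). A hedged, out-of-tree sketch with illustrative names and numbers:

import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;

// Two-argument form: purgatory name plus broker id, with the default purge interval.
DelayedOperationPurgatory<DelayedOperation> electLeaderPurgatory =
    new DelayedOperationPurgatory<>("ElectLeader", 1);

// Three-argument form: the purge interval bounds how many completed-but-still-watched operations
// may accumulate before the watcher lists are purged (see advanceClock in the removed Scala class).
DelayedOperationPurgatory<DelayedOperation> producePurgatory =
    new DelayedOperationPurgatory<>("Produce", 1, 1000);

// As with the removed Scala class, each purgatory runs an expiration reaper thread, so shut it down when done.
electLeaderPurgatory.shutdown();
producePurgatory.shutdown();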
/* epoch of the controller that last changed the leader */
@volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
@ -471,7 +472,7 @@ class ReplicaManager(val config: KafkaConfig,
}
private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition, topicId: Option[Uuid]): Unit = {
val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
val topicPartitionOperationKey = new TopicPartitionOperationKey(topicPartition)
delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
@ -486,7 +487,7 @@ class ReplicaManager(val config: KafkaConfig,
* after successfully replicating from the leader.
*/
private[server] def completeDelayedFetchRequests(topicPartitions: Seq[TopicPartition]): Unit = {
topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(TopicPartitionOperationKey(tp)))
topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(new TopicPartitionOperationKey(tp)))
}
/**
@ -506,7 +507,7 @@ class ReplicaManager(val config: KafkaConfig,
* @param delayedShareFetchKeys The keys corresponding to which the delayed share fetch request will be stored in the purgatory
*/
private[server] def addDelayedShareFetchRequest(delayedShareFetch: DelayedShareFetch,
delayedShareFetchKeys : Seq[DelayedShareFetchKey]): Unit = {
delayedShareFetchKeys : util.List[DelayedShareFetchKey]): Unit = {
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, delayedShareFetchKeys)
}
@ -980,7 +981,7 @@ class ReplicaManager(val config: KafkaConfig,
): Unit = {
actionQueue.add {
() => appendResults.foreach { case (topicOptionalIdPartition, result) =>
val requestKey = TopicPartitionOperationKey(topicOptionalIdPartition.topicPartition)
val requestKey = new TopicPartitionOperationKey(topicOptionalIdPartition.topicPartition)
result.info.leaderHwChange match {
case LeaderHwChange.INCREASED =>
// some delayed operations may be unblocked after HW changed
@ -1014,12 +1015,12 @@ class ReplicaManager(val config: KafkaConfig,
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed produce operation is being created, new
// requests may arrive and hence make this operation completable.
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys.asJava)
} else {
// we can respond immediately
val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }
@ -1392,12 +1393,12 @@ class ReplicaManager(val config: KafkaConfig,
val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
val deleteRecordsRequestKeys = offsetPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed delete records operation is being created, new
// requests may arrive and hence make this operation completable.
delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys)
delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys.asJava)
} else {
// we can respond immediately
val deleteRecordsResponseStatus = deleteRecordsStatus.map { case (k, status) => k -> status.responseStatus }
@ -1601,9 +1602,9 @@ class ReplicaManager(val config: KafkaConfig,
// create delayed remote list offsets operation
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList
// try to complete the request immediately, otherwise put it into the purgatory
delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys.asJava)
} else {
// we can respond immediately
val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
@ -1663,7 +1664,7 @@ class ReplicaManager(val config: KafkaConfig,
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
fetchPartitionStatus, params, logReadResults, this, responseCallback)
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, util.Collections.singletonList(key))
None
}
@ -1765,12 +1766,12 @@ class ReplicaManager(val config: KafkaConfig,
)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
// try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed fetch operation is being created, new requests
// may arrive and hence make this operation completable.
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys.asJava)
}
}
}
@ -2812,8 +2813,8 @@ class ReplicaManager(val config: KafkaConfig,
}
if (expectedLeaders.nonEmpty) {
val watchKeys = expectedLeaders.iterator.map {
case (tp, _) => TopicPartitionOperationKey(tp)
}.toBuffer
case (tp, _) => new TopicPartitionOperationKey(tp)
}.toList.asJava
delayedElectLeaderPurgatory.tryCompleteElseWatch(
new DelayedElectLeader(

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.server.purgatory.DelayedOperationKey
/* used by delayed-topic operations */
case class TopicKey(topic: String) extends DelayedOperationKey {
override def keyLabel: String = topic
}

View File

@ -49,6 +49,7 @@ import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.server.config.{ConfigType, QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG
import org.apache.kafka.server.config.ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG
import org.apache.kafka.server.purgatory.{DelayedOperation, DelayedOperationPurgatory}
import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection.{Map, mutable, _}
@ -74,7 +75,7 @@ class ZkAdminManager(val config: KafkaConfig,
this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
private val topicPurgatory = new DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
private val adminZkClient = new AdminZkClient(zkClient, Some(config))
private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient))
@ -250,9 +251,9 @@ class ZkAdminManager(val config: KafkaConfig,
// 3. else pass the assignments and errors to the delayed operation and set the keys
val delayedCreate = new DelayedCreatePartitions(timeout, metadata, this,
responseCallback)
val delayedCreateKeys = toCreate.values.map(topic => TopicKey(topic.name)).toBuffer
val delayedCreateKeys = toCreate.values.map(topic => TopicKey(topic.name)).toList
// try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys.asJava)
}
}
@ -297,9 +298,9 @@ class ZkAdminManager(val config: KafkaConfig,
} else {
// 3. else pass the topics and errors to the delayed operation and set the keys
val delayedDelete = new DelayedDeleteTopics(timeout, metadata.toSeq, this, responseCallback)
val delayedDeleteKeys = topics.map(TopicKey).toSeq
val delayedDeleteKeys = topics.map(TopicKey).toList
// try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys)
topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys.asJava)
}
}
@ -393,9 +394,9 @@ class ZkAdminManager(val config: KafkaConfig,
} else {
// 3. else pass the assignments and errors to the delayed operation and set the keys
val delayedCreate = new DelayedCreatePartitions(timeoutMs, metadata, this, callback)
val delayedCreateKeys = newPartitions.map(createPartitionTopic => TopicKey(createPartitionTopic.name))
val delayedCreateKeys = newPartitions.map(createPartitionTopic => TopicKey(createPartitionTopic.name)).toList
// try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys.asJava)
}
}

View File

@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
@ -28,6 +27,8 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.storage.log.FetchIsolation;
@ -44,17 +45,15 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import scala.jdk.javaapi.CollectionConverters;
import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
@ -447,7 +446,7 @@ public class DelayedShareFetchTest {
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
@ -458,7 +457,7 @@ public class DelayedShareFetchTest {
// We add a delayed share fetch entry to the purgatory which will be waiting for completion since neither of the
// partitions in the share fetch request can be acquired.
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, delayedShareFetchWatchKeys);
assertEquals(2, delayedShareFetchPurgatory.watched());
assertFalse(shareFetchData1.future().isDone());

View File

@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
@ -52,6 +51,8 @@ import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.apache.kafka.server.share.SharePartitionKey;
@ -1674,7 +1675,7 @@ public class SharePartitionManagerTest {
when(sp2.maybeAcquireFetchLock()).thenReturn(true);
when(sp2.canAcquireRecords()).thenReturn(false);
Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
@ -1693,8 +1694,7 @@ public class SharePartitionManagerTest {
.withSharePartitions(sharePartitions)
.build();
delayedShareFetchPurgatory.tryCompleteElseWatch(
delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, delayedShareFetchWatchKeys);
// Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys.
assertEquals(2, delayedShareFetchPurgatory.watched());
@ -1775,7 +1775,7 @@ public class SharePartitionManagerTest {
when(sp3.maybeAcquireFetchLock()).thenReturn(true);
when(sp3.canAcquireRecords()).thenReturn(false);
Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
@ -1795,8 +1795,7 @@ public class SharePartitionManagerTest {
.withSharePartitions(sharePartitions)
.build();
delayedShareFetchPurgatory.tryCompleteElseWatch(
delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, delayedShareFetchWatchKeys);
// Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys.
assertEquals(2, delayedShareFetchPurgatory.watched());
@ -1872,7 +1871,7 @@ public class SharePartitionManagerTest {
when(sp2.maybeAcquireFetchLock()).thenReturn(true);
when(sp2.canAcquireRecords()).thenReturn(false);
Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
@ -1892,8 +1891,7 @@ public class SharePartitionManagerTest {
.withSharePartitions(sharePartitions)
.build();
delayedShareFetchPurgatory.tryCompleteElseWatch(
delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, delayedShareFetchWatchKeys);
// Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys.
assertEquals(2, delayedShareFetchPurgatory.watched());
@ -1977,7 +1975,7 @@ public class SharePartitionManagerTest {
when(sp3.maybeAcquireFetchLock()).thenReturn(true);
when(sp3.canAcquireRecords()).thenReturn(false);
Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
@ -1998,8 +1996,7 @@ public class SharePartitionManagerTest {
.withSharePartitions(sharePartitions)
.build();
delayedShareFetchPurgatory.tryCompleteElseWatch(
delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, delayedShareFetchWatchKeys);
// Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys.
assertEquals(2, delayedShareFetchPurgatory.watched());
@ -2504,13 +2501,16 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
doAnswer(invocationOnMock -> {
Object[] args = invocationOnMock.getArguments();
delayedShareFetchPurgatory.checkAndComplete(args[0]);
DelayedShareFetchKey key = (DelayedShareFetchKey) args[0];
delayedShareFetchPurgatory.checkAndComplete(key);
return null;
}).when(replicaManager).completeDelayedShareFetchRequest(any(DelayedShareFetchKey.class));
doAnswer(invocationOnMock -> {
Object[] args = invocationOnMock.getArguments();
delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedShareFetch) args[0], (Seq<Object>) args[1]);
DelayedShareFetch operation = (DelayedShareFetch) args[0];
List<DelayedOperationKey> keys = (List<DelayedOperationKey>) args[1];
delayedShareFetchPurgatory.tryCompleteElseWatch(operation, keys);
return null;
}).when(replicaManager).addDelayedShareFetchRequest(any(), any());
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package integration.kafka.server
import kafka.server.DelayedFuturePurgatory
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Time
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import java.util.concurrent.{CompletableFuture, ExecutionException}
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters.CollectionHasAsScala
class DelayedFutureTest {
@Test
def testDelayedFuture(): Unit = {
val purgatoryName = "testDelayedFuture"
val purgatory = new DelayedFuturePurgatory(purgatoryName, brokerId = 0)
try {
val result = new AtomicInteger()
def hasExecutorThread: Boolean = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
.exists(_.contains(s"DelayedExecutor-$purgatoryName"))
def updateResult(futures: List[CompletableFuture[Integer]]): Unit =
result.set(futures.filterNot(_.isCompletedExceptionally).map(_.get.intValue).sum)
assertFalse(hasExecutorThread, "Unnecessary thread created")
// Two completed futures: callback should be executed immediately on the same thread
val futures1 = List(CompletableFuture.completedFuture(10.asInstanceOf[Integer]),
CompletableFuture.completedFuture(11.asInstanceOf[Integer]))
val r1 = purgatory.tryCompleteElseWatch[Integer](100000L, futures1, () => updateResult(futures1))
assertTrue(r1.isCompleted, "r1 not completed")
assertEquals(21, result.get())
assertFalse(hasExecutorThread, "Unnecessary thread created")
// Two delayed futures: callback should wait for both to complete
result.set(-1)
val futures2 = List(new CompletableFuture[Integer], new CompletableFuture[Integer])
val r2 = purgatory.tryCompleteElseWatch[Integer](100000L, futures2, () => updateResult(futures2))
assertFalse(r2.isCompleted, "r2 should be incomplete")
futures2.head.complete(20)
assertFalse(r2.isCompleted)
assertEquals(-1, result.get())
futures2(1).complete(21)
TestUtils.waitUntilTrue(() => r2.isCompleted, "r2 not completed")
TestUtils.waitUntilTrue(() => result.get == 41, "callback not invoked")
assertTrue(hasExecutorThread, "Thread not created for executing delayed task")
// One immediate and one delayed future: callback should wait for delayed task to complete
result.set(-1)
val futures3 = List(new CompletableFuture[Integer], CompletableFuture.completedFuture(31.asInstanceOf[Integer]))
val r3 = purgatory.tryCompleteElseWatch[Integer](100000L, futures3, () => updateResult(futures3))
assertFalse(r3.isCompleted, "r3 should be incomplete")
assertEquals(-1, result.get())
futures3.head.complete(30)
TestUtils.waitUntilTrue(() => r3.isCompleted, "r3 not completed")
TestUtils.waitUntilTrue(() => result.get == 61, "callback not invoked")
// One future doesn't complete within timeout. Should expire and invoke callback after timeout.
result.set(-1)
val start = Time.SYSTEM.hiResClockMs
val expirationMs = 2000L
val futures4 = List(new CompletableFuture[Integer], new CompletableFuture[Integer])
val r4 = purgatory.tryCompleteElseWatch[Integer](expirationMs, futures4, () => updateResult(futures4))
futures4.head.complete(40)
TestUtils.waitUntilTrue(() => futures4(1).isDone, "r4 futures not expired")
assertTrue(r4.isCompleted, "r4 not completed after timeout")
val elapsed = Time.SYSTEM.hiResClockMs - start
assertTrue(elapsed >= expirationMs, s"Time for expiration $elapsed should be at least $expirationMs")
assertEquals(40, futures4.head.get)
assertEquals(classOf[org.apache.kafka.common.errors.TimeoutException],
assertThrows(classOf[ExecutionException], () => futures4(1).get).getCause.getClass)
assertEquals(40, result.get())
} finally {
purgatory.shutdown()
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicR
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.util.timer.MockTimer
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions.assertEquals
@ -33,6 +34,7 @@ import java.util.Optional
import java.util.concurrent.CompletableFuture
import scala.collection.mutable
import scala.concurrent.TimeoutException
import scala.jdk.CollectionConverters._
class DelayedRemoteListOffsetsTest {
@ -41,7 +43,7 @@ class DelayedRemoteListOffsetsTest {
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
type T = Either[Exception, Option[TimestampAndOffset]]
val purgatory =
new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, purgeInterval = 10)
new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, 0, 10, true, true)
@AfterEach
def afterEach(): Unit = {
@ -80,7 +82,7 @@ class DelayedRemoteListOffsetsTest {
)
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
assertEquals(0, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
assertEquals(0, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.size)
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
@ -90,7 +92,7 @@ class DelayedRemoteListOffsetsTest {
assertEquals(listOffsetsRequestKeys.size, cancelledCount)
assertEquals(listOffsetsRequestKeys.size, numResponse)
assertEquals(listOffsetsRequestKeys.size, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
listOffsetsRequestKeys.foreach(key => {
listOffsetsRequestKeys.forEach(key => {
val tp = new TopicPartition(key.topic, key.partition)
assertEquals(1, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.get(tp).count())
})
@ -132,7 +134,7 @@ class DelayedRemoteListOffsetsTest {
)
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
assertEquals(0, cancelledCount)
@ -188,7 +190,7 @@ class DelayedRemoteListOffsetsTest {
)
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
assertEquals(0, cancelledCount)
@ -246,7 +248,7 @@ class DelayedRemoteListOffsetsTest {
)
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
assertEquals(1, cancelledCount)

View File

@ -58,6 +58,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager, RequestLocal}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
@ -4099,7 +4100,7 @@ class PartitionTest extends AbstractPartitionTest {
@Test
def tryCompleteDelayedRequestsCatchesExceptions(): Unit = {
val requestKey = TopicPartitionOperationKey(topicPartition)
val requestKey = new TopicPartitionOperationKey(topicPartition)
val produce = mock(classOf[DelayedOperationPurgatory[DelayedProduce]])
when(produce.checkAndComplete(requestKey)).thenThrow(new RuntimeException("uh oh"))

View File

@ -24,7 +24,7 @@ import java.util.concurrent.locks.Lock
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, _}
import kafka.server.{KafkaConfig, _}
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.ActionQueue
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
@ -62,7 +63,7 @@ abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] extend
executor = Executors.newFixedThreadPool(nThreads)
val mockLogMger = mock(classOf[LogManager])
when(mockLogMger.liveLogDirs).thenReturn(Seq.empty)
val producePurgatory = new DelayedOperationPurgatory[DelayedProduce]("Produce", timer, 1, reaperEnabled = false)
val producePurgatory = new DelayedOperationPurgatory[DelayedProduce]("Produce", timer, 1, 1000, false, true)
val watchKeys = Collections.newSetFromMap(new ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala
replicaManager = TestReplicaManager(KafkaConfig.fromProps(serverProps), time, scheduler, timer, mockLogMger, mock(classOf[QuotaManagers], withSettings().stubOnly()), producePurgatory, watchKeys)
zkClient = mock(classOf[KafkaZkClient], withSettings().stubOnly())
@ -247,9 +248,9 @@ object AbstractCoordinatorConcurrencyTest {
})
}
}
val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_))
watchKeys ++= producerRequestKeys
producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys.toList.asJava)
}
override def getMagic(topicPartition: TopicPartition): Option[Byte] = {
@ -287,15 +288,15 @@ object AbstractCoordinatorConcurrencyTest {
producePurgatory: DelayedOperationPurgatory[DelayedProduce],
watchKeys: mutable.Set[TopicPartitionOperationKey]): TestReplicaManager = {
val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "RemoteFetch", timer, reaperEnabled = false)
"RemoteFetch", timer, 0, 1000, false, true)
val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
"RemoteListOffsets", timer, 0, 1000, false, true)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", timer, reaperEnabled = false)
"Fetch", timer, 0, 1000, false, true)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
"DeleteRecords", timer, 0, 1000, false, true)
val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "ElectLeader", timer, reaperEnabled = false)
"ElectLeader", timer, 0, 1000, false, true)
new TestReplicaManager(config, time, scheduler, logManager, quotaManagers, watchKeys, producePurgatory,
mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory,
mockRemoteListOffsetsPurgatory)

View File

@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, KafkaRequestHandler}
import kafka.server.{KafkaConfig, KafkaRequestHandler}
import kafka.utils.CoreUtils
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.internals.Topic
@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.group.{GroupCoordinatorConfig, OffsetAndMetadata}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.when
@ -79,8 +80,8 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
val config = KafkaConfig.fromProps(serverProps)
heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false)
heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, 1000, false, true)
rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, 1000, false, true)
metrics = new Metrics
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, metrics)

View File

@ -18,7 +18,7 @@
package kafka.coordinator.group
import java.util.{OptionalInt, OptionalLong}
import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager}
import kafka.server.{HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager}
import kafka.utils._
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -38,6 +38,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.coordinator.group.{GroupCoordinatorConfig, OffsetAndMetadata}
import org.apache.kafka.server.ActionQueue
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, VerificationGuard}
@ -118,8 +119,8 @@ class GroupCoordinatorTest {
val config = KafkaConfig.fromProps(props)
val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false)
val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, 1000, false, true)
val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, 1000, false, true)
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics())
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),

View File

@ -19,7 +19,7 @@ package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.server.{DelayedOperationPurgatory, DelayedRemoteListOffsets, KafkaConfig}
import kafka.server.{DelayedRemoteListOffsets, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
@ -37,6 +37,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
@ -2098,7 +2099,7 @@ class UnifiedLogTest {
@Test
def testFetchOffsetByTimestampFromRemoteStorage(): Unit = {
val config: KafkaConfig = createKafkaConfigWithRLM
val purgatory = DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId)
val purgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId)
val remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig,
0,
logDir.getAbsolutePath,
@ -2195,7 +2196,7 @@ class UnifiedLogTest {
@Test
def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = {
val config: KafkaConfig = createKafkaConfigWithRLM
val purgatory = DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId)
val purgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId)
val remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig,
0,
logDir.getAbsolutePath,

View File

@ -1,409 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Random
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.CoreUtils.inLock
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Time
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
import scala.jdk.CollectionConverters._
class DelayedOperationTest {
var purgatory: DelayedOperationPurgatory[DelayedOperation] = _
var executorService: ExecutorService = _
@BeforeEach
def setUp(): Unit = {
purgatory = DelayedOperationPurgatory[DelayedOperation](purgatoryName = "mock")
}
@AfterEach
def tearDown(): Unit = {
purgatory.shutdown()
if (executorService != null)
executorService.shutdown()
}
@Test
def testLockInTryCompleteElseWatch(): Unit = {
val op = new DelayedOperation(100000L) {
override def onExpiration(): Unit = {}
override def onComplete(): Unit = {}
override def tryComplete(): Boolean = {
assertTrue(lock.asInstanceOf[ReentrantLock].isHeldByCurrentThread)
false
}
override def safeTryComplete(): Boolean = {
fail("tryCompleteElseWatch should not use safeTryComplete")
super.safeTryComplete()
}
}
purgatory.tryCompleteElseWatch(op, Seq("key"))
}
@Test
def testSafeTryCompleteOrElse(): Unit = {
def op(shouldComplete: Boolean) = new DelayedOperation(100000L) {
override def onExpiration(): Unit = {}
override def onComplete(): Unit = {}
override def tryComplete(): Boolean = {
assertTrue(lock.asInstanceOf[ReentrantLock].isHeldByCurrentThread)
shouldComplete
}
}
var pass = false
assertFalse(op(false).safeTryCompleteOrElse {
pass = true
})
assertTrue(pass)
assertTrue(op(true).safeTryCompleteOrElse {
fail("this method should NOT be executed")
})
}
@Test
def testRequestSatisfaction(): Unit = {
val r1 = new MockDelayedOperation(100000L)
val r2 = new MockDelayedOperation(100000L)
assertEquals(0, purgatory.checkAndComplete("test1"), "With no waiting requests, nothing should be satisfied")
assertFalse(purgatory.tryCompleteElseWatch(r1, Array("test1")), "r1 not satisfied and hence watched")
assertEquals(0, purgatory.checkAndComplete("test1"), "Still nothing satisfied")
assertFalse(purgatory.tryCompleteElseWatch(r2, Array("test2")), "r2 not satisfied and hence watched")
assertEquals(0, purgatory.checkAndComplete("test2"), "Still nothing satisfied")
r1.completable = true
assertEquals(1, purgatory.checkAndComplete("test1"), "r1 satisfied")
assertEquals(0, purgatory.checkAndComplete("test1"), "Nothing satisfied")
r2.completable = true
assertEquals(1, purgatory.checkAndComplete("test2"), "r2 satisfied")
assertEquals(0, purgatory.checkAndComplete("test2"), "Nothing satisfied")
}
@Test
def testRequestExpiry(): Unit = {
val expiration = 20L
val start = Time.SYSTEM.hiResClockMs
val r1 = new MockDelayedOperation(expiration)
val r2 = new MockDelayedOperation(200000L)
assertFalse(purgatory.tryCompleteElseWatch(r1, Array("test1")), "r1 not satisfied and hence watched")
assertFalse(purgatory.tryCompleteElseWatch(r2, Array("test2")), "r2 not satisfied and hence watched")
r1.awaitExpiration()
val elapsed = Time.SYSTEM.hiResClockMs - start
assertTrue(r1.isCompleted, "r1 completed due to expiration")
assertFalse(r2.isCompleted, "r2 hasn't completed")
assertTrue(elapsed >= expiration, s"Time for expiration $elapsed should at least $expiration")
}
@Test
def testDelayedFuture(): Unit = {
val purgatoryName = "testDelayedFuture"
val purgatory = new DelayedFuturePurgatory(purgatoryName, brokerId = 0)
try {
val result = new AtomicInteger()
def hasExecutorThread: Boolean = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
.exists(_.contains(s"DelayedExecutor-$purgatoryName"))
def updateResult(futures: List[CompletableFuture[Integer]]): Unit =
result.set(futures.filterNot(_.isCompletedExceptionally).map(_.get.intValue).sum)
assertFalse(hasExecutorThread, "Unnecessary thread created")
// Two completed futures: callback should be executed immediately on the same thread
val futures1 = List(CompletableFuture.completedFuture(10.asInstanceOf[Integer]),
CompletableFuture.completedFuture(11.asInstanceOf[Integer]))
val r1 = purgatory.tryCompleteElseWatch[Integer](100000L, futures1, () => updateResult(futures1))
assertTrue(r1.isCompleted, "r1 not completed")
assertEquals(21, result.get())
assertFalse(hasExecutorThread, "Unnecessary thread created")
// Two delayed futures: callback should wait for both to complete
result.set(-1)
val futures2 = List(new CompletableFuture[Integer], new CompletableFuture[Integer])
val r2 = purgatory.tryCompleteElseWatch[Integer](100000L, futures2, () => updateResult(futures2))
assertFalse(r2.isCompleted, "r2 should be incomplete")
futures2.head.complete(20)
assertFalse(r2.isCompleted)
assertEquals(-1, result.get())
futures2(1).complete(21)
TestUtils.waitUntilTrue(() => r2.isCompleted, "r2 not completed")
TestUtils.waitUntilTrue(() => result.get == 41, "callback not invoked")
assertTrue(hasExecutorThread, "Thread not created for executing delayed task")
// One immediate and one delayed future: callback should wait for delayed task to complete
result.set(-1)
val futures3 = List(new CompletableFuture[Integer], CompletableFuture.completedFuture(31.asInstanceOf[Integer]))
val r3 = purgatory.tryCompleteElseWatch[Integer](100000L, futures3, () => updateResult(futures3))
assertFalse(r3.isCompleted, "r3 should be incomplete")
assertEquals(-1, result.get())
futures3.head.complete(30)
TestUtils.waitUntilTrue(() => r3.isCompleted, "r3 not completed")
TestUtils.waitUntilTrue(() => result.get == 61, "callback not invoked")
// One future doesn't complete within timeout. Should expire and invoke callback after timeout.
result.set(-1)
val start = Time.SYSTEM.hiResClockMs
val expirationMs = 2000L
val futures4 = List(new CompletableFuture[Integer], new CompletableFuture[Integer])
val r4 = purgatory.tryCompleteElseWatch[Integer](expirationMs, futures4, () => updateResult(futures4))
futures4.head.complete(40)
TestUtils.waitUntilTrue(() => futures4(1).isDone, "r4 futures not expired")
assertTrue(r4.isCompleted, "r4 not completed after timeout")
val elapsed = Time.SYSTEM.hiResClockMs - start
assertTrue(elapsed >= expirationMs, s"Time for expiration $elapsed should at least $expirationMs")
assertEquals(40, futures4.head.get)
assertEquals(classOf[org.apache.kafka.common.errors.TimeoutException],
assertThrows(classOf[ExecutionException], () => futures4(1).get).getCause.getClass)
assertEquals(40, result.get())
} finally {
purgatory.shutdown()
}
}
@Test
def testRequestPurge(): Unit = {
val r1 = new MockDelayedOperation(100000L)
val r2 = new MockDelayedOperation(100000L)
val r3 = new MockDelayedOperation(100000L)
purgatory.tryCompleteElseWatch(r1, Array("test1"))
purgatory.tryCompleteElseWatch(r2, Array("test1", "test2"))
purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3"))
assertEquals(3, purgatory.numDelayed, "Purgatory should have 3 total delayed operations")
assertEquals(6, purgatory.watched, "Purgatory should have 6 watched elements")
// complete the operations, it should immediately be purged from the delayed operation
r2.completable = true
r2.tryComplete()
assertEquals(2, purgatory.numDelayed, "Purgatory should have 2 total delayed operations instead of " + purgatory.numDelayed)
r3.completable = true
r3.tryComplete()
assertEquals(1, purgatory.numDelayed, "Purgatory should have 1 total delayed operations instead of " + purgatory.numDelayed)
// checking a watch should purge the watch list
purgatory.checkAndComplete("test1")
assertEquals(4, purgatory.watched, "Purgatory should have 4 watched elements instead of " + purgatory.watched)
purgatory.checkAndComplete("test2")
assertEquals(2, purgatory.watched, "Purgatory should have 2 watched elements instead of " + purgatory.watched)
purgatory.checkAndComplete("test3")
assertEquals(1, purgatory.watched, "Purgatory should have 1 watched elements instead of " + purgatory.watched)
}
@Test
def shouldCancelForKeyReturningCancelledOperations(): Unit = {
purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Seq("key"))
purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Seq("key"))
purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Seq("key2"))
val cancelledOperations = purgatory.cancelForKey("key")
assertEquals(2, cancelledOperations.size)
assertEquals(1, purgatory.numDelayed)
assertEquals(1, purgatory.watched)
}
@Test
def shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist(): Unit = {
val cancelledOperations = purgatory.cancelForKey("key")
assertEquals(Nil, cancelledOperations)
}
/**
* Test `tryComplete` with multiple threads to verify that there are no timing windows
* when completion is not performed even if the thread that makes the operation completable
* may not be able to acquire the operation lock. Since it is difficult to test all scenarios,
* this test uses random delays with a large number of threads.
*/
@Test
def testTryCompleteWithMultipleThreads(): Unit = {
val executor = Executors.newScheduledThreadPool(20)
this.executorService = executor
val random = new Random
val maxDelayMs = 10
val completionAttempts = 20
class TestDelayOperation(index: Int) extends MockDelayedOperation(10000L) {
val key = s"key$index"
val completionAttemptsRemaining = new AtomicInteger(completionAttempts)
override def tryComplete(): Boolean = {
val shouldComplete = completable
Thread.sleep(random.nextInt(maxDelayMs))
if (shouldComplete)
forceComplete()
else
false
}
}
val ops = (0 until 100).map { index =>
val op = new TestDelayOperation(index)
purgatory.tryCompleteElseWatch(op, Seq(op.key))
op
}
def scheduleTryComplete(op: TestDelayOperation, delayMs: Long): Future[_] = {
executor.schedule(new Runnable {
override def run(): Unit = {
if (op.completionAttemptsRemaining.decrementAndGet() == 0)
op.completable = true
purgatory.checkAndComplete(op.key)
}
}, delayMs, TimeUnit.MILLISECONDS)
}
(1 to completionAttempts).flatMap { _ =>
ops.map { op => scheduleTryComplete(op, random.nextInt(maxDelayMs)) }
}.foreach { future => future.get }
ops.foreach { op => assertTrue(op.isCompleted, "Operation should have completed") }
}
def verifyDelayedOperationLock(mockDelayedOperation: => MockDelayedOperation, mismatchedLocks: Boolean): Unit = {
val key = "key"
executorService = Executors.newSingleThreadExecutor
def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
(1 to count).map { _ =>
val op = mockDelayedOperation
purgatory.tryCompleteElseWatch(op, Seq(key))
assertFalse(op.isCompleted, "Not completable")
op
}
}
def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = {
(1 to count).map { _ =>
val op = mockDelayedOperation
op.completable = true
op
}
}
def checkAndComplete(completableOps: Seq[MockDelayedOperation], expectedComplete: Seq[MockDelayedOperation]): Unit = {
completableOps.foreach(op => op.completable = true)
val completed = purgatory.checkAndComplete(key)
assertEquals(expectedComplete.size, completed)
expectedComplete.foreach(op => assertTrue(op.isCompleted, "Should have completed"))
val expectedNotComplete = completableOps.toSet -- expectedComplete
expectedNotComplete.foreach(op => assertFalse(op.isCompleted, "Should not have completed"))
}
// If locks are free all completable operations should complete
var ops = createDelayedOperations(2)
checkAndComplete(ops, ops)
// Lock held by current thread, completable operations should complete
ops = createDelayedOperations(2)
inLock(ops(1).lock) {
checkAndComplete(ops, ops)
}
// Lock held by another thread, should not block, only operations that can be
// locked without blocking on the current thread should complete
ops = createDelayedOperations(2)
runOnAnotherThread(ops(0).lock.lock(), shouldComplete = true)
try {
checkAndComplete(ops, Seq(ops(1)))
} finally {
runOnAnotherThread(ops(0).lock.unlock(), shouldComplete = true)
checkAndComplete(Seq(ops(0)), Seq(ops(0)))
}
// Lock acquired by response callback held by another thread, should not block
// if the response lock is used as operation lock, only operations
// that can be locked without blocking on the current thread should complete
ops = createDelayedOperations(2)
ops(0).responseLockOpt.foreach { lock =>
runOnAnotherThread(lock.lock(), shouldComplete = true)
try {
try {
checkAndComplete(ops, Seq(ops(1)))
assertFalse(mismatchedLocks, "Should have failed with mismatched locks")
} catch {
case e: IllegalStateException =>
assertTrue(mismatchedLocks, "Should not have failed with valid locks")
}
} finally {
runOnAnotherThread(lock.unlock(), shouldComplete = true)
checkAndComplete(Seq(ops(0)), Seq(ops(0)))
}
}
// Immediately completable operations should complete without locking
ops = createCompletableOperations(2)
ops.foreach { op =>
assertTrue(purgatory.tryCompleteElseWatch(op, Seq(key)), "Should have completed")
assertTrue(op.isCompleted, "Should have completed")
}
}
private def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] = {
val future = executorService.submit(new Runnable {
def run(): Unit = fun
})
if (shouldComplete)
future.get()
else
assertFalse(future.isDone, "Should not have completed")
future
}
class MockDelayedOperation(delayMs: Long,
lockOpt: Option[ReentrantLock] = None,
val responseLockOpt: Option[ReentrantLock] = None)
extends DelayedOperation(delayMs, lockOpt) {
var completable = false
def awaitExpiration(): Unit = {
synchronized {
wait()
}
}
override def tryComplete() = {
if (completable)
forceComplete()
else
false
}
override def onExpiration(): Unit = {
}
override def onComplete(): Unit = {
responseLockOpt.foreach { lock =>
if (!lock.tryLock())
throw new IllegalStateException("Response callback lock could not be acquired in callback")
}
synchronized {
notify()
}
}
}
}

View File

@ -66,6 +66,7 @@ import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerL
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime}
@ -1790,7 +1791,7 @@ class ReplicaManagerTest {
assertTrue(consumerResult.hasFired)
// No delayed fetch was inserted
assertEquals(0, replicaManager.delayedFetchPurgatory.watched)
assertEquals(0, replicaManager.delayedFetchPurgatory.watched())
// Returns a preferred replica
assertTrue(consumerResult.assertFired.preferredReadReplica.isPresent)
@ -2995,19 +2996,19 @@ class ReplicaManagerTest {
followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap)
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false)
"Produce", timer, 0, false)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", timer, reaperEnabled = false)
"Fetch", timer, 0, false)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
"DeleteRecords", timer, 0, false)
val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "ElectLeader", timer, reaperEnabled = false)
"ElectLeader", timer, 0, false)
val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "RemoteFetch", timer, reaperEnabled = false)
"RemoteFetch", timer, 0, false)
val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
"RemoteListOffsets", timer, 0, false)
val mockDelayedShareFetchPurgatory = new DelayedOperationPurgatory[DelayedShareFetch](
purgatoryName = "ShareFetch", timer, reaperEnabled = false)
"ShareFetch", timer, 0, false)
// Mock network client to show leader offset of 5
val blockingSend = new MockBlockingSender(
@ -3422,19 +3423,19 @@ class ReplicaManagerTest {
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false)
"Produce", timer, 0, false)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", timer, reaperEnabled = false)
"Fetch", timer, 0, false)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
"DeleteRecords", timer, 0, false)
val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
"DelayedElectLeader", timer, 0, false)
val mockDelayedRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "DelayedRemoteFetch", timer, reaperEnabled = false)
"DelayedRemoteFetch", timer, 0, false)
val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
"RemoteListOffsets", timer, 0, false)
val mockDelayedShareFetchPurgatory = new DelayedOperationPurgatory[DelayedShareFetch](
purgatoryName = "ShareFetch", timer, reaperEnabled = false)
"ShareFetch", timer, 0, false)
when(metadataCache.contains(new TopicPartition(topic, 0))).thenReturn(true)

View File

@ -16,9 +16,9 @@
*/
package org.apache.kafka.jmh.core;
import kafka.server.DelayedOperation;
import kafka.server.DelayedOperationPurgatory;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.ShutdownableThread;
@ -30,6 +30,7 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@ -43,14 +44,12 @@ import java.util.stream.IntStream;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;
import static java.lang.String.format;
public class TestPurgatoryPerformance {
public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) throws Exception {
TestArgumentDefinition def = new TestArgumentDefinition(args);
def.checkRequiredArgs();
@ -70,7 +69,7 @@ public class TestPurgatoryPerformance {
IntervalSamples intervalSamples = new IntervalSamples(1000000, requestRate);
DelayedOperationPurgatory<FakeOperation> purgatory =
DelayedOperationPurgatory.apply("fake purgatory", 0, 1000, true, true);
new DelayedOperationPurgatory<>("fake purgatory", 0, 1000);
CompletionQueue queue = new CompletionQueue();
List<String> gcNames = gcMXBeans.stream().map(MemoryManagerMXBean::getName).collect(Collectors.toList());
@ -78,8 +77,8 @@ public class TestPurgatoryPerformance {
long initialCpuTimeNano = getProcessCpuTimeNanos(osMXBean).orElseThrow();
long start = System.currentTimeMillis();
Random rand = new Random();
List<String> keys = IntStream.range(0, numKeys)
.mapToObj(i -> format("fakeKey%d", rand.nextInt(numPossibleKeys)))
List<FakeOperationKey> keys = IntStream.range(0, numKeys)
.mapToObj(i -> new FakeOperationKey(format("fakeKey%d", rand.nextInt(numPossibleKeys))))
.collect(Collectors.toList());
AtomicLong requestArrivalTime = new AtomicLong(start);
@ -196,7 +195,7 @@ public class TestPurgatoryPerformance {
}
public void checkRequiredArgs() {
CommandLineUtils.checkRequiredArgs(parser, options, numRequestsOpt, requestRateOpt, pct75Opt, pct50Opt);
CommandLineUtils.checkRequiredArgs(parser, options, numRequestsOpt, requestRateOpt, timeoutOpt, pct75Opt, pct50Opt);
}
public int numRequests() {
@ -240,7 +239,7 @@ public class TestPurgatoryPerformance {
LatencySamples latencySamples,
AtomicLong requestArrivalTime,
CountDownLatch latch,
List<String> keys,
List<FakeOperationKey> keys,
AtomicLong end) {
int i = numRequests;
while (i > 0) {
@ -263,9 +262,7 @@ public class TestPurgatoryPerformance {
queue.add(request);
}
purgatory.tryCompleteElseWatch(request, CollectionConverters.asScala(
keys.stream().map(k -> (Object) k).collect(Collectors.toList())
).toSeq());
purgatory.tryCompleteElseWatch(request, keys);
}
end.set(System.currentTimeMillis());
}
@ -433,13 +430,39 @@ public class TestPurgatoryPerformance {
}
}
private static class FakeOperationKey implements DelayedOperationKey {
private final String key;
public FakeOperationKey(String key) {
this.key = key;
}
@Override
public String keyLabel() {
return key;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FakeOperationKey that = (FakeOperationKey) o;
return Objects.equals(key, that.key);
}
@Override
public int hashCode() {
return Objects.hash(key);
}
}
private static class FakeOperation extends DelayedOperation {
final long completesAt;
final long latencyMs;
final CountDownLatch latch;
public FakeOperation(long delayMs, long latencyMs, CountDownLatch latch) {
super(delayMs, Option.empty());
super(delayMs);
this.latencyMs = latencyMs;
this.latch = latch;
completesAt = System.currentTimeMillis() + delayMs;

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import org.apache.kafka.server.util.timer.TimerTask;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* An operation whose processing needs to be delayed for at most the given delayMs. For example,
* a delayed produce operation could be waiting for a specified number of acks; or
* a delayed fetch operation could be waiting for a given number of bytes to accumulate.
* <br/>
* The logic for completing a delayed operation is defined in onComplete() and will be called exactly once.
* Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either
* forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,
* or tryComplete(), which first checks whether the operation can be completed now, and if so calls
* forceComplete().
* <br/>
* A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
* <br/>
* Note that if you add a delayed operation that calls ReplicaManager.appendRecords() in onComplete(),
* like DelayedJoin, its onExpiration() needs to call actionQueue.tryCompleteAction().
*/
public abstract class DelayedOperation extends TimerTask {
private final AtomicBoolean completed = new AtomicBoolean(false);
// Visible for testing
final Lock lock;
public DelayedOperation(long delayMs, Optional<Lock> lockOpt) {
this(delayMs, lockOpt.orElse(new ReentrantLock()));
}
public DelayedOperation(long delayMs) {
this(delayMs, new ReentrantLock());
}
public DelayedOperation(long delayMs, Lock lock) {
super(delayMs);
this.lock = lock;
}
/*
* Force completing the delayed operation, if not already completed.
* This function can be triggered when
*
* 1. The operation has been verified to be completable inside tryComplete()
* 2. The operation has expired and hence needs to be completed right now
*
* Returns true iff the operation is completed by the caller. Note that
* concurrent threads can try to complete the same operation, but only
* the first thread will succeed in completing the operation and return
* true; the others will return false.
*/
public boolean forceComplete() {
if (completed.compareAndSet(false, true)) {
// cancel the timeout timer
cancel();
onComplete();
return true;
} else {
return false;
}
}
/**
* Check if the delayed operation is already completed
*/
public boolean isCompleted() {
return completed.get();
}
/**
* Callback to execute when a delayed operation expires and is hence forced to complete.
*/
public abstract void onExpiration();
/**
* Process for completing an operation; this function needs to be defined
* in subclasses and will be called exactly once in forceComplete()
*/
public abstract void onComplete();
/**
* Try to complete the delayed operation by first checking whether the operation
* can be completed now. If so, execute the completion logic by calling
* forceComplete() and return true iff forceComplete returns true; otherwise return false
* <br/>
* This function needs to be defined in subclasses
*/
public abstract boolean tryComplete();
/**
* Thread-safe variant of tryComplete() that calls an extra function if the first tryComplete returns false
* @param action function to be executed if the first tryComplete returns false
* @return result of tryComplete
*/
boolean safeTryCompleteOrElse(Action action) {
lock.lock();
try {
if (tryComplete()) return true;
else {
action.apply();
// last completion check
return tryComplete();
}
} finally {
lock.unlock();
}
}
/**
* Thread-safe variant of tryComplete()
*/
boolean safeTryComplete() {
lock.lock();
try {
if (isCompleted()) return false;
else return tryComplete();
} finally {
lock.unlock();
}
}
/**
* run() method defines a task that is executed on timeout
*/
@Override
public void run() {
if (forceComplete())
onExpiration();
}
@FunctionalInterface
public interface Action {
void apply();
}
}


@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
/**
* Keys used for delayed operation metrics recording
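* <p>
* Implementations are used as hash-map keys for the purgatory's watcher lists, so they should
* provide value-based equals() and hashCode(), as the key classes in this package do. A minimal
* custom key could look like the following sketch (the name and field are illustrative only):
* <pre>{@code
* public class RequestIdKey implements DelayedOperationKey {
*     private final long requestId;
*     public RequestIdKey(long requestId) { this.requestId = requestId; }
*     public String keyLabel() { return "request-" + requestId; }
*     public boolean equals(Object o) {
*         return o instanceof RequestIdKey && ((RequestIdKey) o).requestId == requestId;
*     }
*     public int hashCode() { return Long.hashCode(requestId); }
* }
* }</pre>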
*/
public interface DelayedOperationKey {
String keyLabel();
}


@ -0,0 +1,414 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.util.ShutdownableThread;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
public class DelayedOperationPurgatory<T extends DelayedOperation> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedOperationPurgatory.class);
private static final int SHARDS = 512; // Shard the watcher list to reduce lock contention
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedOperationPurgatory");
private final Map<String, String> metricsTags;
private final List<WatcherList> watcherLists;
// the number of estimated total operations in the purgatory
private final AtomicInteger estimatedTotalOperations = new AtomicInteger(0);
/* background thread expiring operations that have timed out */
private final ExpiredOperationReaper expirationReaper = new ExpiredOperationReaper();
private final String purgatoryName;
private final Timer timeoutTimer;
private final int brokerId;
private final int purgeInterval;
private final boolean reaperEnabled;
private final boolean timerEnabled;
public DelayedOperationPurgatory(String purgatoryName, Timer timer, int brokerId, boolean reaperEnabled) {
this(purgatoryName, timer, brokerId, 1000, reaperEnabled, true);
}
public DelayedOperationPurgatory(String purgatoryName, int brokerId) {
this(purgatoryName, brokerId, 1000);
}
public DelayedOperationPurgatory(String purgatoryName, int brokerId, int purgeInterval) {
this(purgatoryName, new SystemTimer(purgatoryName), brokerId, purgeInterval, true, true);
}
/**
* A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
*/
@SuppressWarnings("this-escape")
public DelayedOperationPurgatory(String purgatoryName,
Timer timeoutTimer,
int brokerId,
int purgeInterval,
boolean reaperEnabled,
boolean timerEnabled) {
this.purgatoryName = purgatoryName;
this.timeoutTimer = timeoutTimer;
this.brokerId = brokerId;
this.purgeInterval = purgeInterval;
this.reaperEnabled = reaperEnabled;
this.timerEnabled = timerEnabled;
watcherLists = new ArrayList<>(SHARDS);
for (int i = 0; i < SHARDS; i++) {
watcherLists.add(new WatcherList());
}
metricsTags = Collections.singletonMap("delayedOperation", purgatoryName);
metricsGroup.newGauge("PurgatorySize", this::watched, metricsTags);
metricsGroup.newGauge("NumDelayedOperations", this::numDelayed, metricsTags);
if (reaperEnabled) {
expirationReaper.start();
}
}
private WatcherList watcherList(DelayedOperationKey key) {
return watcherLists.get(Math.abs(key.hashCode() % watcherLists.size()));
}
/**
* Check if the operation can be completed; if not, watch it based on the given watch keys
* <br/>
* Note that a delayed operation can be watched on multiple keys. It is possible that
* an operation is completed after it has been added to the watch list for some, but
* not all the keys. In this case, the operation is considered completed and won't
* be added to the watch list of the remaining keys. The expiration reaper thread will
* remove this operation from any watcher list in which the operation exists.
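* <p>
* For illustration only, a typical call site could look like the following sketch (the operation
* type {@code DelayedFoo} and the {@code brokerId} and {@code operation} variables are placeholders):
* <pre>{@code
* DelayedOperationPurgatory<DelayedFoo> purgatory =
*     new DelayedOperationPurgatory<>("Foo", brokerId);
* TopicPartitionOperationKey key = new TopicPartitionOperationKey("topicA", 0);
* if (!purgatory.tryCompleteElseWatch(operation, Collections.singletonList(key))) {
*     // not completable yet: the operation is now watched on the key and scheduled to expire
* }
* // later, once the state behind topicA-0 changes:
* purgatory.checkAndComplete(key);
* }</pre>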
*
* @param operation the delayed operation to be checked
* @param watchKeys keys for bookkeeping the operation
* @return true iff the delayed operation can be completed by the caller
*/
public <K extends DelayedOperationKey> boolean tryCompleteElseWatch(T operation, List<K> watchKeys) {
if (watchKeys.isEmpty()) {
throw new IllegalArgumentException("The watch key list can't be empty");
}
// The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is
// going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse().
// If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At
// this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering
// event since the operation is already on the watcher list for all keys.
//
// ==============[story about lock]==============
// Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing
// the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and
// checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete()
// 1) thread_a holds readlock of stateLock from TransactionStateManager
// 2) thread_a is executing tryCompleteElseWatch()
// 3) thread_a adds op to watch list
// 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a)
// 5) thread_c calls checkAndComplete() and holds lock of op
// 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b)
// 7) thread_a is waiting lock of op to call the final tryComplete() (blocked by thread_c)
//
// Note that even with the current approach, deadlocks could still be introduced. For example,
// 1) thread_a calls tryCompleteElseWatch() and gets lock of op
// 2) thread_a adds op to watch list
// 3) thread_a calls op#tryComplete and tries to require lock_b
// 4) thread_b holds lock_b and calls checkAndComplete()
// 5) thread_b sees op from watch list
// 6) thread_b needs lock of op
// To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding
// any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
// holding an exclusive lock to make the call is often unnecessary.
if (operation.safeTryCompleteOrElse(() -> {
watchKeys.forEach(key -> watchForOperation(key, operation));
if (!watchKeys.isEmpty())
estimatedTotalOperations.incrementAndGet();
})) {
return true;
}
// if it cannot be completed by now and hence is watched, add to the timeout queue also
if (!operation.isCompleted()) {
if (timerEnabled)
timeoutTimer.add(operation);
if (operation.isCompleted()) {
// cancel the timer task
operation.cancel();
}
}
return false;
}
/**
* Check if some delayed operations can be completed with the given watch key,
* and if yes complete them.
*
* @return the number of completed operations during this process
*/
public <K extends DelayedOperationKey> int checkAndComplete(K key) {
WatcherList wl = watcherList(key);
Watchers watchers;
wl.watchersLock.lock();
try {
watchers = wl.watchersByKey.get(key);
} finally {
wl.watchersLock.unlock();
}
int numCompleted = watchers == null ? 0 : watchers.tryCompleteWatched();
if (numCompleted > 0) {
LOG.debug("Request key {} unblocked {} {} operations", key, numCompleted, purgatoryName);
}
return numCompleted;
}
/**
* Return the total size of the watch lists in the purgatory. Since an operation may be watched
* on multiple lists, and some of its watched entries may still be in the watch lists
* even after it has been completed, this number may be larger than the number of real operations watched
*/
public int watched() {
int sum = 0;
for (WatcherList watcherList : watcherLists) {
sum += watcherList.allWatchers().stream().mapToInt(Watchers::countWatched).sum();
}
return sum;
}
/**
* Return the number of delayed operations in the expiry queue
*/
public int numDelayed() {
return timeoutTimer.size();
}
/**
* Cancel watching any delayed operations for the given key. Note that the operations will not be completed
*/
public List<T> cancelForKey(DelayedOperationKey key) {
WatcherList wl = watcherList(key);
wl.watchersLock.lock();
try {
Watchers watchers = wl.watchersByKey.remove(key);
if (watchers != null)
return watchers.cancel();
else
return Collections.emptyList();
} finally {
wl.watchersLock.unlock();
}
}
/*
* Watch the operation
*/
private void watchForOperation(DelayedOperationKey key, T operation) {
WatcherList wl = watcherList(key);
wl.watchersLock.lock();
try {
Watchers watcher = wl.watchersByKey.computeIfAbsent(key, Watchers::new);
watcher.watch(operation);
} finally {
wl.watchersLock.unlock();
}
}
/*
* Remove the key from watcher lists if its list is empty
*/
private void removeKeyIfEmpty(DelayedOperationKey key, Watchers watchers) {
WatcherList wl = watcherList(key);
wl.watchersLock.lock();
try {
// if the current key is no longer correlated to the watchers to remove, skip
if (wl.watchersByKey.get(key) != watchers)
return;
if (watchers != null && watchers.isEmpty()) {
wl.watchersByKey.remove(key);
}
} finally {
wl.watchersLock.unlock();
}
}
/**
* Shut down the expiration reaper thread (if enabled), close the timeout timer and remove the purgatory metrics
*/
public void shutdown() throws Exception {
if (reaperEnabled) {
expirationReaper.initiateShutdown();
// improve shutdown time by waking up any ShutdownableThread blocked on poll by sending a no-op
timeoutTimer.add(new TimerTask(0) {
@Override
public void run() {}
});
expirationReaper.awaitShutdown();
}
timeoutTimer.close();
metricsGroup.removeMetric("PurgatorySize", metricsTags);
metricsGroup.removeMetric("NumDelayedOperations", metricsTags);
}
/**
* A list of operation watching keys
*/
private class WatcherList {
private final ConcurrentHashMap<DelayedOperationKey, Watchers> watchersByKey = new ConcurrentHashMap<>();
private final ReentrantLock watchersLock = new ReentrantLock();
/*
* Return all the current watcher lists;
* note that the returned watchers may be removed from the list by other threads
*/
Collection<Watchers> allWatchers() {
return watchersByKey.values();
}
}
/**
* A linked list of watched delayed operations based on some key
*/
private class Watchers {
private final ConcurrentLinkedQueue<T> operations = new ConcurrentLinkedQueue<>();
private final DelayedOperationKey key;
Watchers(DelayedOperationKey key) {
this.key = key;
}
// count the current number of watched operations. This is O(n), so use isEmpty() if possible
int countWatched() {
return operations.size();
}
boolean isEmpty() {
return operations.isEmpty();
}
// add the element to watch
void watch(T t) {
operations.add(t);
}
// traverse the list and try to complete some watched elements
int tryCompleteWatched() {
int completed = 0;
Iterator<T> iter = operations.iterator();
while (iter.hasNext()) {
T curr = iter.next();
if (curr.isCompleted()) {
// another thread has completed this operation, just remove it
iter.remove();
} else if (curr.safeTryComplete()) {
iter.remove();
completed += 1;
}
}
if (operations.isEmpty())
removeKeyIfEmpty(key, this);
return completed;
}
List<T> cancel() {
Iterator<T> iter = operations.iterator();
List<T> cancelled = new ArrayList<>();
while (iter.hasNext()) {
T curr = iter.next();
curr.cancel();
iter.remove();
cancelled.add(curr);
}
return cancelled;
}
// traverse the list and purge elements that are already completed by others
int purgeCompleted() {
int purged = 0;
Iterator<T> iter = operations.iterator();
while (iter.hasNext()) {
T curr = iter.next();
if (curr.isCompleted()) {
iter.remove();
purged += 1;
}
}
if (operations.isEmpty())
removeKeyIfEmpty(key, this);
return purged;
}
}
private void advanceClock(long timeoutMs) throws InterruptedException {
timeoutTimer.advanceClock(timeoutMs);
// Trigger a purge if the number of completed but still watched operations is larger than
// the purge threshold. That number is estimated as the difference between the estimated total number of
// operations and the number of pending delayed operations.
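// For example (illustrative numbers only): with purgeInterval = 1000, if 1500 operations have been
// enqueued in total and 400 are still pending in the timer, 1500 - 400 = 1100 > 1000, so a purge runs.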
if (estimatedTotalOperations.get() - numDelayed() > purgeInterval) {
// now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
// clean up watchers. Note that if more operations are completed during the cleanup, we may end up with
// a slightly overestimated total number of operations.
estimatedTotalOperations.getAndSet(numDelayed());
LOG.debug("Begin purging watch lists");
int purged = 0;
for (WatcherList watcherList : watcherLists) {
purged += watcherList.allWatchers().stream().mapToInt(Watchers::purgeCompleted).sum();
}
LOG.debug("Purged {} elements from watch lists.", purged);
}
}
/**
* A background reaper to expire delayed operations that have timed out
*/
private class ExpiredOperationReaper extends ShutdownableThread {
ExpiredOperationReaper() {
super("ExpirationReaper-" + brokerId + "-" + purgatoryName, false);
}
@Override
public void doWork() {
try {
advanceClock(200L);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}
}


@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import java.util.Objects;
/**
* Used by delayed-join operations
*/
public class GroupJoinKey implements DelayedOperationKey {
private final String groupId;
public GroupJoinKey(String groupId) {
this.groupId = groupId;
}
@Override
public String keyLabel() {
return "join-" + groupId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GroupJoinKey that = (GroupJoinKey) o;
return Objects.equals(groupId, that.groupId);
}
@Override
public int hashCode() {
return Objects.hash(groupId);
}
}


@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import java.util.Objects;
/**
* Used by delayed-sync operations
*/
public class GroupSyncKey implements DelayedOperationKey {
private final String groupId;
public GroupSyncKey(String groupId) {
this.groupId = groupId;
}
@Override
public String keyLabel() {
return "sync-" + groupId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GroupSyncKey that = (GroupSyncKey) o;
return Objects.equals(groupId, that.groupId);
}
@Override
public int hashCode() {
return Objects.hash(groupId);
}
}


@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import java.util.Objects;
/**
* Used by delayed-join-group operations
*/
public class MemberKey implements DelayedOperationKey {
private final String groupId;
private final String consumerId;
public MemberKey(String groupId, String consumerId) {
this.groupId = groupId;
this.consumerId = consumerId;
}
@Override
public String keyLabel() {
return groupId + "-" + consumerId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MemberKey memberKey = (MemberKey) o;
return Objects.equals(groupId, memberKey.groupId) && Objects.equals(consumerId, memberKey.consumerId);
}
@Override
public int hashCode() {
return Objects.hash(groupId, consumerId);
}
}


@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import java.util.Objects;
/**
* Used by delayed-produce and delayed-fetch operations
*/
public class TopicPartitionOperationKey implements DelayedOperationKey {
public final String topic;
public final int partition;
public TopicPartitionOperationKey(String topic, int partition) {
this.topic = topic;
this.partition = partition;
}
public TopicPartitionOperationKey(TopicPartition tp) {
this(tp.topic(), tp.partition());
}
public TopicPartitionOperationKey(TopicIdPartition tp) {
this(tp.topic(), tp.partition());
}
@Override
public String keyLabel() {
return topic + "-" + partition;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopicPartitionOperationKey that = (TopicPartitionOperationKey) o;
return partition == that.partition && Objects.equals(topic, that.topic);
}
@Override
public int hashCode() {
return Objects.hash(topic, partition);
}
}


@ -0,0 +1,331 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class DelayedOperationTest {
private final MockKey test1 = new MockKey("test1");
private final MockKey test2 = new MockKey("test2");
private final MockKey test3 = new MockKey("test3");
private final Random random = new Random();
private DelayedOperationPurgatory<DelayedOperation> purgatory;
private ScheduledExecutorService executorService;
@BeforeEach
public void setUp() {
purgatory = new DelayedOperationPurgatory<>("mock", 0);
}
@AfterEach
public void tearDown() throws Exception {
purgatory.shutdown();
if (executorService != null)
executorService.shutdown();
}
private static class MockKey implements DelayedOperationKey {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MockKey mockKey = (MockKey) o;
return Objects.equals(key, mockKey.key);
}
@Override
public int hashCode() {
return key != null ? key.hashCode() : 0;
}
final String key;
MockKey(String key) {
this.key = key;
}
@Override
public String keyLabel() {
return key;
}
}
@Test
public void testLockInTryCompleteElseWatch() {
DelayedOperation op = new DelayedOperation(100000L) {
@Override
public void onExpiration() {}
@Override
public void onComplete() {}
@Override
public boolean tryComplete() {
assertTrue(((ReentrantLock) lock).isHeldByCurrentThread());
return false;
}
@Override
public boolean safeTryComplete() {
fail("tryCompleteElseWatch should not use safeTryComplete");
return super.safeTryComplete();
}
};
purgatory.tryCompleteElseWatch(op, Collections.singletonList(new MockKey("key")));
}
private DelayedOperation op(boolean shouldComplete) {
return new DelayedOperation(100000L) {
@Override
public void onExpiration() {}
@Override
public void onComplete() {}
@Override
public boolean tryComplete() {
assertTrue(((ReentrantLock) lock).isHeldByCurrentThread());
return shouldComplete;
}
};
}
@Test
public void testSafeTryCompleteOrElse() {
final AtomicBoolean pass = new AtomicBoolean();
assertFalse(op(false).safeTryCompleteOrElse(() -> pass.set(true)));
assertTrue(pass.get());
assertTrue(op(true).safeTryCompleteOrElse(() -> fail("this method should NOT be executed")));
}
@Test
public void testRequestSatisfaction() {
MockDelayedOperation r1 = new MockDelayedOperation(100000L);
MockDelayedOperation r2 = new MockDelayedOperation(100000L);
assertEquals(0, purgatory.checkAndComplete(test1), "With no waiting requests, nothing should be satisfied");
assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(new MockKey("test1"))), "r1 not satisfied and hence watched");
assertEquals(0, purgatory.checkAndComplete(test1), "Still nothing satisfied");
assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(new MockKey("test2"))), "r2 not satisfied and hence watched");
assertEquals(0, purgatory.checkAndComplete(test2), "Still nothing satisfied");
r1.completable = true;
assertEquals(1, purgatory.checkAndComplete(test1), "r1 satisfied");
assertEquals(0, purgatory.checkAndComplete(test1), "Nothing satisfied");
r2.completable = true;
assertEquals(1, purgatory.checkAndComplete(test2), "r2 satisfied");
assertEquals(0, purgatory.checkAndComplete(test2), "Nothing satisfied");
}
@Test
public void testRequestExpiry() throws Exception {
long expiration = 20L;
long start = Time.SYSTEM.hiResClockMs();
MockDelayedOperation r1 = new MockDelayedOperation(expiration);
MockDelayedOperation r2 = new MockDelayedOperation(200000L);
assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1)), "r1 not satisfied and hence watched");
assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(test2)), "r2 not satisfied and hence watched");
r1.awaitExpiration();
long elapsed = Time.SYSTEM.hiResClockMs() - start;
assertTrue(r1.isCompleted(), "r1 completed due to expiration");
assertFalse(r2.isCompleted(), "r2 hasn't completed");
assertTrue(elapsed >= expiration, "Time for expiration " + elapsed + " should be at least " + expiration);
}
@Test
public void testRequestPurge() {
MockDelayedOperation r1 = new MockDelayedOperation(100000L);
MockDelayedOperation r2 = new MockDelayedOperation(100000L);
MockDelayedOperation r3 = new MockDelayedOperation(100000L);
purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1));
purgatory.tryCompleteElseWatch(r2, Arrays.asList(test1, test2));
purgatory.tryCompleteElseWatch(r3, Arrays.asList(test1, test2, test3));
assertEquals(3, purgatory.numDelayed(), "Purgatory should have 3 total delayed operations");
assertEquals(6, purgatory.watched(), "Purgatory should have 6 watched elements");
// complete the operations; they should immediately be purged from the delayed operation queue
r2.completable = true;
r2.tryComplete();
assertEquals(2, purgatory.numDelayed(), "Purgatory should have 2 total delayed operations instead of " + purgatory.numDelayed());
r3.completable = true;
r3.tryComplete();
assertEquals(1, purgatory.numDelayed(), "Purgatory should have 1 total delayed operation instead of " + purgatory.numDelayed());
// checking a watch should purge the watch list
purgatory.checkAndComplete(test1);
assertEquals(4, purgatory.watched(), "Purgatory should have 4 watched elements instead of " + purgatory.watched());
purgatory.checkAndComplete(test2);
assertEquals(2, purgatory.watched(), "Purgatory should have 2 watched elements instead of " + purgatory.watched());
purgatory.checkAndComplete(test3);
assertEquals(1, purgatory.watched(), "Purgatory should have 1 watched element instead of " + purgatory.watched());
}
@Test
public void shouldCancelForKeyReturningCancelledOperations() {
purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1));
purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1));
purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test2));
List<DelayedOperation> cancelledOperations = purgatory.cancelForKey(test1);
assertEquals(2, cancelledOperations.size());
assertEquals(1, purgatory.numDelayed());
assertEquals(1, purgatory.watched());
}
@Test
public void shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist() {
List<DelayedOperation> cancelledOperations = purgatory.cancelForKey(test1);
assertTrue(cancelledOperations.isEmpty());
}
/**
* Test `tryComplete` with multiple threads to verify that there are no timing windows
* in which completion is skipped, even if the thread that makes the operation completable
* cannot acquire the operation lock. Since it is difficult to test all scenarios,
* this test uses random delays with a large number of threads.
*/
@Test
public void testTryCompleteWithMultipleThreads() throws ExecutionException, InterruptedException {
executorService = Executors.newScheduledThreadPool(20);
int maxDelayMs = 10;
int completionAttempts = 20;
List<TestDelayOperation> ops = new ArrayList<>();
for (int i = 0; i < 100; i++) {
TestDelayOperation op = new TestDelayOperation(i, completionAttempts, maxDelayMs);
purgatory.tryCompleteElseWatch(op, Collections.singletonList(op.key));
ops.add(op);
}
List<Future<?>> futures = new ArrayList<>();
for (int i = 1; i <= completionAttempts; i++) {
for (TestDelayOperation op : ops) {
futures.add(scheduleTryComplete(executorService, op, random.nextInt(maxDelayMs)));
}
}
for (Future<?> future : futures) {
future.get();
}
ops.forEach(op -> assertTrue(op.isCompleted(), "Operation " + op.key.keyLabel() + " should have completed"));
}
private Future<?> scheduleTryComplete(ScheduledExecutorService executorService, TestDelayOperation op, long delayMs) {
return executorService.schedule(() -> {
if (op.completionAttemptsRemaining.decrementAndGet() == 0) {
op.completable = true;
}
purgatory.checkAndComplete(op.key);
}, delayMs, TimeUnit.MILLISECONDS);
}
private static class MockDelayedOperation extends DelayedOperation {
private final Optional<Lock> responseLockOpt;
boolean completable = false;
MockDelayedOperation(long delayMs) {
this(delayMs, Optional.empty());
}
MockDelayedOperation(long delayMs, Optional<Lock> responseLockOpt) {
super(delayMs);
this.responseLockOpt = responseLockOpt;
}
@Override
public boolean tryComplete() {
if (completable) {
return forceComplete();
} else {
return false;
}
}
@Override
public void onExpiration() { }
@Override
public void onComplete() {
responseLockOpt.ifPresent(lock -> {
if (!lock.tryLock())
throw new IllegalStateException("Response callback lock could not be acquired in callback");
});
synchronized (this) {
notify();
}
}
void awaitExpiration() throws InterruptedException {
synchronized (this) {
wait();
}
}
}
private class TestDelayOperation extends MockDelayedOperation {
private final MockKey key;
private final AtomicInteger completionAttemptsRemaining;
private final int maxDelayMs;
TestDelayOperation(int index, int completionAttempts, int maxDelayMs) {
super(10000L, Optional.empty());
key = new MockKey("key" + index);
completionAttemptsRemaining = new AtomicInteger(completionAttempts);
this.maxDelayMs = maxDelayMs;
}
@Override
public boolean tryComplete() {
boolean shouldComplete = completable;
try {
Thread.sleep(random.nextInt(maxDelayMs));
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
if (shouldComplete)
return forceComplete();
else
return false;
}
}
}