diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index 2e587142372..c576ea76f9a 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -93,6 +93,11 @@
+
+
+
+
+
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index f31a67260ea..5d2ae4cf2e0 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -19,10 +19,8 @@ package kafka.log.remote;
import kafka.cluster.Partition;
import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.log.UnifiedLog;
-import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedRemoteListOffsets;
import kafka.server.StopPartition;
-import kafka.server.TopicPartitionOperationKey;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
@@ -68,6 +66,8 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
+import org.apache.kafka.server.purgatory.TopicPartitionOperationKey;
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 79dd542f943..f64dc96e6e5 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -24,7 +24,6 @@ import kafka.server.AlterPartitionManager;
import kafka.server.DelayedDeleteRecords;
import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
-import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedProduce;
import kafka.server.DelayedRemoteFetch;
import kafka.server.DelayedRemoteListOffsets;
@@ -39,6 +38,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.DelayedActionQueue;
import org.apache.kafka.server.common.DirectoryEventHandler;
+import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 8994d1b1188..0d25d850ff7 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
-import kafka.server.DelayedOperation;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
@@ -25,6 +24,7 @@ import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -42,7 +42,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
@@ -70,7 +69,7 @@ public class DelayedShareFetch extends DelayedOperation {
ReplicaManager replicaManager,
SharePartitionManager sharePartitionManager,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
- super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
+ super(shareFetchData.fetchParams().maxWaitMs, Optional.empty());
this.shareFetchData = shareFetchData;
this.replicaManager = replicaManager;
this.partitionsAcquired = new LinkedHashMap<>();
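The super() change above reflects the new base-class signature: org.apache.kafka.server.purgatory.DelayedOperation takes a java.util.Optional<Lock> (and, per DelayedHeartbeat and DelayedRebalance later in this patch, also accepts a raw Lock or just the delay) where the old Scala class took an Option[Lock]. Below is a minimal sketch of a subclass written directly against the relocated Java class; the class name and method bodies are hypothetical, and the public modifiers on the hooks are assumed.

    import java.util.Optional;
    import java.util.concurrent.locks.Lock;

    import org.apache.kafka.server.purgatory.DelayedOperation;

    // Hypothetical subclass sketch, mirroring the call sites in this patch.
    class DelayedExample extends DelayedOperation {

        DelayedExample(long delayMs, Optional<Lock> lockOpt) {
            super(delayMs, lockOpt);   // was super(delayMs, Option.empty()) against the Scala class
        }

        @Override
        public boolean tryComplete() {
            // Decide whether the operation can finish now; if so, complete it.
            return forceComplete();
        }

        @Override
        public void onComplete() {
            // Completion logic; invoked exactly once via forceComplete().
        }

        @Override
        public void onExpiration() {
            // Invoked when the delay elapses before the operation completes.
        }
    }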
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchGroupKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchGroupKey.java
index 4e9e4dbbe88..38475182985 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetchGroupKey.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetchGroupKey.java
@@ -16,8 +16,6 @@
*/
package kafka.server.share;
-import kafka.server.DelayedOperationKey;
-
import org.apache.kafka.common.Uuid;
import java.util.Objects;
@@ -25,7 +23,7 @@ import java.util.Objects;
/**
* A key for delayed share fetch purgatory that refers to the share partition.
*/
-public class DelayedShareFetchGroupKey implements DelayedShareFetchKey, DelayedOperationKey {
+public class DelayedShareFetchGroupKey implements DelayedShareFetchKey {
private final String groupId;
private final Uuid topicId;
private final int partition;
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
index d12c30bd289..7979514a830 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
@@ -16,8 +16,10 @@
*/
package kafka.server.share;
+import org.apache.kafka.server.purgatory.DelayedOperationKey;
+
/**
* A key for delayed operations that fetch data for share consumers.
*/
-public interface DelayedShareFetchKey {
+public interface DelayedShareFetchKey extends DelayedOperationKey {
}
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java
index 02f0439cea1..af7f92aa76e 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java
@@ -16,8 +16,6 @@
*/
package kafka.server.share;
-import kafka.server.DelayedOperationKey;
-
import org.apache.kafka.common.Uuid;
import java.util.Objects;
@@ -25,7 +23,7 @@ import java.util.Objects;
/**
* A key for delayed share fetch purgatory that refers to the topic partition.
*/
-public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey, DelayedOperationKey {
+public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey {
private final Uuid topicId;
private final int partition;
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index b427f0a87dc..804af3e1c87 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -64,7 +64,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -74,8 +73,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-import scala.jdk.javaapi.CollectionConverters;
-
/**
* The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
* It is responsible for fetching messages from the log and acknowledging the messages.
@@ -546,9 +543,8 @@ public class SharePartitionManager implements AutoCloseable {
// Add the share fetch request to the delayed share fetch purgatory to process the fetch request if it can be
// completed else watch until it can be completed/timeout.
- private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<DelayedShareFetchKey> keys) {
- replicaManager.addDelayedShareFetchRequest(delayedShareFetch,
- CollectionConverters.asScala(keys).toSeq());
+ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, List<DelayedShareFetchKey> keys) {
+ replicaManager.addDelayedShareFetchRequest(delayedShareFetch, keys);
}
@Override
@@ -574,7 +570,7 @@ public class SharePartitionManager implements AutoCloseable {
// Initialize lazily, if required.
Map<TopicIdPartition, Throwable> erroneous = null;
- Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
+ List<DelayedShareFetchKey> delayedShareFetchWatchKeys = new ArrayList<>();
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) {
SharePartitionKey sharePartitionKey = sharePartitionKey(
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 77a7e6d46cb..636a64c7f01 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -45,6 +45,7 @@ import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.server.common.{MetadataVersion, RequestLocal}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.slf4j.event.Level
@@ -93,18 +94,18 @@ class DelayedOperations(topicId: Option[Uuid],
produce: DelayedOperationPurgatory[DelayedProduce],
fetch: DelayedOperationPurgatory[DelayedFetch],
deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords],
- shareFetch: DelayedOperationPurgatory[DelayedShareFetch]) {
+ shareFetch: DelayedOperationPurgatory[DelayedShareFetch]) extends Logging {
def checkAndCompleteAll(): Unit = {
- val requestKey = TopicPartitionOperationKey(topicPartition)
- CoreUtils.swallow(() -> fetch.checkAndComplete(requestKey), fetch, Level.ERROR)
- CoreUtils.swallow(() -> produce.checkAndComplete(requestKey), produce, Level.ERROR)
- CoreUtils.swallow(() -> deleteRecords.checkAndComplete(requestKey), deleteRecords, Level.ERROR)
+ val requestKey = new TopicPartitionOperationKey(topicPartition)
+ CoreUtils.swallow(() -> fetch.checkAndComplete(requestKey), this, Level.ERROR)
+ CoreUtils.swallow(() -> produce.checkAndComplete(requestKey), this, Level.ERROR)
+ CoreUtils.swallow(() -> deleteRecords.checkAndComplete(requestKey), this, Level.ERROR)
if (topicId.isDefined) CoreUtils.swallow(() -> shareFetch.checkAndComplete(new DelayedShareFetchPartitionKey(
- topicId.get, topicPartition.partition())), shareFetch, Level.ERROR)
+ topicId.get, topicPartition.partition())), this, Level.ERROR)
}
- def numDelayedDelete: Int = deleteRecords.numDelayed
+ def numDelayedDelete: Int = deleteRecords.numDelayed()
}
object Partition {
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 55a0496dda0..1dd2793c553 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.group
-import kafka.server.DelayedOperation
+import org.apache.kafka.server.purgatory.DelayedOperation
/**
* Delayed heartbeat operations that are added to the purgatory for session timeout checking.
@@ -28,7 +28,7 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
memberId: String,
isPending: Boolean,
timeoutMs: Long)
- extends DelayedOperation(timeoutMs, Some(group.lock)) {
+ extends DelayedOperation(timeoutMs, group.lock) {
override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, memberId, isPending, forceComplete _)
override def onExpiration(): Unit = coordinator.onExpireHeartbeat(group, memberId, isPending)
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 22dfa9d157b..51eeb324b1c 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -17,8 +17,8 @@
package kafka.coordinator.group
-import kafka.server.{DelayedOperationPurgatory, GroupJoinKey}
-
+import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, GroupJoinKey}
+import java.util
import scala.math.{max, min}
/**
@@ -85,7 +85,7 @@ private[group] class InitialDelayedJoin(
configuredRebalanceDelay,
delay,
remaining
- ), Seq(GroupJoinKey(group.groupId)))
+ ), util.List.of(new GroupJoinKey(group.groupId)))
} else
super.onComplete()
}
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala
index bad109a9123..2c327b922eb 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala
@@ -17,8 +17,7 @@
package kafka.coordinator.group
-import kafka.server.DelayedOperation
-
+import org.apache.kafka.server.purgatory.DelayedOperation
import java.util.concurrent.locks.Lock
/**
@@ -30,5 +29,5 @@ private[group] abstract class DelayedRebalance(
groupLock: Lock
) extends DelayedOperation(
rebalanceTimeoutMs,
- Some(groupLock)
+ groupLock
)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 9bde8ca4017..8ca956daedf 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -16,6 +16,7 @@
*/
package kafka.coordinator.group
+import java.util
import java.util.{OptionalInt, Properties}
import java.util.concurrent.atomic.AtomicBoolean
import kafka.server._
@@ -33,6 +34,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.{Group, OffsetAndMetadata, OffsetConfig}
import org.apache.kafka.server.common.RequestLocal
+import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, GroupJoinKey, GroupSyncKey, MemberKey}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard
@@ -226,7 +228,7 @@ private[group] class GroupCoordinator(
// attempt to complete JoinGroup
if (group.is(PreparingRebalance)) {
- rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
+ rebalancePurgatory.checkAndComplete(new GroupJoinKey(group.groupId))
}
}
}
@@ -690,7 +692,7 @@ private[group] class GroupCoordinator(
}
} else if (group.isPendingMember(memberId)) {
removePendingMemberAndUpdateGroup(group, memberId)
- heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
+ heartbeatPurgatory.checkAndComplete(new MemberKey(group.groupId, memberId))
info(s"Pending member with memberId=$memberId has left group ${group.groupId} " +
s"through explicit `LeaveGroup` request")
memberLeaveError(leavingMember, Errors.NONE)
@@ -1197,12 +1199,12 @@ private[group] class GroupCoordinator(
group.maybeInvokeJoinCallback(member, JoinGroupResult(member.memberId, Errors.NOT_COORDINATOR))
}
- rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
+ rebalancePurgatory.checkAndComplete(new GroupJoinKey(group.groupId))
case Stable | CompletingRebalance =>
for (member <- group.allMemberMetadata) {
group.maybeInvokeSyncCallback(member, SyncGroupResult(Errors.NOT_COORDINATOR))
- heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, member.memberId))
+ heartbeatPurgatory.checkAndComplete(new MemberKey(group.groupId, member.memberId))
}
}
@@ -1283,7 +1285,7 @@ private[group] class GroupCoordinator(
}
private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
- val memberKey = MemberKey(group.groupId, member.memberId)
+ val memberKey = new MemberKey(group.groupId, member.memberId)
// complete current heartbeat expectation
member.heartbeatSatisfied = true
@@ -1292,20 +1294,20 @@ private[group] class GroupCoordinator(
// reschedule the next heartbeat expiration deadline
member.heartbeatSatisfied = false
val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs)
- heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
+ heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, util.Collections.singletonList(memberKey))
}
/**
* Add pending member expiration to heartbeat purgatory
*/
private def addPendingMemberExpiration(group: GroupMetadata, pendingMemberId: String, timeoutMs: Long): Unit = {
- val pendingMemberKey = MemberKey(group.groupId, pendingMemberId)
+ val pendingMemberKey = new MemberKey(group.groupId, pendingMemberId)
val delayedHeartbeat = new DelayedHeartbeat(this, group, pendingMemberId, isPending = true, timeoutMs)
- heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(pendingMemberKey))
+ heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, util.Collections.singletonList(pendingMemberKey))
}
private def removeHeartbeatForLeavingMember(group: GroupMetadata, memberId: String): Unit = {
- val memberKey = MemberKey(group.groupId, memberId)
+ val memberKey = new MemberKey(group.groupId, memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
}
@@ -1498,8 +1500,8 @@ private[group] class GroupCoordinator(
info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +
s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")
- val groupKey = GroupJoinKey(group.groupId)
- rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
+ val groupKey = new GroupJoinKey(group.groupId)
+ rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, util.Collections.singletonList(groupKey))
}
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Unit = {
@@ -1512,7 +1514,7 @@ private[group] class GroupCoordinator(
group.currentState match {
case Dead | Empty =>
case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
- case PreparingRebalance => rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
+ case PreparingRebalance => rebalancePurgatory.checkAndComplete(new GroupJoinKey(group.groupId))
}
}
@@ -1520,7 +1522,7 @@ private[group] class GroupCoordinator(
group.remove(memberId)
if (group.is(PreparingRebalance)) {
- rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
+ rebalancePurgatory.checkAndComplete(new GroupJoinKey(group.groupId))
}
}
@@ -1554,7 +1556,7 @@ private[group] class GroupCoordinator(
error(s"Group ${group.groupId} could not complete rebalance because no members rejoined")
rebalancePurgatory.tryCompleteElseWatch(
new DelayedJoin(this, group, group.rebalanceTimeoutMs),
- Seq(GroupJoinKey(group.groupId)))
+ util.Collections.singletonList(new GroupJoinKey(group.groupId)))
} else {
group.initNextGeneration()
if (group.is(Empty)) {
@@ -1620,7 +1622,7 @@ private[group] class GroupCoordinator(
private def maybeCompleteSyncExpiration(
group: GroupMetadata
): Unit = {
- val groupKey = GroupSyncKey(group.groupId)
+ val groupKey = new GroupSyncKey(group.groupId)
rebalancePurgatory.checkAndComplete(groupKey)
}
@@ -1628,8 +1630,8 @@ private[group] class GroupCoordinator(
group: GroupMetadata
): Unit = {
val delayedSync = new DelayedSync(this, group, group.generationId, group.rebalanceTimeoutMs)
- val groupKey = GroupSyncKey(group.groupId)
- rebalancePurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+ val groupKey = new GroupSyncKey(group.groupId)
+ rebalancePurgatory.tryCompleteElseWatch(delayedSync, util.Collections.singletonList(groupKey))
}
def tryCompletePendingSync(
@@ -1763,8 +1765,8 @@ object GroupCoordinator {
time: Time,
metrics: Metrics
): GroupCoordinator = {
- val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
- val rebalancePurgatory = DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
+ val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
+ val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
}
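Taken together, the GroupCoordinator changes show the caller-side migration pattern: purgatories are built with a plain constructor (the Scala apply() factory is gone), keys are instantiated with new, and watch keys travel as a java.util.List instead of a Scala Seq. Below is a condensed, illustrative Java sketch of that pattern; the ids, timeout and the trivial anonymous operation are made up for the example, and shutdown() is assumed to carry over from the removed Scala purgatory.

    import java.util.Collections;
    import java.util.Optional;

    import org.apache.kafka.server.purgatory.DelayedOperation;
    import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
    import org.apache.kafka.server.purgatory.MemberKey;

    public class PurgatoryUsageSketch {
        public static void main(String[] args) {
            // Purgatories are now constructed directly, as in GroupCoordinator above.
            DelayedOperationPurgatory<DelayedOperation> purgatory =
                new DelayedOperationPurgatory<>("Heartbeat", 0);

            // Keys are plain Java classes created with `new`.
            MemberKey memberKey = new MemberKey("group-1", "member-1");

            // A throwaway operation purely for illustration.
            DelayedOperation op = new DelayedOperation(1000L, Optional.empty()) {
                @Override
                public boolean tryComplete() { return forceComplete(); }
                @Override
                public void onComplete() { }
                @Override
                public void onExpiration() { }
            };

            // Watch keys are handed over as a java.util.List rather than a Scala Seq.
            purgatory.tryCompleteElseWatch(op, Collections.singletonList(memberKey));
            purgatory.checkAndComplete(memberKey);

            purgatory.shutdown();
        }
    }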
diff --git a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
index 35acf1c3680..5f204a24ff1 100644
--- a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
@@ -17,9 +17,11 @@
package kafka.server
+import kafka.utils.Logging
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection._
@@ -49,7 +51,7 @@ class DelayedCreatePartitions(delayMs: Long,
createMetadata: Seq[CreatePartitionsMetadata],
adminManager: ZkAdminManager,
responseCallback: Map[String, ApiError] => Unit)
- extends DelayedOperation(delayMs) {
+ extends DelayedOperation(delayMs) with Logging {
/**
* The operation can be completed if all of the topics that do not have an error exist and every partition has a
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 0375cc8a072..c2cffb6a077 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -18,13 +18,15 @@
package kafka.server
-import java.util.concurrent.TimeUnit
+import kafka.utils.Logging
+import java.util.concurrent.TimeUnit
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.DeleteRecordsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.DeleteRecordsResponse
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection._
@@ -45,7 +47,7 @@ class DelayedDeleteRecords(delayMs: Long,
deleteRecordsStatus: Map[TopicPartition, DeleteRecordsPartitionStatus],
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult] => Unit)
- extends DelayedOperation(delayMs) {
+ extends DelayedOperation(delayMs) with Logging {
// first update the acks pending variable according to the error code
deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala b/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala
index 523252cea14..4ec4698aecb 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala
@@ -17,7 +17,9 @@
package kafka.server
+import kafka.utils.Logging
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection._
@@ -40,7 +42,7 @@ class DelayedDeleteTopics(delayMs: Long,
deleteMetadata: Seq[DeleteTopicMetadata],
adminManager: ZkAdminManager,
responseCallback: Map[String, Errors] => Unit)
- extends DelayedOperation(delayMs) {
+ extends DelayedOperation(delayMs) with Logging {
/**
* The operation can be completed if all of the topics not in error have been removed
diff --git a/core/src/main/scala/kafka/server/DelayedElectLeader.scala b/core/src/main/scala/kafka/server/DelayedElectLeader.scala
index cd0a8040589..e7171cff985 100644
--- a/core/src/main/scala/kafka/server/DelayedElectLeader.scala
+++ b/core/src/main/scala/kafka/server/DelayedElectLeader.scala
@@ -17,9 +17,11 @@
package kafka.server
+import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
+import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection.{Map, mutable}
@@ -32,7 +34,7 @@ class DelayedElectLeader(
results: Map[TopicPartition, ApiError],
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, ApiError] => Unit
-) extends DelayedOperation(delayMs) {
+) extends DelayedOperation(delayMs) with Logging {
private val waitingPartitions = mutable.Map() ++= expectedLeaders
private val fullResults = mutable.Map() ++= results
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 46fff1604ae..74a3e2b1a29 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -18,6 +18,7 @@
package kafka.server
import com.yammer.metrics.core.Meter
+import kafka.utils.Logging
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.TopicIdPartition
@@ -26,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.LogOffsetMetadata
@@ -51,7 +53,7 @@ class DelayedFetch(
replicaManager: ReplicaManager,
quota: ReplicaQuota,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
-) extends DelayedOperation(params.maxWaitMs) {
+) extends DelayedOperation(params.maxWaitMs) with Logging {
override def toString: String = {
s"DelayedFetch(params=$params" +
diff --git a/core/src/main/scala/kafka/server/DelayedFuture.scala b/core/src/main/scala/kafka/server/DelayedFuture.scala
index 0bbe07f8dd9..a24bc387089 100644
--- a/core/src/main/scala/kafka/server/DelayedFuture.scala
+++ b/core/src/main/scala/kafka/server/DelayedFuture.scala
@@ -17,11 +17,14 @@
package kafka.server
+import kafka.utils.Logging
+
+import java.util
import java.util.concurrent._
import java.util.function.BiConsumer
-
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.KafkaThread
+import org.apache.kafka.server.purgatory.{DelayedOperation, DelayedOperationKey, DelayedOperationPurgatory}
import scala.collection.Seq
@@ -32,7 +35,7 @@ import scala.collection.Seq
class DelayedFuture[T](timeoutMs: Long,
futures: Seq[CompletableFuture[T]],
responseCallback: () => Unit)
- extends DelayedOperation(timeoutMs) {
+ extends DelayedOperation(timeoutMs) with Logging {
/**
* The operation can be completed if all the futures have completed successfully
@@ -70,19 +73,21 @@ class DelayedFuture[T](timeoutMs: Long,
}
class DelayedFuturePurgatory(purgatoryName: String, brokerId: Int) {
- private val purgatory = DelayedOperationPurgatory[DelayedFuture[_]](purgatoryName, brokerId)
+ private val purgatory = new DelayedOperationPurgatory[DelayedFuture[_]](purgatoryName, brokerId)
private val executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable](),
new ThreadFactory {
override def newThread(r: Runnable): Thread = new KafkaThread(s"DelayedExecutor-$purgatoryName", r, true)
})
- private val purgatoryKey = new Object
+ private val purgatoryKey = new DelayedOperationKey() {
+ override def keyLabel(): String = "delayed-future-key"
+ }
def tryCompleteElseWatch[T](timeoutMs: Long,
futures: Seq[CompletableFuture[T]],
responseCallback: () => Unit): DelayedFuture[T] = {
val delayedFuture = new DelayedFuture[T](timeoutMs, futures, responseCallback)
- val done = purgatory.tryCompleteElseWatch(delayedFuture, Seq(purgatoryKey))
+ val done = purgatory.tryCompleteElseWatch(delayedFuture, util.Collections.singletonList(purgatoryKey))
if (!done) {
val callbackAction = new BiConsumer[Void, Throwable]() {
override def accept(result: Void, exception: Throwable): Unit = delayedFuture.forceComplete()
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
deleted file mode 100644
index 0e1dfbb894e..00000000000
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ /dev/null
@@ -1,452 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.concurrent._
-import java.util.concurrent.atomic._
-import java.util.concurrent.locks.{Lock, ReentrantLock}
-import kafka.utils.CoreUtils.inLock
-import kafka.utils._
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.util.ShutdownableThread
-import org.apache.kafka.server.util.timer.{SystemTimer, Timer, TimerTask}
-
-import scala.collection._
-import scala.collection.mutable.ListBuffer
-import scala.jdk.CollectionConverters._
-
-/**
- * An operation whose processing needs to be delayed for at most the given delayMs. For example
- * a delayed produce operation could be waiting for specified number of acks; or
- * a delayed fetch operation could be waiting for a given number of bytes to accumulate.
- *
- * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once.
- * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either
- * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,
- * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
- * forceComplete().
- *
- * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
- *
- * Noted that if you add a future delayed operation that calls ReplicaManager.appendRecords() in onComplete()
- * like DelayedJoin, you must be aware that this operation's onExpiration() needs to call actionQueue.tryCompleteAction().
- */
-abstract class DelayedOperation(delayMs: Long,
- lockOpt: Option[Lock] = None)
- extends TimerTask(delayMs) with Logging {
-
- private val completed = new AtomicBoolean(false)
- // Visible for testing
- private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
-
- /*
- * Force completing the delayed operation, if not already completed.
- * This function can be triggered when
- *
- * 1. The operation has been verified to be completable inside tryComplete()
- * 2. The operation has expired and hence needs to be completed right now
- *
- * Return true iff the operation is completed by the caller: note that
- * concurrent threads can try to complete the same operation, but only
- * the first thread will succeed in completing the operation and return
- * true, others will still return false
- */
- def forceComplete(): Boolean = {
- if (completed.compareAndSet(false, true)) {
- // cancel the timeout timer
- cancel()
- onComplete()
- true
- } else {
- false
- }
- }
-
- /**
- * Check if the delayed operation is already completed
- */
- def isCompleted: Boolean = completed.get()
-
- /**
- * Call-back to execute when a delayed operation gets expired and hence forced to complete.
- */
- def onExpiration(): Unit
-
- /**
- * Process for completing an operation; This function needs to be defined
- * in subclasses and will be called exactly once in forceComplete()
- */
- def onComplete(): Unit
-
- /**
- * Try to complete the delayed operation by first checking if the operation
- * can be completed by now. If yes execute the completion logic by calling
- * forceComplete() and return true iff forceComplete returns true; otherwise return false
- *
- * This function needs to be defined in subclasses
- */
- def tryComplete(): Boolean
-
- /**
- * Thread-safe variant of tryComplete() and call extra function if first tryComplete returns false
- * @param f else function to be executed after first tryComplete returns false
- * @return result of tryComplete
- */
- private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
- if (tryComplete()) true
- else {
- f
- // last completion check
- tryComplete()
- }
- }
-
- /**
- * Thread-safe variant of tryComplete()
- */
- private[server] def safeTryComplete(): Boolean = inLock(lock) {
- if (isCompleted)
- false
- else
- tryComplete()
- }
-
- /*
- * run() method defines a task that is executed on timeout
- */
- override def run(): Unit = {
- if (forceComplete())
- onExpiration()
- }
-}
-
-object DelayedOperationPurgatory {
-
- private val Shards = 512 // Shard the watcher list to reduce lock contention
-
- def apply[T <: DelayedOperation](purgatoryName: String,
- brokerId: Int = 0,
- purgeInterval: Int = 1000,
- reaperEnabled: Boolean = true,
- timerEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
- val timer = new SystemTimer(purgatoryName)
- new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled, timerEnabled)
- }
-
-}
-
-/**
- * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
- */
-final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
- timeoutTimer: Timer,
- brokerId: Int = 0,
- purgeInterval: Int = 1000,
- reaperEnabled: Boolean = true,
- timerEnabled: Boolean = true)
- extends Logging {
- private val metricsGroup = new KafkaMetricsGroup(this.getClass)
-
- /* a list of operation watching keys */
- private class WatcherList {
- val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
-
- val watchersLock = new ReentrantLock()
-
- /*
- * Return all the current watcher lists,
- * note that the returned watchers may be removed from the list by other threads
- */
- def allWatchers: Iterable[Watchers] = {
- watchersByKey.values
- }
- }
-
- private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)
- private def watcherList(key: Any): WatcherList = {
- watcherLists(Math.abs(key.hashCode() % watcherLists.length))
- }
-
- // the number of estimated total operations in the purgatory
- private[this] val estimatedTotalOperations = new AtomicInteger(0)
-
- /* background thread expiring operations that have timed out */
- private val expirationReaper = new ExpiredOperationReaper()
-
- private val metricsTags = Map("delayedOperation" -> purgatoryName).asJava
- metricsGroup.newGauge("PurgatorySize", () => watched, metricsTags)
- metricsGroup.newGauge("NumDelayedOperations", () => numDelayed, metricsTags)
-
- if (reaperEnabled)
- expirationReaper.start()
-
- /**
- * Check if the operation can be completed, if not watch it based on the given watch keys
- *
- * Note that a delayed operation can be watched on multiple keys. It is possible that
- * an operation is completed after it has been added to the watch list for some, but
- * not all of the keys. In this case, the operation is considered completed and won't
- * be added to the watch list of the remaining keys. The expiration reaper thread will
- * remove this operation from any watcher list in which the operation exists.
- *
- * @param operation the delayed operation to be checked
- * @param watchKeys keys for bookkeeping the operation
- * @return true iff the delayed operations can be completed by the caller
- */
- def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
- assert(watchKeys.nonEmpty, "The watch key list can't be empty")
-
- // The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is
- // going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse().
- // If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At
- // this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering
- // event since the operation is already on the watcher list for all keys.
- //
- // ==============[story about lock]==============
- // Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing
- // the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and
- // checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete()
- // 1) thread_a holds readlock of stateLock from TransactionStateManager
- // 2) thread_a is executing tryCompleteElseWatch()
- // 3) thread_a adds op to watch list
- // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a)
- // 5) thread_c calls checkAndComplete() and holds lock of op
- // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b)
- // 7) thread_a is waiting lock of op to call the final tryComplete() (blocked by thread_c)
- //
- // Note that even with the current approach, deadlocks could still be introduced. For example,
- // 1) thread_a calls tryCompleteElseWatch() and gets lock of op
- // 2) thread_a adds op to watch list
- // 3) thread_a calls op#tryComplete and tries to require lock_b
- // 4) thread_b holds lock_b and calls checkAndComplete()
- // 5) thread_b sees op from watch list
- // 6) thread_b needs lock of op
- // To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding
- // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
- // holding a exclusive lock to make the call is often unnecessary.
- if (operation.safeTryCompleteOrElse {
- watchKeys.foreach(key => watchForOperation(key, operation))
- if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
- }) return true
-
- // if it cannot be completed by now and hence is watched, add to the expire queue also
- if (!operation.isCompleted) {
- if (timerEnabled)
- timeoutTimer.add(operation)
- if (operation.isCompleted) {
- // cancel the timer task
- operation.cancel()
- }
- }
-
- false
- }
-
- /**
- * Check if some delayed operations can be completed with the given watch key,
- * and if yes complete them.
- *
- * @return the number of completed operations during this process
- */
- def checkAndComplete(key: Any): Int = {
- val wl = watcherList(key)
- val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
- val numCompleted = if (watchers == null)
- 0
- else
- watchers.tryCompleteWatched()
- if (numCompleted > 0) {
- debug(s"Request key $key unblocked $numCompleted $purgatoryName operations")
- }
- numCompleted
- }
-
- /**
- * Return the total size of watch lists the purgatory. Since an operation may be watched
- * on multiple lists, and some of its watched entries may still be in the watch lists
- * even when it has been completed, this number may be larger than the number of real operations watched
- */
- def watched: Int = {
- watcherLists.foldLeft(0) { case (sum, watcherList) => sum + watcherList.allWatchers.map(_.countWatched).sum }
- }
-
- /**
- * Return the number of delayed operations in the expiry queue
- */
- def numDelayed: Int = timeoutTimer.size
-
- /**
- * Cancel watching on any delayed operations for the given key. Note the operation will not be completed
- */
- def cancelForKey(key: Any): List[T] = {
- val wl = watcherList(key)
- inLock(wl.watchersLock) {
- val watchers = wl.watchersByKey.remove(key)
- if (watchers != null)
- watchers.cancel()
- else
- Nil
- }
- }
-
- /*
- * Return the watch list of the given key, note that we need to
- * grab the removeWatchersLock to avoid the operation being added to a removed watcher list
- */
- private def watchForOperation(key: Any, operation: T): Unit = {
- val wl = watcherList(key)
- inLock(wl.watchersLock) {
- val watcher = wl.watchersByKey.getAndMaybePut(key)
- watcher.watch(operation)
- }
- }
-
- /*
- * Remove the key from watcher lists if its list is empty
- */
- private def removeKeyIfEmpty(key: Any, watchers: Watchers): Unit = {
- val wl = watcherList(key)
- inLock(wl.watchersLock) {
- // if the current key is no longer correlated to the watchers to remove, skip
- if (wl.watchersByKey.get(key) != watchers)
- return
-
- if (watchers != null && watchers.isEmpty) {
- wl.watchersByKey.remove(key)
- }
- }
- }
-
- /**
- * Shutdown the expire reaper thread
- */
- def shutdown(): Unit = {
- if (reaperEnabled) {
- expirationReaper.initiateShutdown()
- // improve shutdown time by waking up any ShutdownableThread(s) blocked on poll by sending a no-op
- timeoutTimer.add(new TimerTask(0) {
- override def run(): Unit = {}
- })
- expirationReaper.awaitShutdown()
- }
- timeoutTimer.close()
- metricsGroup.removeMetric("PurgatorySize", metricsTags)
- metricsGroup.removeMetric("NumDelayedOperations", metricsTags)
- }
-
- /**
- * A linked list of watched delayed operations based on some key
- */
- private class Watchers(val key: Any) {
- private[this] val operations = new ConcurrentLinkedQueue[T]()
-
- // count the current number of watched operations. This is O(n), so use isEmpty() if possible
- def countWatched: Int = operations.size
-
- def isEmpty: Boolean = operations.isEmpty
-
- // add the element to watch
- def watch(t: T): Unit = {
- operations.add(t)
- }
-
- // traverse the list and try to complete some watched elements
- def tryCompleteWatched(): Int = {
- var completed = 0
-
- val iter = operations.iterator()
- while (iter.hasNext) {
- val curr = iter.next()
- if (curr.isCompleted) {
- // another thread has completed this operation, just remove it
- iter.remove()
- } else if (curr.safeTryComplete()) {
- iter.remove()
- completed += 1
- }
- }
-
- if (operations.isEmpty)
- removeKeyIfEmpty(key, this)
-
- completed
- }
-
- def cancel(): List[T] = {
- val iter = operations.iterator()
- val cancelled = new ListBuffer[T]()
- while (iter.hasNext) {
- val curr = iter.next()
- curr.cancel()
- iter.remove()
- cancelled += curr
- }
- cancelled.toList
- }
-
- // traverse the list and purge elements that are already completed by others
- def purgeCompleted(): Int = {
- var purged = 0
-
- val iter = operations.iterator()
- while (iter.hasNext) {
- val curr = iter.next()
- if (curr.isCompleted) {
- iter.remove()
- purged += 1
- }
- }
-
- if (operations.isEmpty)
- removeKeyIfEmpty(key, this)
-
- purged
- }
- }
-
- def advanceClock(timeoutMs: Long): Unit = {
- timeoutTimer.advanceClock(timeoutMs)
-
- // Trigger a purge if the number of completed but still being watched operations is larger than
- // the purge threshold. That number is computed by the difference btw the estimated total number of
- // operations and the number of pending delayed operations.
- if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
- // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
- // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
- // a little overestimated total number of operations.
- estimatedTotalOperations.getAndSet(numDelayed)
- debug("Begin purging watch lists")
- val purged = watcherLists.foldLeft(0) {
- case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
- }
- debug("Purged %d elements from watch lists.".format(purged))
- }
- }
-
- /**
- * A background reaper to expire delayed operations that have timed out
- */
- private class ExpiredOperationReaper extends ShutdownableThread(
- "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
- false) {
-
- override def doWork(): Unit = {
- advanceClock(200L)
- }
- }
-}
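The class removed above is ported to Java in server-common (org.apache.kafka.server.purgatory); the port itself is outside this diff. The completion contract spelled out in the deleted scaladoc is what callers keep relying on: onComplete() runs exactly once, triggered either by a successful tryComplete() or by expiration, and concurrent completion attempts race on a single atomic flag so only one caller wins forceComplete(). Below is a tiny standalone Java illustration of that compare-and-set idiom (not the Kafka class, just the contract).

    import java.util.concurrent.atomic.AtomicBoolean;

    // Standalone demo of the "complete exactly once" contract described above.
    public class OnceCompletable {
        private final AtomicBoolean completed = new AtomicBoolean(false);

        boolean forceComplete() {
            if (completed.compareAndSet(false, true)) {
                onComplete();           // runs at most once, in the winning thread
                return true;
            }
            return false;               // another caller already completed the operation
        }

        boolean isCompleted() {
            return completed.get();
        }

        void onComplete() {
            System.out.println("completed");
        }

        public static void main(String[] args) {
            OnceCompletable op = new OnceCompletable();
            System.out.println(op.forceComplete());   // true, prints "completed" first
            System.out.println(op.forceComplete());   // false, already completed
        }
    }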
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
deleted file mode 100644
index 037b38ca79c..00000000000
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
-
-/**
- * Keys used for delayed operation metrics recording
- */
-trait DelayedOperationKey {
- def keyLabel: String
-}
-
-/* used by delayed-produce and delayed-fetch operations */
-case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
- override def keyLabel: String = "%s-%d".format(topic, partition)
-}
-
-object TopicPartitionOperationKey {
- def apply(topicPartition: TopicPartition): TopicPartitionOperationKey = {
- apply(topicPartition.topic, topicPartition.partition)
- }
- def apply(topicIdPartition: TopicIdPartition): TopicPartitionOperationKey = {
- apply(topicIdPartition.topic, topicIdPartition.partition)
- }
-}
-
-/* used by delayed-join-group operations */
-case class MemberKey(groupId: String, consumerId: String) extends DelayedOperationKey {
- override def keyLabel: String = "%s-%s".format(groupId, consumerId)
-}
-
-/* used by delayed-join operations */
-case class GroupJoinKey(groupId: String) extends DelayedOperationKey {
- override def keyLabel: String = "join-%s".format(groupId)
-}
-
-/* used by delayed-sync operations */
-case class GroupSyncKey(groupId: String) extends DelayedOperationKey {
- override def keyLabel: String = "sync-%s".format(groupId)
-}
-
-/* used by delayed-topic operations */
-case class TopicKey(topic: String) extends DelayedOperationKey {
- override def keyLabel: String = topic
-}
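The Scala case classes deleted above have Java counterparts in org.apache.kafka.server.purgatory: MemberKey, GroupJoinKey, GroupSyncKey and TopicPartitionOperationKey are all constructed with `new` elsewhere in this patch. Code that needs its own watch key, like kafka.server.share.DelayedShareFetchKey or the anonymous key in DelayedFuturePurgatory, now implements the Java interface's keyLabel() method; a hypothetical example:

    import org.apache.kafka.server.purgatory.DelayedOperationKey;

    // Hypothetical custom watch key; keyLabel() is only used for metrics/log labelling.
    public class ExampleTopicKey implements DelayedOperationKey {
        private final String topic;

        public ExampleTopicKey(String topic) {
            this.topic = topic;
        }

        @Override
        public String keyLabel() {
            return topic;
        }
    }

As with the keys shipped in this patch, meaningful equals() and hashCode() implementations matter because the purgatory buckets watchers by key; they are omitted here for brevity.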
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index c6470c81358..9c212416ce6 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -21,14 +21,16 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter
-import kafka.utils.Pool
+import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection._
import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.RichOption
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
@volatile var acksPending = false
@@ -58,8 +60,8 @@ class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
- lockOpt: Option[Lock] = None)
- extends DelayedOperation(delayMs, lockOpt) {
+ lockOpt: Option[Lock])
+ extends DelayedOperation(delayMs, lockOpt.toJava) with Logging {
override lazy val logger: Logger = DelayedProduce.logger
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index bd644929f42..45bfe69844a 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -18,10 +18,12 @@
package kafka.server
import com.yammer.metrics.core.Meter
+import kafka.utils.Logging
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
@@ -42,7 +44,7 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
replicaManager: ReplicaManager,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
- extends DelayedOperation(remoteFetchMaxWaitMs) {
+ extends DelayedOperation(remoteFetchMaxWaitMs) with Logging {
if (fetchParams.isFromFollower) {
throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams")
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
index 1e917c49694..b75a4252255 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
@@ -17,13 +17,14 @@
package kafka.server
import com.yammer.metrics.core.Meter
-import kafka.utils.Pool
+import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.purgatory.DelayedOperation
import java.util.concurrent.TimeUnit
import scala.collection.{Map, mutable}
@@ -33,7 +34,8 @@ class DelayedRemoteListOffsets(delayMs: Long,
version: Int,
statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus],
replicaManager: ReplicaManager,
- responseCallback: List[ListOffsetsTopicResponse] => Unit) extends DelayedOperation(delayMs) {
+ responseCallback: List[ListOffsetsTopicResponse] => Unit)
+ extends DelayedOperation(delayMs) with Logging {
// Mark the status as completed, if there is no async task to track.
// If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
statusByPartition.foreachEntry { (topicPartition, status) =>
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0ef77fb6376..0cba5cf4e41 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -73,6 +73,7 @@ import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
+import org.apache.kafka.server.purgatory.TopicPartitionOperationKey
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.ErroneousAndValidPartitionData
@@ -417,7 +418,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (replicaManager.hasDelayedElectionOperations) {
updateMetadataRequest.partitionStates.forEach { partitionState =>
val tp = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
- replicaManager.tryCompleteElection(TopicPartitionOperationKey(tp))
+ replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp))
}
}
requestHelper.sendResponseExemptThrottle(request, new UpdateMetadataResponse(
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ca5acf79b50..530dbe5b53f 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -60,6 +60,7 @@ import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, Topi
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
+import org.apache.kafka.server.purgatory.{DelayedOperationKey, DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
@@ -298,30 +299,30 @@ class ReplicaManager(val config: KafkaConfig,
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
val delayedProducePurgatory = delayedProducePurgatoryParam.getOrElse(
- DelayedOperationPurgatory[DelayedProduce](
- purgatoryName = "Produce", brokerId = config.brokerId,
- purgeInterval = config.producerPurgatoryPurgeIntervalRequests))
+ new DelayedOperationPurgatory[DelayedProduce](
+ "Produce", config.brokerId,
+ config.producerPurgatoryPurgeIntervalRequests))
val delayedFetchPurgatory = delayedFetchPurgatoryParam.getOrElse(
- DelayedOperationPurgatory[DelayedFetch](
- purgatoryName = "Fetch", brokerId = config.brokerId,
- purgeInterval = config.fetchPurgatoryPurgeIntervalRequests))
+ new DelayedOperationPurgatory[DelayedFetch](
+ "Fetch", config.brokerId,
+ config.fetchPurgatoryPurgeIntervalRequests))
val delayedDeleteRecordsPurgatory = delayedDeleteRecordsPurgatoryParam.getOrElse(
- DelayedOperationPurgatory[DelayedDeleteRecords](
- purgatoryName = "DeleteRecords", brokerId = config.brokerId,
- purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests))
+ new DelayedOperationPurgatory[DelayedDeleteRecords](
+ "DeleteRecords", config.brokerId,
+ config.deleteRecordsPurgatoryPurgeIntervalRequests))
val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
- DelayedOperationPurgatory[DelayedElectLeader](
- purgatoryName = "ElectLeader", brokerId = config.brokerId))
+ new DelayedOperationPurgatory[DelayedElectLeader](
+ "ElectLeader", config.brokerId))
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
- DelayedOperationPurgatory[DelayedRemoteFetch](
- purgatoryName = "RemoteFetch", brokerId = config.brokerId))
+ new DelayedOperationPurgatory[DelayedRemoteFetch](
+ "RemoteFetch", config.brokerId))
val delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatoryParam.getOrElse(
- DelayedOperationPurgatory[DelayedRemoteListOffsets](
- purgatoryName = "RemoteListOffsets", brokerId = config.brokerId))
+ new DelayedOperationPurgatory[DelayedRemoteListOffsets](
+ "RemoteListOffsets", config.brokerId))
val delayedShareFetchPurgatory = delayedShareFetchPurgatoryParam.getOrElse(
- DelayedOperationPurgatory[DelayedShareFetch](
- purgatoryName = "ShareFetch", brokerId = config.brokerId,
- purgeInterval = config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests))
+ new DelayedOperationPurgatory[DelayedShareFetch](
+ "ShareFetch", config.brokerId,
+ config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests))
/* epoch of the controller that last changed the leader */
@volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
@@ -471,7 +472,7 @@ class ReplicaManager(val config: KafkaConfig,
}
private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition, topicId: Option[Uuid]): Unit = {
- val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
+ val topicPartitionOperationKey = new TopicPartitionOperationKey(topicPartition)
delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
@@ -486,7 +487,7 @@ class ReplicaManager(val config: KafkaConfig,
* after successfully replicating from the leader.
*/
private[server] def completeDelayedFetchRequests(topicPartitions: Seq[TopicPartition]): Unit = {
- topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(TopicPartitionOperationKey(tp)))
+ topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(new TopicPartitionOperationKey(tp)))
}
/**
@@ -506,7 +507,7 @@ class ReplicaManager(val config: KafkaConfig,
* @param delayedShareFetchKeys The keys corresponding to which the delayed share fetch request will be stored in the purgatory
*/
private[server] def addDelayedShareFetchRequest(delayedShareFetch: DelayedShareFetch,
- delayedShareFetchKeys : Seq[DelayedShareFetchKey]): Unit = {
+ delayedShareFetchKeys : util.List[DelayedShareFetchKey]): Unit = {
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, delayedShareFetchKeys)
}
@@ -980,7 +981,7 @@ class ReplicaManager(val config: KafkaConfig,
): Unit = {
actionQueue.add {
() => appendResults.foreach { case (topicOptionalIdPartition, result) =>
- val requestKey = TopicPartitionOperationKey(topicOptionalIdPartition.topicPartition)
+ val requestKey = new TopicPartitionOperationKey(topicOptionalIdPartition.topicPartition)
result.info.leaderHwChange match {
case LeaderHwChange.INCREASED =>
// some delayed operations may be unblocked after HW changed
@@ -1014,12 +1015,12 @@ class ReplicaManager(val config: KafkaConfig,
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
- val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
+ val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed produce operation is being created, new
// requests may arrive and hence make this operation completable.
- delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+ delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys.asJava)
} else {
// we can respond immediately
val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }
@@ -1392,12 +1393,12 @@ class ReplicaManager(val config: KafkaConfig,
val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
- val deleteRecordsRequestKeys = offsetPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
+ val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed delete records operation is being created, new
// requests may arrive and hence make this operation completable.
- delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys)
+ delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys.asJava)
} else {
// we can respond immediately
val deleteRecordsResponseStatus = deleteRecordsStatus.map { case (k, status) => k -> status.responseStatus }
@@ -1601,9 +1602,9 @@ class ReplicaManager(val config: KafkaConfig,
// create delayed remote list offsets operation
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
- val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
+ val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList
// try to complete the request immediately, otherwise put it into the purgatory
- delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
+ delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys.asJava)
} else {
// we can respond immediately
val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
@@ -1663,7 +1664,7 @@ class ReplicaManager(val config: KafkaConfig,
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
fetchPartitionStatus, params, logReadResults, this, responseCallback)
- delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
+ delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, util.Collections.singletonList(key))
None
}
@@ -1765,12 +1766,12 @@ class ReplicaManager(val config: KafkaConfig,
)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
- val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
+ val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
// try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed fetch operation is being created, new requests
// may arrive and hence make this operation completable.
- delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+ delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys.asJava)
}
}
}
@@ -2812,8 +2813,8 @@ class ReplicaManager(val config: KafkaConfig,
}
if (expectedLeaders.nonEmpty) {
val watchKeys = expectedLeaders.iterator.map {
- case (tp, _) => TopicPartitionOperationKey(tp)
- }.toBuffer
+ case (tp, _) => new TopicPartitionOperationKey(tp)
+ }.toList.asJava
delayedElectLeaderPurgatory.tryCompleteElseWatch(
new DelayedElectLeader(
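
The ReplicaManager hunks above all reduce to one call-site pattern: watch keys are now plain Java objects created with new (the Scala companion apply is gone), and tryCompleteElseWatch takes a java.util.List of keys, so Scala call sites build a List and convert it with asJava. A minimal sketch of that pattern, assuming the Java signature inferred from these hunks (tryCompleteElseWatch(T, java.util.List[DelayedOperationKey])); the helper name and parameters are illustrative only, not part of the patch:

import scala.jdk.CollectionConverters._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.purgatory.{DelayedOperation, DelayedOperationKey, DelayedOperationPurgatory, TopicPartitionOperationKey}

// Build per-partition keys with `new` and hand them to the purgatory as a Java list.
def watchOnPartitions[T <: DelayedOperation](purgatory: DelayedOperationPurgatory[T],
                                             operation: T,
                                             partitions: Seq[TopicPartition]): Unit = {
  // Typed to the DelayedOperationKey supertype so the converted list matches the Java API.
  val keys: List[DelayedOperationKey] = partitions.map(tp => new TopicPartitionOperationKey(tp)).toList
  purgatory.tryCompleteElseWatch(operation, keys.asJava)
}
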
diff --git a/core/src/main/scala/kafka/server/TopicKey.scala b/core/src/main/scala/kafka/server/TopicKey.scala
new file mode 100644
index 00000000000..86b4f505b93
--- /dev/null
+++ b/core/src/main/scala/kafka/server/TopicKey.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import org.apache.kafka.server.purgatory.DelayedOperationKey
+
+/* used by delayed-topic operations */
+case class TopicKey(topic: String) extends DelayedOperationKey {
+
+ override def keyLabel: String = topic
+}
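
TopicKey stays a Scala case class but now implements the Java DelayedOperationKey interface, whose only obligation here is the keyLabel string. Case-class equality is what lets a later checkAndComplete on the same topic reach watchers registered earlier. A tiny illustration using only the class defined above (the topic name is made up):

// Two keys for the same topic are equal and carry the same label, so they address
// the same watcher list inside the purgatory.
val created = TopicKey("orders")
val completed = TopicKey("orders")
assert(created == completed)
assert(created.keyLabel == "orders")
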
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 1d8f18d4f83..99ff17289d3 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -49,6 +49,7 @@ import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.server.config.{ConfigType, QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG
import org.apache.kafka.server.config.ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG
+import org.apache.kafka.server.purgatory.{DelayedOperation, DelayedOperationPurgatory}
import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection.{Map, mutable, _}
@@ -74,7 +75,7 @@ class ZkAdminManager(val config: KafkaConfig,
this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
- private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
+ private val topicPurgatory = new DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
private val adminZkClient = new AdminZkClient(zkClient, Some(config))
private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient))
@@ -250,9 +251,9 @@ class ZkAdminManager(val config: KafkaConfig,
// 3. else pass the assignments and errors to the delayed operation and set the keys
val delayedCreate = new DelayedCreatePartitions(timeout, metadata, this,
responseCallback)
- val delayedCreateKeys = toCreate.values.map(topic => TopicKey(topic.name)).toBuffer
+ val delayedCreateKeys = toCreate.values.map(topic => TopicKey(topic.name)).toList
// try to complete the request immediately, otherwise put it into the purgatory
- topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
+ topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys.asJava)
}
}
@@ -297,9 +298,9 @@ class ZkAdminManager(val config: KafkaConfig,
} else {
// 3. else pass the topics and errors to the delayed operation and set the keys
val delayedDelete = new DelayedDeleteTopics(timeout, metadata.toSeq, this, responseCallback)
- val delayedDeleteKeys = topics.map(TopicKey).toSeq
+ val delayedDeleteKeys = topics.map(TopicKey).toList
// try to complete the request immediately, otherwise put it into the purgatory
- topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys)
+ topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys.asJava)
}
}
@@ -393,9 +394,9 @@ class ZkAdminManager(val config: KafkaConfig,
} else {
// 3. else pass the assignments and errors to the delayed operation and set the keys
val delayedCreate = new DelayedCreatePartitions(timeoutMs, metadata, this, callback)
- val delayedCreateKeys = newPartitions.map(createPartitionTopic => TopicKey(createPartitionTopic.name))
+ val delayedCreateKeys = newPartitions.map(createPartitionTopic => TopicKey(createPartitionTopic.name)).toList
// try to complete the request immediately, otherwise put it into the purgatory
- topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
+ topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys.asJava)
}
}
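
The ZkAdminManager changes repeat the same migration on the admin path: the purgatory is built with new and positional arguments instead of the old companion apply with named defaults, and key sequences become Java lists before being watched. Both constructor arities used in this patch, side by side (broker id 0 and purge interval 1000 are illustrative values, not taken from the patch):

import org.apache.kafka.server.purgatory.{DelayedOperation, DelayedOperationPurgatory}

// Two-argument constructor: purgatory name and broker id, default purge interval.
val topicPurgatory = new DelayedOperationPurgatory[DelayedOperation]("topic", 0)
// Three-argument constructor adds an explicit purge interval, as the Produce, Fetch,
// DeleteRecords and ShareFetch purgatories do in the ReplicaManager hunks.
val producePurgatory = new DelayedOperationPurgatory[DelayedOperation]("Produce", 0, 1000)
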
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 9c9ea3457b1..77d6db80158 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
-import kafka.server.DelayedOperationPurgatory;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
@@ -28,6 +27,8 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.purgatory.DelayedOperationKey;
+import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.storage.log.FetchIsolation;
@@ -44,17 +45,15 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import scala.jdk.javaapi.CollectionConverters;
-
import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
@@ -447,7 +446,7 @@ public class DelayedShareFetchTest {
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
- Set