diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1c21038e66a..ee0cf18212a 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -52,6 +52,7 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -2468,4 +2469,23 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
Utils.closeQuietly(runtimeMetrics, "runtime metrics");
log.info("Coordinator runtime closed.");
}
+
+ /**
+ * Utility method which returns all the topic partitions for which
+ * the state machine is in the ACTIVE state.
+ *
+ * This could be useful if the caller does not have a specific
+ * target internal topic partition.
+ * @return List of {@link TopicPartition} whose coordinators are active
+ */
+ public List<TopicPartition> activeTopicPartitions() {
+ if (coordinators == null || coordinators.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ return coordinators.entrySet().stream()
+ .filter(entry -> entry.getValue().state.equals(CoordinatorState.ACTIVE))
+ .map(Map.Entry::getKey)
+ .toList();
+ }
}
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
index 47df8bcae34..cb8bec3f71c 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
@@ -107,4 +107,15 @@ public interface PartitionWriter {
short producerEpoch,
short apiVersion
) throws KafkaException;
+
+ /**
+ * Delete records from a topic partition up to, but not including, the specified offset.
+ * @param tp The partition to delete records from
+ * @param deleteBeforeOffset The offset before which all records will be deleted, starting from the beginning
+ * @throws KafkaException Any KafkaException caught during the operation.
+ */
+ CompletableFuture<Void> deleteRecords(
+ TopicPartition tp,
+ long deleteBeforeOffset
+ ) throws KafkaException;
}
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
index 1f676ad550f..a8551f0734b 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
@@ -115,6 +115,14 @@ public class InMemoryPartitionWriter implements PartitionWriter {
}
}
+ @Override
+ public CompletableFuture<Void> deleteRecords(
+ TopicPartition tp,
+ long deleteBeforeOffset
+ ) throws KafkaException {
+ throw new RuntimeException("method not implemented");
+ }
+
@Override
public CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
TopicPartition tp,
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index f70819ef438..211be799a7e 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -165,4 +165,25 @@ class CoordinatorPartitionWriter(
// Required offset.
partitionResult.lastOffset + 1
}
+
+ override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
+ val responseFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
+
+ replicaManager.deleteRecords(
+ timeout = 30000L, // 30 seconds.
+ offsetPerPartition = Map(tp -> deleteBeforeOffset),
+ responseCallback = results => {
+ val result = results.get(tp)
+ if (result.isEmpty) {
+ responseFuture.completeExceptionally(new IllegalStateException(s"Delete status $result should have partition $tp."))
+ } else if (result.get.errorCode != Errors.NONE.code) {
+ responseFuture.completeExceptionally(Errors.forCode(result.get.errorCode).exception)
+ } else {
+ responseFuture.complete(null)
+ }
+ },
+ allowInternalTopicDeletion = true
+ )
+ responseFuture
+ }
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 67cc1a84c56..cb7005f4020 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig,
* Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas;
* the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset
*/
- private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
+ private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition, LogDeleteRecordsResult] = {
trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
offsetPerPartition.map { case (topicPartition, requestedOffset) =>
- // reject delete records operation on internal topics
- if (Topic.isInternal(topicPartition.topic)) {
+ // reject delete records operation for internal topics unless allowInternalTopicDeletion is true
+ if (Topic.isInternal(topicPartition.topic) && !allowInternalTopicDeletion) {
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
@@ -1369,9 +1369,10 @@ class ReplicaManager(val config: KafkaConfig,
def deleteRecords(timeout: Long,
offsetPerPartition: Map[TopicPartition, Long],
- responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit): Unit = {
+ responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit,
+ allowInternalTopicDeletion: Boolean = false): Unit = {
val timeBeforeLocalDeleteRecords = time.milliseconds
- val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
+ val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition, allowInternalTopicDeletion)
debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))
val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index f12d21019a7..9b192e851e9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -20,13 +20,14 @@ import kafka.server.ReplicaManager
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
@@ -238,4 +239,83 @@ class CoordinatorPartitionWriterTest {
batch
))
}
+
+ @Test
+ def testDeleteRecordsResponseContainsError(): Unit = {
+ val replicaManager = mock(classOf[ReplicaManager])
+ val partitionRecordWriter = new CoordinatorPartitionWriter(
+ replicaManager
+ )
+
+ val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+ // Response contains error.
+ when(replicaManager.deleteRecords(
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(),
+ callbackCapture.capture(),
+ ArgumentMatchers.eq(true)
+ )).thenAnswer { _ =>
+ callbackCapture.getValue.apply(Map(
+ new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
+ .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)
+ ))
+ }
+
+ partitionRecordWriter.deleteRecords(
+ new TopicPartition("random-topic", 0),
+ 10L
+ ).whenComplete { (_, exp) =>
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception, exp)
+ }
+
+ // Empty response
+ when(replicaManager.deleteRecords(
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(),
+ callbackCapture.capture(),
+ ArgumentMatchers.eq(true)
+ )).thenAnswer { _ =>
+ callbackCapture.getValue.apply(Map[TopicPartition, DeleteRecordsPartitionResult]())
+ }
+
+ partitionRecordWriter.deleteRecords(
+ new TopicPartition("random-topic", 0),
+ 10L
+ ).whenComplete { (_, exp) =>
+ assertTrue(exp.isInstanceOf[IllegalStateException])
+ }
+ }
+
+ @Test
+ def testDeleteRecordsSuccess(): Unit = {
+ val replicaManager = mock(classOf[ReplicaManager])
+ val partitionRecordWriter = new CoordinatorPartitionWriter(
+ replicaManager
+ )
+
+ val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+ // Response contains no error.
+ when(replicaManager.deleteRecords(
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(),
+ callbackCapture.capture(),
+ ArgumentMatchers.eq(true)
+ )).thenAnswer { _ =>
+ callbackCapture.getValue.apply(Map(
+ new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
+ .setErrorCode(Errors.NONE.code)
+ ))
+ }
+
+ partitionRecordWriter.deleteRecords(
+ new TopicPartition("random-topic", 0),
+ 10L
+ ).whenComplete { (_, exp) =>
+ assertNull(exp)
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a36598a5eeb..27cd6644bd9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
-import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfigTest}
+import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
@@ -11702,7 +11702,7 @@ class KafkaApisTest extends Logging {
val response = getReadShareGroupResponse(
readRequestData,
- config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
readStateResultData
@@ -11757,7 +11757,7 @@ class KafkaApisTest extends Logging {
val response = getReadShareGroupResponse(
readRequestData,
- config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
readStateResultData
@@ -11812,7 +11812,7 @@ class KafkaApisTest extends Logging {
val response = getWriteShareGroupResponse(
writeRequestData,
- config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
writeStateResultData
@@ -11867,7 +11867,7 @@ class KafkaApisTest extends Logging {
val response = getWriteShareGroupResponse(
writeRequestData,
- config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
writeStateResultData
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ce45fcb6093..044a7d490d7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -34,7 +34,8 @@ import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartit
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
-import org.apache.kafka.common.message.LeaderAndIsrRequestData
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.{DeleteRecordsResponseData, LeaderAndIsrRequestData}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
@@ -6660,6 +6661,61 @@ class ReplicaManagerTest {
}
}
+ @Test
+ def testDeleteRecordsInternalTopicDeleteDisallowed(): Unit = {
+ val localId = 1
+ val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
+ val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+ val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
+ val directoryIds = rm.logManager.directoryIdsSet.toList
+ assertEquals(2, directoryIds.size)
+ val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
+ val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
+ partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
+ new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
+ None)
+
+ def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
+ assert(responseStatus.values.head.errorCode == Errors.INVALID_TOPIC_EXCEPTION.code)
+ }
+
+ // default internal topics delete disabled
+ rm.deleteRecords(
+ timeout = 0L,
+ Map[TopicPartition, Long](topicPartition0.topicPartition() -> 10L),
+ responseCallback = callback
+ )
+ }
+
+ @Test
+ def testDeleteRecordsInternalTopicDeleteAllowed(): Unit = {
+ val localId = 1
+ val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
+ val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+ val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
+ val directoryIds = rm.logManager.directoryIdsSet.toList
+ assertEquals(2, directoryIds.size)
+ val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
+ val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
+ partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
+ new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
+ None)
+
+ def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
+ assert(responseStatus.values.head.errorCode == Errors.NONE.code)
+ }
+
+ // internal topics delete allowed
+ rm.deleteRecords(
+ timeout = 0L,
+ Map[TopicPartition, Long](topicPartition0.topicPartition() -> 0L),
+ responseCallback = callback,
+ allowInternalTopicDeletion = true
+ )
+ }
+
private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {
diff --git a/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java b/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
index a27a19914c9..58bc774fe20 100644
--- a/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Utils;
import java.util.Optional;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
public static final int APPEND_LINGER_MS_DEFAULT = 10;
public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";
+ public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms";
+ public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
+ public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in the share-group state topic.";
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC)
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -81,7 +86,8 @@ public class ShareCoordinatorConfig {
.define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
.define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
- .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC);
+ .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
+ .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);
private final int stateTopicNumPartitions;
private final short stateTopicReplicationFactor;
@@ -93,6 +99,7 @@ public class ShareCoordinatorConfig {
private final int loadBufferSize;
private final CompressionType compressionType;
private final int appendLingerMs;
+ private final int pruneIntervalMs;
public ShareCoordinatorConfig(AbstractConfig config) {
@@ -108,6 +115,7 @@ public class ShareCoordinatorConfig {
.map(CompressionType::forId)
.orElse(null);
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
+ pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
validate();
}
@@ -151,6 +159,10 @@ public class ShareCoordinatorConfig {
return compressionType;
}
+ public int shareCoordinatorTopicPruneIntervalMs() {
+ return pruneIntervalMs;
+ }
+
private void validate() {
Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && snapshotUpdateRecordsPerSnapshot <= 500,
String.format("%s must be between [0, 500]", SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
new file mode 100644
index 00000000000..69070f65e93
--- /dev/null
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineLong;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Utility class to track the latest offset written into the internal topic
+ * for each share partition key.
+ * It calculates the global minimum offset below which all records
+ * in the internal partition are redundant, i.e. they have been superseded
+ * by newer records.
+ */
+public class ShareCoordinatorOffsetsManager {
+
+ // Map to store share partition key => current partition offset
+ // being written.
+ private final TimelineHashMap<SharePartitionKey, Long> offsets;
+
+ // Minimum offset representing the smallest necessary offset
+ // across the internal partition (offsets below this are redundant).
+ // We are using timeline object here because the offsets which are passed into
+ // updateState might not be committed yet. In case of retry, these offsets would
+ // be invalidated via the snapshot registry. Hence, using timeline object
+ // the values would automatically revert in accordance with the last committed offset.
+ private final TimelineLong lastRedundantOffset;
+
+ public ShareCoordinatorOffsetsManager(SnapshotRegistry snapshotRegistry) {
+ Objects.requireNonNull(snapshotRegistry);
+ offsets = new TimelineHashMap<>(snapshotRegistry, 0);
+ lastRedundantOffset = new TimelineLong(snapshotRegistry);
+ lastRedundantOffset.set(Long.MAX_VALUE); // For easy application of Math.min.
+ }
+
+ /**
+ * Method updates internal state with the supplied offset for the provided
+ * share partition key. It then calculates the minimum offset, if possible,
+ * below which all offsets are redundant.
+ *
+ * @param key - represents {@link SharePartitionKey} whose offset needs updating
+ * @param offset - represents the latest partition offset for provided key
+ */
+ public void updateState(SharePartitionKey key, long offset) {
+ lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset));
+ offsets.put(key, offset);
+
+ Optional<Long> redundantOffset = findRedundantOffset();
+ redundantOffset.ifPresent(lastRedundantOffset::set);
+ }
+
+ private Optional<Long> findRedundantOffset() {
+ if (offsets.isEmpty()) {
+ return Optional.empty();
+ }
+
+ long soFar = Long.MAX_VALUE;
+
+ for (long offset : offsets.values()) {
+ // Get min offset among latest offsets
+ // for all share keys in the internal partition.
+ soFar = Math.min(soFar, offset);
+
+ // lastRedundantOffset represents the smallest necessary offset
+ // and if soFar equals it, we cannot proceed. This can happen
+ // if a share partition key hasn't had records written for a while.
+ // For example,
+ //
+ // key1:1
+ // key2:2 4 6
+ // key3:3 5 7
+ //
+ // We can see above that offsets 2, 3, 4 and 5 are redundant,
+ // but we do not have a contiguous redundant prefix starting at lastRedundantOffset,
+ // so we cannot proceed.
+ if (soFar == lastRedundantOffset.get()) {
+ return Optional.of(soFar);
+ }
+ }
+
+ return Optional.of(soFar);
+ }
+
+ /**
+ * Most recent last redundant offset. This method is to be used
+ * when the caller wants to query the value of such offset.
+ * @return Optional of type Long representing the offset or empty for invalid offset values
+ */
+ public Optional<Long> lastRedundantOffset() {
+ long value = lastRedundantOffset.get();
+ if (value <= 0 || value == Long.MAX_VALUE) {
+ return Optional.empty();
+ }
+
+ return Optional.of(value);
+ }
+
+ // visible for testing
+ TimelineHashMap<SharePartitionKey, Long> curState() {
+ return offsets;
+ }
+}
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index ced4e532b03..71dd2d88056 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -48,6 +48,7 @@ import org.apache.kafka.server.config.ShareCoordinatorConfig;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;
@@ -60,6 +61,7 @@ import java.util.Map;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
@@ -75,6 +77,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
private final ShareCoordinatorMetrics shareCoordinatorMetrics;
private volatile int numPartitions = -1; // Number of partitions for __share_group_state. Provided when component is started.
private final Time time;
+ private final Timer timer;
+ private final PartitionWriter writer;
+ private final Map<TopicPartition, Long> lastPrunedOffsets;
public static class Builder {
private final int nodeId;
@@ -184,7 +189,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
config,
runtime,
coordinatorMetrics,
- time
+ time,
+ timer,
+ writer
);
}
}
@@ -194,12 +201,18 @@ public class ShareCoordinatorService implements ShareCoordinator {
ShareCoordinatorConfig config,
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime,
ShareCoordinatorMetrics shareCoordinatorMetrics,
- Time time) {
+ Time time,
+ Timer timer,
+ PartitionWriter writer
+ ) {
this.log = logContext.logger(ShareCoordinatorService.class);
this.config = config;
this.runtime = runtime;
this.shareCoordinatorMetrics = shareCoordinatorMetrics;
this.time = time;
+ this.timer = timer;
+ this.writer = writer;
+ this.lastPrunedOffsets = new ConcurrentHashMap<>();
}
@Override
@@ -240,9 +253,82 @@ public class ShareCoordinatorService implements ShareCoordinator {
log.info("Starting up.");
numPartitions = shareGroupTopicPartitionCount.getAsInt();
+ setupRecordPruning();
log.info("Startup complete.");
}
+ private void setupRecordPruning() {
+ log.info("Scheduling share-group state topic prune job.");
+ timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+ @Override
+ public void run() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+ .whenComplete((res, exp) -> {
+ if (exp != null) {
+ log.error("Received error in share-group state topic prune.", exp);
+ }
+ // Perpetual recursion, failure or not.
+ setupRecordPruning();
+ });
+ }
+ });
+ }
+
+ private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
+ // This future will always be completed normally, exception or not.
+ CompletableFuture<Void> fut = new CompletableFuture<>();
+
+ runtime.scheduleWriteOperation(
+ "write-state-record-prune",
+ tp,
+ Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+ ShareCoordinatorShard::lastRedundantOffset
+ ).whenComplete((result, exception) -> {
+ if (exception != null) {
+ log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception);
+ Errors error = Errors.forException(exception);
+ // These errors might result from partition metadata not loaded
+ // or shard re-election. Will cause unnecessary noise, hence not logging
+ if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+ log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+ fut.completeExceptionally(exception);
+ return;
+ }
+ fut.complete(null);
+ return;
+ }
+ if (result.isPresent()) {
+ Long off = result.get();
+ Long lastPrunedOffset = lastPrunedOffsets.get(tp);
+ if (lastPrunedOffset != null && lastPrunedOffset.longValue() == off) {
+ log.debug("{} already pruned till offset {}", tp, off);
+ fut.complete(null);
+ return;
+ }
+
+ log.info("Pruning records in {} till offset {}.", tp, off);
+ writer.deleteRecords(tp, off)
+ .whenComplete((res, exp) -> {
+ if (exp != null) {
+ log.debug("Exception while deleting records in {} till offset {}.", tp, off, exp);
+ fut.completeExceptionally(exp);
+ return;
+ }
+ fut.complete(null);
+ // Best effort prevention of issuing duplicate delete calls.
+ lastPrunedOffsets.put(tp, off);
+ });
+ } else {
+ log.debug("No offset value for tp {} found.", tp);
+ fut.complete(null);
+ }
+ });
+ return fut;
+ }
+
@Override
public void shutdown() {
if (!isActive.compareAndSet(true, false)) {
@@ -543,8 +629,10 @@ public class ShareCoordinatorService implements ShareCoordinator {
@Override
public void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch) {
throwIfNotActive();
+ TopicPartition tp = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex);
+ lastPrunedOffsets.remove(tp);
runtime.scheduleUnloadOperation(
- new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex),
+ tp,
partitionLeaderEpoch
);
}
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index e8ca9ce6b8d..6f2a9c27b74 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -73,6 +73,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord> {
private final TimelineHashMap<SharePartitionKey, Integer> snapshotUpdateCount;
private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
private MetadataImage metadataImage;
+ private final ShareCoordinatorOffsetsManager offsetsManager;
public static final Exception NULL_TOPIC_ID = new Exception("The topic id cannot be null.");
public static final Exception NEGATIVE_PARTITION_ID = new Exception("The partition id cannot be a negative number.");
@@ -162,6 +163,17 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord> {
this.leaderEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0);
this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.offsetsManager = offsetsManager;
}
@Override
@@ -195,7 +208,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord> {
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
}
+ /**
+ * Method which returns the last known redundant offset from the partition
+ * led by this shard.
+ * @return CoordinatorResult containing an empty record list and an Optional representing the offset.
+ */
+ public CoordinatorResult<Optional<Long>, CoordinatorRecord> lastRedundantOffset() {
+ return new CoordinatorResult<>(
+ Collections.emptyList(),
+ this.offsetsManager.lastRedundantOffset()
+ );
+ }
+
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
*
- * if no snapshot has been created for the key => create a new ShareSnapshot record
+ * If no snapshot has been created for the key => create a new ShareSnapshot record
* else if number of ShareUpdate records for key >= max allowed per snapshot per key => create a new ShareSnapshot record
* else create a new ShareUpdate record
*
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
new file mode 100644
index 00000000000..262f166be19
--- /dev/null
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ShareCoordinatorOffsetsManagerTest {
+
+ private ShareCoordinatorOffsetsManager manager;
+ private static final SharePartitionKey KEY1 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 0);
+ private static final SharePartitionKey KEY2 = SharePartitionKey.getInstance("gs2", Uuid.randomUuid(), 0);
+ private static final SharePartitionKey KEY3 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 1);
+ private static final SharePartitionKey KEY4 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 7);
+
+ @BeforeEach
+ public void setUp() {
+ manager = new ShareCoordinatorOffsetsManager(new SnapshotRegistry(new LogContext()));
+ }
+
+ @Test
+ public void testUpdateStateAddsToInternalState() {
+ manager.updateState(KEY1, 0L);
+ assertEquals(Optional.empty(), manager.lastRedundantOffset());
+
+ manager.updateState(KEY1, 10L);
+ assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // [0-9] offsets are redundant.
+
+ manager.updateState(KEY2, 15L);
+ assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // No update to last redundant after adding 15L so, still 10L.
+
+ assertEquals(10L, manager.curState().get(KEY1));
+ assertEquals(15L, manager.curState().get(KEY2));
+ }
+
+ private static class ShareOffsetTestHolder {
+ static class TestTuple {
+ final SharePartitionKey key;
+ final long offset;
+ final Optional<Long> expectedOffset;
+
+ private TestTuple(SharePartitionKey key, long offset, Optional<Long> expectedOffset) {
+ this.key = key;
+ this.offset = offset;
+ this.expectedOffset = expectedOffset;
+ }
+
+ static TestTuple instance(SharePartitionKey key, long offset, Optional<Long> expectedOffset) {
+ return new TestTuple(key, offset, expectedOffset);
+ }
+ }
+
+ private final String testName;
+ private final List<TestTuple> tuples;
+ private final boolean shouldRun;
+
+ ShareOffsetTestHolder(String testName, List<TestTuple> tuples) {
+ this(testName, tuples, true);
+ }
+
+ ShareOffsetTestHolder(String testName, List<TestTuple> tuples, boolean shouldRun) {
+ this.testName = testName;
+ this.tuples = tuples;
+ this.shouldRun = shouldRun;
+ }
+ }
+
+ static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() {
+ return Stream.of(
+ new ShareOffsetTestHolder(
+ "no redundant state single key",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L))
+ )
+ ),
+
+ new ShareOffsetTestHolder(
+ "no redundant state multiple keys",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.of(10L))
+ )
+ )
+ );
+ }
+
+ static Stream<ShareOffsetTestHolder> generateRedundantStateCases() {
+ return Stream.of(
+ new ShareOffsetTestHolder(
+ "redundant state single key",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L, Optional.of(11L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 15L, Optional.of(15L))
+ )
+ ),
+
+ new ShareOffsetTestHolder(
+ "redundant state multiple keys",
+ // KEY1: 10 17
+ // KEY2: 11 16
+ // KEY3: 15
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.of(10L)), // KEY2 11 redundant but should not be returned
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, Optional.of(15L))
+ )
+ )
+ );
+
+ }
+
+ static Stream<ShareOffsetTestHolder> generateComplexCases() {
+ return Stream.of(
+ new ShareOffsetTestHolder(
+ "redundant state reverse key order",
+ // Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1.
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L, Optional.of(18L))
+ )
+ ),
+
+ new ShareOffsetTestHolder(
+ "redundant state infrequently written partition.",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 25L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L, Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, Optional.of(27L))
+ )
+ )
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("generateNoRedundantStateCases")
+ public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) {
+ if (holder.shouldRun) {
+ holder.tuples.forEach(tuple -> {
+ manager.updateState(tuple.key, tuple.offset);
+ assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName);
+ });
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("generateRedundantStateCases")
+ public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) {
+ if (holder.shouldRun) {
+ holder.tuples.forEach(tuple -> {
+ manager.updateState(tuple.key, tuple.offset);
+ assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName);
+ });
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("generateComplexCases")
+ public void testUpdateStateComplexCases(ShareOffsetTestHolder holder) {
+ if (holder.shouldRun) {
+ holder.tuples.forEach(tuple -> {
+ manager.updateState(tuple.key, tuple.offset);
+ assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName);
+ });
+ }
+ }
+}
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index fab60d22f75..c725c824031 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -35,9 +35,13 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.FutureUtils;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.server.util.timer.Timer;
import org.junit.jupiter.api.Test;
@@ -45,6 +49,8 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -56,8 +62,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -66,7 +74,10 @@ class ShareCoordinatorServiceTest {
@SuppressWarnings("unchecked")
private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> mockRuntime() {
- return (CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>) mock(CoordinatorRuntime.class);
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mock(CoordinatorRuntime.class);
+ when(runtime.activeTopicPartitions())
+ .thenReturn(Collections.singletonList(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
+ return runtime;
}
@Test
@@ -74,10 +85,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ new MockTimer(),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -95,10 +108,12 @@ class ShareCoordinatorServiceTest {
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L).thenReturn(150L);
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
coordinatorMetrics,
- time
+ time,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -203,10 +218,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -273,7 +290,7 @@ class ShareCoordinatorServiceTest {
.setDeliveryState((byte) 0)
)))
);
-
+
when(runtime.scheduleWriteOperation(
eq("read-update-leader-epoch-state"),
eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
@@ -303,10 +320,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -348,10 +367,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -393,10 +414,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -470,10 +493,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -531,10 +556,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -579,10 +606,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -620,10 +649,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -646,11 +677,13 @@ class ShareCoordinatorServiceTest {
public void testPartitionFor() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
- new LogContext(),
- ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
- runtime,
- new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ new LogContext(),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -673,4 +706,422 @@ class ShareCoordinatorServiceTest {
// asCoordinatorKey does not discriminate on topic name.
assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey());
}
+
+ @Test
+ public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+
+ when(writer.deleteRecords(
+ any(),
+ eq(10L)
+ )).thenReturn(
+ CompletableFuture.completedFuture(null)
+ );
+
+ when(runtime.scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ )).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(10L))
+ ).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(11L))
+ );
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ time,
+ timer,
+ writer
+ ));
+
+ service.startup(() -> 1);
+ verify(runtime, times(0))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // prune should be called
+ verify(runtime, times(1))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // prune should be called
+ verify(runtime, times(2))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ verify(writer, times(2))
+ .deleteRecords(any(), anyLong());
+ service.shutdown();
+ }
+
+ @Test
+ public void testRecordPruningTaskPeriodicityWithSomeFailures() throws Exception {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+ TopicPartition tp1 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
+
+ when(runtime.activeTopicPartitions())
+ .thenReturn(List.of(tp1, tp2));
+
+ when(writer.deleteRecords(
+ any(),
+ eq(10L)
+ )).thenReturn(
+ CompletableFuture.completedFuture(null)
+ );
+
+ when(writer.deleteRecords(
+ any(),
+ eq(20L)
+ )).thenReturn(
+ CompletableFuture.failedFuture(new Exception("bad stuff"))
+ );
+
+ when(runtime.scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ eq(tp1),
+ any(),
+ any()
+ )).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(10L))
+ ).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(11L))
+ );
+
+ when(runtime.scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ eq(tp2),
+ any(),
+ any()
+ )).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(20L))
+ ).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(21L))
+ );
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ time,
+ timer,
+ writer
+ ));
+
+ service.startup(() -> 2);
+ verify(runtime, times(0))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // Prune should be called.
+ verify(runtime, times(2)) // For 2 topic partitions.
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // Prune should be called as future completes exceptionally.
+ verify(runtime, times(4)) // Second prune with 2 topic partitions.
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ verify(writer, times(4))
+ .deleteRecords(any(), anyLong());
+ service.shutdown();
+ }
+
+ @Test
+ public void testRecordPruningTaskException() throws Exception {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+
+ when(runtime.scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ )).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ time,
+ timer,
+ writer
+ ));
+
+ service.startup(() -> 1);
+ verify(runtime, times(0))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // prune should be called
+ verify(runtime, times(1))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ verify(writer, times(0))
+ .deleteRecords(any(), anyLong());
+ service.shutdown();
+ }
+
+ @Test
+ public void testRecordPruningTaskSuccess() throws Exception {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+
+ when(runtime.scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ )).thenReturn(CompletableFuture.completedFuture(Optional.of(20L)));
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ time,
+ timer,
+ writer
+ ));
+
+ service.startup(() -> 1);
+ verify(runtime, times(0))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // prune should be called
+ verify(runtime, times(1))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ verify(writer, times(1))
+ .deleteRecords(any(), eq(20L));
+ service.shutdown();
+ }
+
+ @Test
+ public void testRecordPruningTaskEmptyOffsetReturned() throws Exception {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+
+ when(runtime.scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ )).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ time,
+ timer,
+ writer
+ ));
+
+ service.startup(() -> 1);
+ verify(runtime, times(0))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // prune should be called
+ verify(runtime, times(1))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ verify(writer, times(0))
+ .deleteRecords(any(), anyLong());
+ service.shutdown();
+ }
+
+ @Test
+ public void testRecordPruningTaskRepeatedSameOffsetForTopic() throws Exception {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+
+ when(writer.deleteRecords(
+ any(),
+ eq(10L)
+ )).thenReturn(
+ CompletableFuture.completedFuture(null)
+ );
+
+ when(runtime.scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ )).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(10L))
+ ).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(10L))
+ );
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ time,
+ timer,
+ writer
+ ));
+
+ service.startup(() -> 1);
+ verify(runtime, times(0))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // advance past the 30s prune interval; the first prune should fire
+ verify(runtime, times(1))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // advance past the interval again; a second prune should fire
+ verify(runtime, times(2))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
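+ // both prune runs reported the same offset (10), so deleteRecords should be invoked only once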
+ verify(writer, times(1))
+ .deleteRecords(any(), anyLong());
+ service.shutdown();
+ }
+
+ @Test
+ public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() throws Exception {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+ CompletableFuture<Void> fut1 = new CompletableFuture<>();
+ fut1.completeExceptionally(new Exception("bad stuff"));
+
+ when(writer.deleteRecords(
+ any(),
+ eq(10L)
+ )).thenReturn(
+ fut1
+ ).thenReturn(
+ CompletableFuture.completedFuture(null)
+ );
+
+ when(runtime.scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ )).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(10L))
+ ).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(10L))
+ );
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ time,
+ timer,
+ writer
+ ));
+
+ service.startup(() -> 1);
+ verify(runtime, times(0))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // advance past the 30s prune interval; the first prune should fire
+ verify(runtime, times(1))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(30005L); // advance past the interval again; a second prune should fire
+ verify(runtime, times(2))
+ .scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any());
+
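+ // the first deleteRecords call failed, so the same offset should be retried on the next prune run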
+ verify(writer, times(2))
+ .deleteRecords(any(), anyLong());
+ service.shutdown();
+ }
}
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index d7ddc2e0f44..37e331dceec 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -79,11 +80,12 @@ class ShareCoordinatorShardTest {
private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
private MetadataImage metadataImage = null;
private Map<String, String> configOverrides = new HashMap<>();
+ ShareCoordinatorOffsetsManager offsetsManager = mock(ShareCoordinatorOffsetsManager.class);
ShareCoordinatorShard build() {
if (metadataImage == null) metadataImage = mock(MetadataImage.class, RETURNS_DEEP_STUBS);
if (config == null) {
- config = ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap(configOverrides));
+ config = ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap(configOverrides));
}
ShareCoordinatorShard shard = new ShareCoordinatorShard(
@@ -91,7 +93,8 @@ class ShareCoordinatorShardTest {
config,
coordinatorMetrics,
metricsShard,
- snapshotRegistry
+ snapshotRegistry,
+ offsetsManager
);
when(metadataImage.topics().getTopic((Uuid) any())).thenReturn(mock(TopicImage.class));
when(metadataImage.topics().getPartition(any(), anyInt())).thenReturn(mock(PartitionRegistration.class));
@@ -103,6 +106,11 @@ class ShareCoordinatorShardTest {
this.configOverrides = configOverrides;
return this;
}
+
+ public ShareCoordinatorShardBuilder setOffsetsManager(ShareCoordinatorOffsetsManager offsetsManager) {
+ this.offsetsManager = offsetsManager;
+ return this;
+ }
}
private void writeAndReplayDefaultRecord(ShareCoordinatorShard shard) {
@@ -796,6 +804,17 @@ class ShareCoordinatorShardTest {
verify(shard.getMetricsShard(), times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}
+ @Test
+ public void testLastRedundantOffset() {
+ ShareCoordinatorOffsetsManager manager = mock(ShareCoordinatorOffsetsManager.class);
+ ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder()
+ .setOffsetsManager(manager)
+ .build();
+
+ when(manager.lastRedundantOffset()).thenReturn(Optional.of(10L));
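+ // the shard should surface the manager's last redundant offset with no records to append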
+ assertEquals(new CoordinatorResult<>(Collections.emptyList(), Optional.of(10L)), shard.lastRedundantOffset());
+ }
+
@Test
public void testReadStateLeaderEpochUpdateSuccess() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
similarity index 95%
rename from share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
rename to share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
index 5f8c37fc1e6..31b5bd88bdb 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
@@ -28,7 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ShareCoordinatorConfigTest {
+public class ShareCoordinatorTestConfig {
private static final List<ConfigDef> CONFIG_DEF_LIST = Collections.singletonList(
ShareCoordinatorConfig.CONFIG_DEF
@@ -50,6 +50,7 @@ public class ShareCoordinatorConfigTest {
configs.put(ShareCoordinatorConfig.LOAD_BUFFER_SIZE_CONFIG, "555");
configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, String.valueOf(CompressionType.NONE.id));
+ configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "30000"); // 30 seconds
return configs;
}