From 4c5ea05ec85aba18abae1308f78345ff949659a5 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Thu, 12 Dec 2024 13:08:03 +0530 Subject: [PATCH] KAFKA-18058: Share group state record pruning impl. (#18014) In this PR, we've added a class ShareCoordinatorOffsetsManager, which tracks the last redundant offset for each share group state topic partition. We have also added a periodic timer job in ShareCoordinatorService which queries for the redundant offset at regular intervals and if a valid value is found, issues the deleteRecords call to the ReplicaManager via the PartitionWriter. In this way the size of the partitions is kept manageable. Reviewers: Jun Rao , David Jacot , Andrew Schofield --- .../common/runtime/CoordinatorRuntime.java | 20 + .../common/runtime/PartitionWriter.java | 11 + .../runtime/InMemoryPartitionWriter.java | 8 + .../group/CoordinatorPartitionWriter.scala | 21 + .../scala/kafka/server/ReplicaManager.scala | 11 +- .../CoordinatorPartitionWriterTest.scala | 82 ++- .../unit/kafka/server/KafkaApisTest.scala | 10 +- .../kafka/server/ReplicaManagerTest.scala | 58 +- .../server/config/ShareCoordinatorConfig.java | 14 +- .../share/ShareCoordinatorOffsetsManager.java | 122 +++++ .../share/ShareCoordinatorService.java | 94 +++- .../share/ShareCoordinatorShard.java | 33 +- .../ShareCoordinatorOffsetsManagerTest.java | 209 ++++++++ .../share/ShareCoordinatorServiceTest.java | 505 +++++++++++++++++- .../share/ShareCoordinatorShardTest.java | 23 +- ...t.java => ShareCoordinatorTestConfig.java} | 3 +- 16 files changed, 1175 insertions(+), 49 deletions(-) create mode 100644 share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java create mode 100644 share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java rename share-coordinator/src/test/java/org/apache/kafka/coordinator/share/{ShareCoordinatorConfigTest.java => ShareCoordinatorTestConfig.java} (95%) diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 1c21038e66a..ee0cf18212a 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -52,6 +52,7 @@ import org.slf4j.Logger; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -2468,4 +2469,23 @@ public class CoordinatorRuntime, U> implements Aut Utils.closeQuietly(runtimeMetrics, "runtime metrics"); log.info("Coordinator runtime closed."); } + + /** + * Util method which returns all the topic partitions for which + * the state machine is in active state. + *

+ * This could be useful if the caller does not have a specific + * target internal topic partition. + * @return List of {@link TopicPartition} whose coordinators are active + */ + public List activeTopicPartitions() { + if (coordinators == null || coordinators.isEmpty()) { + return Collections.emptyList(); + } + + return coordinators.entrySet().stream() + .filter(entry -> entry.getValue().state.equals(CoordinatorState.ACTIVE)) + .map(Map.Entry::getKey) + .toList(); + } } diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java index 47df8bcae34..cb8bec3f71c 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java @@ -107,4 +107,15 @@ public interface PartitionWriter { short producerEpoch, short apiVersion ) throws KafkaException; + + /** + * Delete records from a topic partition until specified offset + * @param tp The partition to delete records from + * @param deleteBeforeOffset Offset to delete until, starting from the beginning + * @throws KafkaException Any KafkaException caught during the operation. + */ + CompletableFuture deleteRecords( + TopicPartition tp, + long deleteBeforeOffset + ) throws KafkaException; } diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java index 1f676ad550f..a8551f0734b 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java @@ -115,6 +115,14 @@ public class InMemoryPartitionWriter implements PartitionWriter { } } + @Override + public CompletableFuture deleteRecords( + TopicPartition tp, + long deleteBeforeOffset + ) throws KafkaException { + throw new RuntimeException("method not implemented"); + } + @Override public CompletableFuture maybeStartTransactionVerification( TopicPartition tp, diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index f70819ef438..211be799a7e 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -165,4 +165,25 @@ class CoordinatorPartitionWriter( // Required offset. partitionResult.lastOffset + 1 } + + override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = { + val responseFuture: CompletableFuture[Void] = new CompletableFuture[Void]() + + replicaManager.deleteRecords( + timeout = 30000L, // 30 seconds. 
+ offsetPerPartition = Map(tp -> deleteBeforeOffset), + responseCallback = results => { + val result = results.get(tp) + if (result.isEmpty) { + responseFuture.completeExceptionally(new IllegalStateException(s"Delete status $result should have partition $tp.")) + } else if (result.get.errorCode != Errors.NONE.code) { + responseFuture.completeExceptionally(Errors.forCode(result.get.errorCode).exception) + } else { + responseFuture.complete(null) + } + }, + allowInternalTopicDeletion = true + ) + responseFuture + } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 67cc1a84c56..cb7005f4020 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig, * Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas; * the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset */ - private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = { + private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition, LogDeleteRecordsResult] = { trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition)) offsetPerPartition.map { case (topicPartition, requestedOffset) => - // reject delete records operation on internal topics - if (Topic.isInternal(topicPartition.topic)) { + // reject delete records operation for internal topics unless allowInternalTopicDeletion is true + if (Topic.isInternal(topicPartition.topic) && !allowInternalTopicDeletion) { (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}")))) } else { try { @@ -1369,9 +1369,10 @@ class ReplicaManager(val config: KafkaConfig, def deleteRecords(timeout: Long, offsetPerPartition: Map[TopicPartition, Long], - responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit): Unit = { + responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit, + allowInternalTopicDeletion: Boolean = false): Unit = { val timeBeforeLocalDeleteRecords = time.milliseconds - val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition) + val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition, allowInternalTopicDeletion) debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords)) val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) => diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala index f12d21019a7..9b192e851e9 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -20,13 +20,14 @@ import kafka.server.ReplicaManager import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.errors.NotLeaderOrFollowerException +import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.coordinator.common.runtime.PartitionWriter import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard} import org.apache.kafka.test.TestUtils.assertFutureThrows -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue} import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.EnumSource @@ -238,4 +239,83 @@ class CoordinatorPartitionWriterTest { batch )) } + + @Test + def testDeleteRecordsResponseContainsError(): Unit = { + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager + ) + + val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit]) + + // Response contains error. + when(replicaManager.deleteRecords( + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), + callbackCapture.capture(), + ArgumentMatchers.eq(true) + )).thenAnswer { _ => + callbackCapture.getValue.apply(Map( + new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult() + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code + ))) + } + + partitionRecordWriter.deleteRecords( + new TopicPartition("random-topic", 0), + 10L + ).whenComplete { (_, exp) => + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception, exp) + } + + // Empty response + when(replicaManager.deleteRecords( + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), + callbackCapture.capture(), + ArgumentMatchers.eq(true) + )).thenAnswer { _ => + callbackCapture.getValue.apply(Map[TopicPartition, DeleteRecordsPartitionResult]()) + } + + partitionRecordWriter.deleteRecords( + new TopicPartition("random-topic", 0), + 10L + ).whenComplete { (_, exp) => + assertTrue(exp.isInstanceOf[IllegalStateException]) + } + } + + @Test + def testDeleteRecordsSuccess(): Unit = { + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager + ) + + val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit]) + + // response contains error + when(replicaManager.deleteRecords( + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), + callbackCapture.capture(), + ArgumentMatchers.eq(true) + )).thenAnswer { _ => + callbackCapture.getValue.apply(Map( + new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult() + .setErrorCode(Errors.NONE.code) + )) + } + + partitionRecordWriter.deleteRecords( + new TopicPartition("random-topic", 0), + 10L + ).whenComplete { (_, exp) => + assertNull(exp) + } + } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a36598a5eeb..27cd6644bd9 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -77,7 +77,7 @@ import 
org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG} import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig} -import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfigTest} +import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig} import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.LeaderAndIsr import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics} @@ -11702,7 +11702,7 @@ class KafkaApisTest extends Logging { val response = getReadShareGroupResponse( readRequestData, - config ++ ShareCoordinatorConfigTest.testConfigMap().asScala, + config ++ ShareCoordinatorTestConfig.testConfigMap().asScala, verifyNoErr = true, null, readStateResultData @@ -11757,7 +11757,7 @@ class KafkaApisTest extends Logging { val response = getReadShareGroupResponse( readRequestData, - config ++ ShareCoordinatorConfigTest.testConfigMap().asScala, + config ++ ShareCoordinatorTestConfig.testConfigMap().asScala, verifyNoErr = false, authorizer, readStateResultData @@ -11812,7 +11812,7 @@ class KafkaApisTest extends Logging { val response = getWriteShareGroupResponse( writeRequestData, - config ++ ShareCoordinatorConfigTest.testConfigMap().asScala, + config ++ ShareCoordinatorTestConfig.testConfigMap().asScala, verifyNoErr = true, null, writeStateResultData @@ -11867,7 +11867,7 @@ class KafkaApisTest extends Logging { val response = getWriteShareGroupResponse( writeRequestData, - config ++ ShareCoordinatorConfigTest.testConfigMap().asScala, + config ++ ShareCoordinatorTestConfig.testConfigMap().asScala, verifyNoErr = false, authorizer, writeStateResultData diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index ce45fcb6093..044a7d490d7 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -34,7 +34,8 @@ import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartit import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException} -import org.apache.kafka.common.message.LeaderAndIsrRequestData +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.{DeleteRecordsResponseData, LeaderAndIsrRequestData} import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState @@ -6660,6 +6661,61 @@ class ReplicaManagerTest { } } + @Test + def testDeleteRecordsInternalTopicDeleteDisallowed(): Unit = { + val localId = 1 + val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME) + val directoryEventHandler = mock(classOf[DirectoryEventHandler]) + + val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, 
setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler) + val directoryIds = rm.logManager.directoryIdsSet.toList + assertEquals(directoryIds.size, 2) + val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds) + val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get + partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)), + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), + None) + + def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = { + assert(responseStatus.values.head.errorCode == Errors.INVALID_TOPIC_EXCEPTION.code) + } + + // default internal topics delete disabled + rm.deleteRecords( + timeout = 0L, + Map[TopicPartition, Long](topicPartition0.topicPartition() -> 10L), + responseCallback = callback + ) + } + + @Test + def testDeleteRecordsInternalTopicDeleteAllowed(): Unit = { + val localId = 1 + val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME) + val directoryEventHandler = mock(classOf[DirectoryEventHandler]) + + val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler) + val directoryIds = rm.logManager.directoryIdsSet.toList + assertEquals(directoryIds.size, 2) + val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds) + val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get + partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)), + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), + None) + + def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = { + assert(responseStatus.values.head.errorCode == Errors.NONE.code) + } + + // internal topics delete allowed + rm.deleteRecords( + timeout = 0L, + Map[TopicPartition, Long](topicPartition0.topicPartition() -> 0L), + responseCallback = callback, + allowInternalTopicDeletion = true + ) + } + private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true) try { diff --git a/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java b/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java index a27a19914c9..58bc774fe20 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Utils; import java.util.Optional; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Type.INT; @@ -71,6 +72,10 @@ public class ShareCoordinatorConfig { public static final int 
APPEND_LINGER_MS_DEFAULT = 10; public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk."; + public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms"; + public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes + public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in share-group state topic."; + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC) .define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC) @@ -81,7 +86,8 @@ public class ShareCoordinatorConfig { .define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC) .define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC) .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC) - .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC); + .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC) + .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC); private final int stateTopicNumPartitions; private final short stateTopicReplicationFactor; @@ -93,6 +99,7 @@ public class ShareCoordinatorConfig { private final int loadBufferSize; private final CompressionType compressionType; private final int appendLingerMs; + private final int pruneIntervalMs; public ShareCoordinatorConfig(AbstractConfig config) { @@ -108,6 +115,7 @@ public class ShareCoordinatorConfig { .map(CompressionType::forId) .orElse(null); appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG); + pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG); validate(); } @@ -151,6 +159,10 @@ public class ShareCoordinatorConfig { return compressionType; } + public int shareCoordinatorTopicPruneIntervalMs() { + return pruneIntervalMs; + } + private void validate() { Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && snapshotUpdateRecordsPerSnapshot <= 500, String.format("%s must be between [0, 500]", SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG)); diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java new file mode 100644 index 00000000000..69070f65e93 --- /dev/null +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineLong; + +import java.util.Objects; +import java.util.Optional; + +/** + * Util class to track the offsets written into the internal topic + * per share partition key. + * It calculates the minimum offset globally up to which the records + * in the internal partition are redundant i.e. they have been overridden + * by newer records. + */ +public class ShareCoordinatorOffsetsManager { + + // Map to store share partition key => current partition offset + // being written. + private final TimelineHashMap offsets; + + // Minimum offset representing the smallest necessary offset + // across the internal partition (offsets below this are redundant). + // We are using timeline object here because the offsets which are passed into + // updateState might not be committed yet. In case of retry, these offsets would + // be invalidated via the snapshot registry. Hence, using timeline object + // the values would automatically revert in accordance with the last committed offset. + private final TimelineLong lastRedundantOffset; + + public ShareCoordinatorOffsetsManager(SnapshotRegistry snapshotRegistry) { + Objects.requireNonNull(snapshotRegistry); + offsets = new TimelineHashMap<>(snapshotRegistry, 0); + lastRedundantOffset = new TimelineLong(snapshotRegistry); + lastRedundantOffset.set(Long.MAX_VALUE); // For easy application of Math.min. + } + + /** + * Method updates internal state with the supplied offset for the provided + * share partition key. It then calculates the minimum offset, if possible, + * below which all offsets are redundant. + * + * @param key - represents {@link SharePartitionKey} whose offset needs updating + * @param offset - represents the latest partition offset for provided key + */ + public void updateState(SharePartitionKey key, long offset) { + lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset)); + offsets.put(key, offset); + + Optional redundantOffset = findRedundantOffset(); + redundantOffset.ifPresent(lastRedundantOffset::set); + } + + private Optional findRedundantOffset() { + if (offsets.isEmpty()) { + return Optional.empty(); + } + + long soFar = Long.MAX_VALUE; + + for (long offset : offsets.values()) { + // Get min offset among latest offsets + // for all share keys in the internal partition. + soFar = Math.min(soFar, offset); + + // lastRedundantOffset represents the smallest necessary offset + // and if soFar equals it, we cannot proceed. This can happen + // if a share partition key hasn't had records written for a while. + // For example, + //

+ // key1:1 + // key2:2 4 6 + // key3:3 5 7 + //
+ // We can see in above that offsets 2, 4, 3, 5 are redundant, + // but we do not have a contiguous prefix starting at lastRedundantOffset + // and we cannot proceed. + if (soFar == lastRedundantOffset.get()) { + return Optional.of(soFar); + } + } + + return Optional.of(soFar); + } + + /** + * Most recent last redundant offset. This method is to be used + * when the caller wants to query the value of such offset. + * @return Optional of type Long representing the offset or empty for invalid offset values + */ + public Optional lastRedundantOffset() { + long value = lastRedundantOffset.get(); + if (value <= 0 || value == Long.MAX_VALUE) { + return Optional.empty(); + } + + return Optional.of(value); + } + + // visible for testing + TimelineHashMap curState() { + return offsets; + } +} diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index ced4e532b03..71dd2d88056 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -48,6 +48,7 @@ import org.apache.kafka.server.config.ShareCoordinatorConfig; import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; import org.slf4j.Logger; @@ -60,6 +61,7 @@ import java.util.Map; import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntSupplier; @@ -75,6 +77,9 @@ public class ShareCoordinatorService implements ShareCoordinator { private final ShareCoordinatorMetrics shareCoordinatorMetrics; private volatile int numPartitions = -1; // Number of partitions for __share_group_state. Provided when component is started. 
private final Time time; + private final Timer timer; + private final PartitionWriter writer; + private final Map lastPrunedOffsets; public static class Builder { private final int nodeId; @@ -184,7 +189,9 @@ public class ShareCoordinatorService implements ShareCoordinator { config, runtime, coordinatorMetrics, - time + time, + timer, + writer ); } } @@ -194,12 +201,18 @@ public class ShareCoordinatorService implements ShareCoordinator { ShareCoordinatorConfig config, CoordinatorRuntime runtime, ShareCoordinatorMetrics shareCoordinatorMetrics, - Time time) { + Time time, + Timer timer, + PartitionWriter writer + ) { this.log = logContext.logger(ShareCoordinatorService.class); this.config = config; this.runtime = runtime; this.shareCoordinatorMetrics = shareCoordinatorMetrics; this.time = time; + this.timer = timer; + this.writer = writer; + this.lastPrunedOffsets = new ConcurrentHashMap<>(); } @Override @@ -240,9 +253,82 @@ public class ShareCoordinatorService implements ShareCoordinator { log.info("Starting up."); numPartitions = shareGroupTopicPartitionCount.getAsInt(); + setupRecordPruning(); log.info("Startup complete."); } + private void setupRecordPruning() { + log.info("Scheduling share-group state topic prune job."); + timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) { + @Override + public void run() { + List> futures = new ArrayList<>(); + runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp))); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})) + .whenComplete((res, exp) -> { + if (exp != null) { + log.error("Received error in share-group state topic prune.", exp); + } + // Perpetual recursion, failure or not. + setupRecordPruning(); + }); + } + }); + } + + private CompletableFuture performRecordPruning(TopicPartition tp) { + // This future will always be completed normally, exception or not. + CompletableFuture fut = new CompletableFuture<>(); + + runtime.scheduleWriteOperation( + "write-state-record-prune", + tp, + Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()), + ShareCoordinatorShard::lastRedundantOffset + ).whenComplete((result, exception) -> { + if (exception != null) { + log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception); + Errors error = Errors.forException(exception); + // These errors might result from partition metadata not loaded + // or shard re-election. Will cause unnecessary noise, hence not logging + if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) { + log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception); + fut.completeExceptionally(exception); + return; + } + fut.complete(null); + return; + } + if (result.isPresent()) { + Long off = result.get(); + Long lastPrunedOffset = lastPrunedOffsets.get(tp); + if (lastPrunedOffset != null && lastPrunedOffset.longValue() == off) { + log.debug("{} already pruned till offset {}", tp, off); + fut.complete(null); + return; + } + + log.info("Pruning records in {} till offset {}.", tp, off); + writer.deleteRecords(tp, off) + .whenComplete((res, exp) -> { + if (exp != null) { + log.debug("Exception while deleting records in {} till offset {}.", tp, off, exp); + fut.completeExceptionally(exp); + return; + } + fut.complete(null); + // Best effort prevention of issuing duplicate delete calls. 
+ lastPrunedOffsets.put(tp, off); + }); + } else { + log.debug("No offset value for tp {} found.", tp); + fut.complete(null); + } + }); + return fut; + } + @Override public void shutdown() { if (!isActive.compareAndSet(true, false)) { @@ -543,8 +629,10 @@ public class ShareCoordinatorService implements ShareCoordinator { @Override public void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch) { throwIfNotActive(); + TopicPartition tp = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex); + lastPrunedOffsets.remove(tp); runtime.scheduleUnloadOperation( - new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex), + tp, partitionLeaderEpoch ); } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index e8ca9ce6b8d..6f2a9c27b74 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -73,6 +73,7 @@ public class ShareCoordinatorShard implements CoordinatorShard snapshotUpdateCount; private final TimelineHashMap stateEpochMap; private MetadataImage metadataImage; + private final ShareCoordinatorOffsetsManager offsetsManager; public static final Exception NULL_TOPIC_ID = new Exception("The topic id cannot be null."); public static final Exception NEGATIVE_PARTITION_ID = new Exception("The partition id cannot be a negative number."); @@ -162,6 +163,17 @@ public class ShareCoordinatorShard implements CoordinatorShard(snapshotRegistry, 0); this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0); this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0); + this.offsetsManager = offsetsManager; } @Override @@ -195,7 +208,7 @@ public class ShareCoordinatorShard implements CoordinatorShard(Collections.singletonList(record), responseData); } + /** + * Method which returns the last known redundant offset from the partition + * led by this shard. + * @return CoordinatorResult containing empty record list and an Optional representing the offset. + */ + public CoordinatorResult, CoordinatorRecord> lastRedundantOffset() { + return new CoordinatorResult<>( + Collections.emptyList(), + this.offsetsManager.lastRedundantOffset() + ); + } + /** * Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions. *

- * if no snapshot has been created for the key => create a new ShareSnapshot record + * If no snapshot has been created for the key => create a new ShareSnapshot record * else if number of ShareUpdate records for key >= max allowed per snapshot per key => create a new ShareSnapshot record * else create a new ShareUpdate record * diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java new file mode 100644 index 00000000000..262f166be19 --- /dev/null +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.timeline.SnapshotRegistry; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ShareCoordinatorOffsetsManagerTest { + + private ShareCoordinatorOffsetsManager manager; + private static final SharePartitionKey KEY1 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 0); + private static final SharePartitionKey KEY2 = SharePartitionKey.getInstance("gs2", Uuid.randomUuid(), 0); + private static final SharePartitionKey KEY3 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 1); + private static final SharePartitionKey KEY4 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 7); + + @BeforeEach + public void setUp() { + manager = new ShareCoordinatorOffsetsManager(new SnapshotRegistry(new LogContext())); + } + + @Test + public void testUpdateStateAddsToInternalState() { + manager.updateState(KEY1, 0L); + assertEquals(Optional.empty(), manager.lastRedundantOffset()); + + manager.updateState(KEY1, 10L); + assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // [0-9] offsets are redundant. + + manager.updateState(KEY2, 15L); + assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // No update to last redundant after adding 15L so, still 10L. 
+ + assertEquals(10L, manager.curState().get(KEY1)); + assertEquals(15L, manager.curState().get(KEY2)); + } + + private static class ShareOffsetTestHolder { + static class TestTuple { + final SharePartitionKey key; + final long offset; + final Optional expectedOffset; + + private TestTuple(SharePartitionKey key, long offset, Optional expectedOffset) { + this.key = key; + this.offset = offset; + this.expectedOffset = expectedOffset; + } + + static TestTuple instance(SharePartitionKey key, long offset, Optional expectedOffset) { + return new TestTuple(key, offset, expectedOffset); + } + } + + private final String testName; + private final List tuples; + private final boolean shouldRun; + + ShareOffsetTestHolder(String testName, List tuples) { + this(testName, tuples, true); + } + + ShareOffsetTestHolder(String testName, List tuples, boolean shouldRun) { + this.testName = testName; + this.tuples = tuples; + this.shouldRun = shouldRun; + } + } + + static Stream generateNoRedundantStateCases() { + return Stream.of( + new ShareOffsetTestHolder( + "no redundant state single key", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)) + ) + ), + + new ShareOffsetTestHolder( + "no redundant state multiple keys", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.of(10L)) + ) + ) + ); + } + + static Stream generateRedundantStateCases() { + return Stream.of( + new ShareOffsetTestHolder( + "redundant state single key", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L, Optional.of(11L)), + ShareOffsetTestHolder.TestTuple.instance(KEY1, 15L, Optional.of(15L)) + ) + ), + + new ShareOffsetTestHolder( + "redundant state multiple keys", + // KEY1: 10 17 + // KEY2: 11 16 + // KEY3: 15 + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.of(10L)), // KEY2 11 redundant but should not be returned + ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, Optional.of(15L)) + ) + ) + ); + + } + + static Stream generateComplexCases() { + return Stream.of( + new ShareOffsetTestHolder( + "redundant state reverse key order", + // Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1. 
+ List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L, Optional.of(18L)) + ) + ), + + new ShareOffsetTestHolder( + "redundant state infrequently written partition.", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 25L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, Optional.of(27L)) + ) + ) + ); + } + + @ParameterizedTest + @MethodSource("generateNoRedundantStateCases") + public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) { + if (holder.shouldRun) { + holder.tuples.forEach(tuple -> { + manager.updateState(tuple.key, tuple.offset); + assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName); + }); + } + } + + @ParameterizedTest + @MethodSource("generateRedundantStateCases") + public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) { + if (holder.shouldRun) { + holder.tuples.forEach(tuple -> { + manager.updateState(tuple.key, tuple.offset); + assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName); + }); + } + } + + @ParameterizedTest + @MethodSource("generateComplexCases") + public void testUpdateStateComplexCases(ShareOffsetTestHolder holder) { + if (holder.shouldRun) { + holder.tuples.forEach(tuple -> { + manager.updateState(tuple.key, tuple.offset); + assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName); + }); + } + } +} diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index fab60d22f75..c725c824031 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -35,9 +35,13 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime; +import org.apache.kafka.coordinator.common.runtime.PartitionWriter; import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.util.FutureUtils; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.server.util.timer.MockTimer; +import org.apache.kafka.server.util.timer.Timer; import org.junit.jupiter.api.Test; @@ -45,6 +49,8 @@ import java.time.Duration; 
import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -56,8 +62,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -66,7 +74,10 @@ class ShareCoordinatorServiceTest { @SuppressWarnings("unchecked") private CoordinatorRuntime mockRuntime() { - return (CoordinatorRuntime) mock(CoordinatorRuntime.class); + CoordinatorRuntime runtime = mock(CoordinatorRuntime.class); + when(runtime.activeTopicPartitions()) + .thenReturn(Collections.singletonList(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0))); + return runtime; } @Test @@ -74,10 +85,12 @@ class ShareCoordinatorServiceTest { CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, new ShareCoordinatorMetrics(), - Time.SYSTEM + Time.SYSTEM, + new MockTimer(), + mock(PartitionWriter.class) ); service.startup(() -> 1); @@ -95,10 +108,12 @@ class ShareCoordinatorServiceTest { when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L).thenReturn(150L); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, coordinatorMetrics, - time + time, + mock(Timer.class), + mock(PartitionWriter.class) ); service.startup(() -> 1); @@ -203,10 +218,12 @@ class ShareCoordinatorServiceTest { CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, new ShareCoordinatorMetrics(), - Time.SYSTEM + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) ); service.startup(() -> 1); @@ -273,7 +290,7 @@ class ShareCoordinatorServiceTest { .setDeliveryState((byte) 0) ))) ); - + when(runtime.scheduleWriteOperation( eq("read-update-leader-epoch-state"), eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)), @@ -303,10 +320,12 @@ class ShareCoordinatorServiceTest { CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, new ShareCoordinatorMetrics(), - Time.SYSTEM + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) ); service.startup(() -> 1); @@ -348,10 +367,12 @@ class ShareCoordinatorServiceTest { 
CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, new ShareCoordinatorMetrics(), - Time.SYSTEM + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) ); service.startup(() -> 1); @@ -393,10 +414,12 @@ class ShareCoordinatorServiceTest { CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, new ShareCoordinatorMetrics(), - Time.SYSTEM + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) ); String groupId = "group1"; @@ -470,10 +493,12 @@ class ShareCoordinatorServiceTest { CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, new ShareCoordinatorMetrics(), - Time.SYSTEM + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) ); String groupId = "group1"; @@ -531,10 +556,12 @@ class ShareCoordinatorServiceTest { CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, new ShareCoordinatorMetrics(), - Time.SYSTEM + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) ); service.startup(() -> 1); @@ -579,10 +606,12 @@ class ShareCoordinatorServiceTest { CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, new ShareCoordinatorMetrics(), - Time.SYSTEM + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) ); service.startup(() -> 1); @@ -620,10 +649,12 @@ class ShareCoordinatorServiceTest { CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), runtime, new ShareCoordinatorMetrics(), - Time.SYSTEM + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) ); service.startup(() -> 1); @@ -646,11 +677,13 @@ class ShareCoordinatorServiceTest { public void testPartitionFor() { CoordinatorRuntime runtime = mockRuntime(); ShareCoordinatorService service = new ShareCoordinatorService( - new LogContext(), - ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), - runtime, - new ShareCoordinatorMetrics(), - Time.SYSTEM + new LogContext(), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + 
mock(PartitionWriter.class) ); String groupId = "group1"; @@ -673,4 +706,422 @@ class ShareCoordinatorServiceTest { // asCoordinatorKey does not discriminate on topic name. assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey()); } + + @Test + public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + MockTime time = new MockTime(); + MockTimer timer = new MockTimer(time); + PartitionWriter writer = mock(PartitionWriter.class); + + when(writer.deleteRecords( + any(), + eq(10L) + )).thenReturn( + CompletableFuture.completedFuture(null) + ); + + when(runtime.scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any() + )).thenReturn( + CompletableFuture.completedFuture(Optional.of(10L)) + ).thenReturn( + CompletableFuture.completedFuture(Optional.of(11L)) + ); + + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + time, + timer, + writer + )); + + service.startup(() -> 1); + verify(runtime, times(0)) + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + timer.advanceClock(30005L); // prune should be called + verify(runtime, times(1)) + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + timer.advanceClock(30005L); // prune should be called + verify(runtime, times(2)) + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + verify(writer, times(2)) + .deleteRecords(any(), anyLong()); + service.shutdown(); + } + + @Test + public void testRecordPruningTaskPeriodicityWithSomeFailures() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + MockTime time = new MockTime(); + MockTimer timer = new MockTimer(time); + PartitionWriter writer = mock(PartitionWriter.class); + TopicPartition tp1 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0); + TopicPartition tp2 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1); + + when(runtime.activeTopicPartitions()) + .thenReturn(List.of(tp1, tp2)); + + when(writer.deleteRecords( + any(), + eq(10L) + )).thenReturn( + CompletableFuture.completedFuture(null) + ); + + when(writer.deleteRecords( + any(), + eq(20L) + )).thenReturn( + CompletableFuture.failedFuture(new Exception("bad stuff")) + ); + + when(runtime.scheduleWriteOperation( + eq("write-state-record-prune"), + eq(tp1), + any(), + any() + )).thenReturn( + CompletableFuture.completedFuture(Optional.of(10L)) + ).thenReturn( + CompletableFuture.completedFuture(Optional.of(11L)) + ); + + when(runtime.scheduleWriteOperation( + eq("write-state-record-prune"), + eq(tp2), + any(), + any() + )).thenReturn( + CompletableFuture.completedFuture(Optional.of(20L)) + ).thenReturn( + CompletableFuture.completedFuture(Optional.of(21L)) + ); + + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + time, + timer, + writer + )); + + service.startup(() -> 2); + verify(runtime, times(0)) + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + timer.advanceClock(30005L); // Prune should be called. + verify(runtime, times(2)) // For 2 topic partitions. 
+ .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + timer.advanceClock(30005L); // Prune should be called as future completes exceptionally. + verify(runtime, times(4)) // Second prune with 2 topic partitions. + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + verify(writer, times(4)) + .deleteRecords(any(), anyLong()); + service.shutdown(); + } + + @Test + public void testRecordPruningTaskException() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + MockTime time = new MockTime(); + MockTimer timer = new MockTimer(time); + PartitionWriter writer = mock(PartitionWriter.class); + + when(runtime.scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any() + )).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())); + + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + time, + timer, + writer + )); + + service.startup(() -> 1); + verify(runtime, times(0)) + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + timer.advanceClock(30005L); // prune should be called + verify(runtime, times(1)) + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + verify(writer, times(0)) + .deleteRecords(any(), anyLong()); + service.shutdown(); + } + + @Test + public void testRecordPruningTaskSuccess() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + MockTime time = new MockTime(); + MockTimer timer = new MockTimer(time); + PartitionWriter writer = mock(PartitionWriter.class); + + when(runtime.scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any() + )).thenReturn(CompletableFuture.completedFuture(Optional.of(20L))); + + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + time, + timer, + writer + )); + + service.startup(() -> 1); + verify(runtime, times(0)) + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + timer.advanceClock(30005L); // prune should be called + verify(runtime, times(1)) + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + verify(writer, times(1)) + .deleteRecords(any(), eq(20L)); + service.shutdown(); + } + + @Test + public void testRecordPruningTaskEmptyOffsetReturned() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + MockTime time = new MockTime(); + MockTimer timer = new MockTimer(time); + PartitionWriter writer = mock(PartitionWriter.class); + + when(runtime.scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any() + )).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + time, + timer, + writer + )); + + service.startup(() -> 1); + verify(runtime, times(0)) + .scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any()); + + timer.advanceClock(30005L); // prune should be called + verify(runtime, times(1)) + 
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(0))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    @Test
+    public void testRecordPruningTaskRepeatedSameOffsetForTopic() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        when(writer.deleteRecords(
+            any(),
+            eq(10L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        );
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(2))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(1))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    @Test
+    public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+        CompletableFuture<Void> fut1 = new CompletableFuture<>();
+        fut1.completeExceptionally(new Exception("bad stuff"));
+
+        when(writer.deleteRecords(
+            any(),
+            eq(10L)
+        )).thenReturn(
+            fut1
+        ).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        );
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(2))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(2))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
 }
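The service tests above exercise a periodic prune job inside ShareCoordinatorService: a timer task named "write-state-record-prune" runs every STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG milliseconds (30 seconds in these tests), asks each active share-state topic partition for its last redundant offset via a write operation on the runtime, and, when that offset is present and has advanced, calls PartitionWriter.deleteRecords for that partition. The following is only a minimal sketch of such a task, not the literal code added by this patch; the field lastPrunedOffsets and the accessors shareCoordinatorTopicPruneIntervalMs() and shareCoordinatorWriteTimeoutMs() are assumed names, and timer, runtime, writer, config and log stand for the service's own members.

    // Sketch only: names marked above are assumptions, not taken from this patch.
    private final Map<TopicPartition, Long> lastPrunedOffsets = new ConcurrentHashMap<>();

    private void setupRecordPruning() {
        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
            @Override
            public void run() {
                runtime.activeTopicPartitions().forEach(tp ->
                    runtime.scheduleWriteOperation(
                        "write-state-record-prune",
                        tp,
                        Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
                        ShareCoordinatorShard::lastRedundantOffset
                    ).thenCompose(offset -> {
                        // Nothing to delete if no offset was returned or it has not advanced since the last prune.
                        if (offset.isEmpty() || offset.get().equals(lastPrunedOffsets.get(tp))) {
                            return CompletableFuture.completedFuture(null);
                        }
                        return writer.deleteRecords(tp, offset.get())
                            .thenAccept(ignored -> lastPrunedOffsets.put(tp, offset.get()));
                    }).whenComplete((res, exp) -> {
                        if (exp != null) {
                            log.error("Failed to prune records of {}.", tp, exp);
                        }
                    })
                );
                // Re-arm the task so pruning keeps running at the configured interval,
                // regardless of per-partition failures in this round.
                setupRecordPruning();
            }
        });
    }

Re-arming the task unconditionally matches the periodicity checks in testRecordPruningTaskPeriodicityWithAllSuccess and testRecordPruningTaskPeriodicityWithSomeFailures, while only recording lastPrunedOffsets after a successful deleteRecords matches the single-call and retry expectations of testRecordPruningTaskRepeatedSameOffsetForTopic and testRecordPruningTaskRetriesRepeatedSameOffsetForTopic.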
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index d7ddc2e0f44..37e331dceec 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -52,6 +52,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -79,11 +80,12 @@ class ShareCoordinatorShardTest {
     private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
     private MetadataImage metadataImage = null;
     private Map<String, String> configOverrides = new HashMap<>();
+    ShareCoordinatorOffsetsManager offsetsManager = mock(ShareCoordinatorOffsetsManager.class);
 
     ShareCoordinatorShard build() {
         if (metadataImage == null) metadataImage = mock(MetadataImage.class, RETURNS_DEEP_STUBS);
         if (config == null) {
-            config = ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap(configOverrides));
+            config = ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap(configOverrides));
         }
 
         ShareCoordinatorShard shard = new ShareCoordinatorShard(
@@ -91,7 +93,8 @@ class ShareCoordinatorShardTest {
             config,
             coordinatorMetrics,
             metricsShard,
-            snapshotRegistry
+            snapshotRegistry,
+            offsetsManager
         );
         when(metadataImage.topics().getTopic((Uuid) any())).thenReturn(mock(TopicImage.class));
         when(metadataImage.topics().getPartition(any(), anyInt())).thenReturn(mock(PartitionRegistration.class));
@@ -103,6 +106,11 @@ class ShareCoordinatorShardTest {
             this.configOverrides = configOverrides;
             return this;
         }
+
+        public ShareCoordinatorShardBuilder setOffsetsManager(ShareCoordinatorOffsetsManager offsetsManager) {
+            this.offsetsManager = offsetsManager;
+            return this;
+        }
     }
 
     private void writeAndReplayDefaultRecord(ShareCoordinatorShard shard) {
@@ -796,6 +804,17 @@ class ShareCoordinatorShardTest {
         verify(shard.getMetricsShard(), times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testLastRedundantOffset() {
+        ShareCoordinatorOffsetsManager manager = mock(ShareCoordinatorOffsetsManager.class);
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder()
+            .setOffsetsManager(manager)
+            .build();
+
+        when(manager.lastRedundantOffset()).thenReturn(Optional.of(10L));
+        assertEquals(new CoordinatorResult<>(Collections.emptyList(), Optional.of(10L)), shard.lastRedundantOffset());
+    }
+
     @Test
     public void testReadStateLeaderEpochUpdateSuccess() {
         ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
similarity index 95%
rename from share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
rename to share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
index 5f8c37fc1e6..31b5bd88bdb 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
@@ -28,7 +28,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ShareCoordinatorConfigTest {
+public class ShareCoordinatorTestConfig {
     private static final List<ConfigDef> CONFIG_DEF_LIST = Collections.singletonList(
         ShareCoordinatorConfig.CONFIG_DEF
@@ -50,6 +50,7 @@ public class ShareCoordinatorConfigTest {
         configs.put(ShareCoordinatorConfig.LOAD_BUFFER_SIZE_CONFIG, "555");
         configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
         configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, String.valueOf(CompressionType.NONE.id));
+        configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "30000"); // 30 seconds
         return configs;
     }
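For reference, a test that needs a non-default prune interval could use the renamed helper roughly as follows. This is a hypothetical usage sketch: testConfigMap(overrides) and createConfig(...) appear in the patch above, but the shareCoordinatorTopicPruneIntervalMs() accessor name is an assumption and may differ from the actual config class.

    // Hypothetical usage of ShareCoordinatorTestConfig with a custom prune interval.
    Map<String, String> overrides = new HashMap<>();
    overrides.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "60000"); // prune every 60s instead of 30s
    ShareCoordinatorConfig config = ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap(overrides));
    assertEquals(60000, config.shareCoordinatorTopicPruneIntervalMs()); // accessor name assumed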