KAFKA-18058: Share group state record pruning impl. (#18014)

In this PR, we've added a class, ShareCoordinatorOffsetsManager, which tracks the last redundant offset for each share-group state topic partition. We have also added a periodic timer job in ShareCoordinatorService which queries for the redundant offset at regular intervals and, if a valid value is found, issues the deleteRecords call to the ReplicaManager via the PartitionWriter. In this way, the size of the partitions is kept manageable.

Reviewers: Jun Rao <junrao@gmail.com>, David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Author: Sushant Mahajan, 2024-12-12 13:08:03 +05:30, committed by GitHub
parent a0a501952b
commit 4c5ea05ec8
16 changed files with 1175 additions and 49 deletions
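
The following is a minimal, self-contained Java sketch of the pruning flow described above; it is not the patch itself. OffsetTracker and RecordDeleter are hypothetical stand-ins for the coordinator runtime/shard lookup and the PartitionWriter deleteRecords path, and the scheduling uses a plain ScheduledExecutorService instead of the coordinator Timer.

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PruneLoopSketch {
    /** Stand-in for the coordinator side: yields the last redundant offset per partition. */
    interface OffsetTracker {
        List<String> activePartitions();
        Optional<Long> lastRedundantOffset(String partition);
    }

    /** Stand-in for the writer side: deletes records before the given offset. */
    interface RecordDeleter {
        CompletableFuture<Void> deleteRecordsBefore(String partition, long offset);
    }

    private final OffsetTracker tracker;
    private final RecordDeleter deleter;
    // Remember the last offset we pruned to, so we do not issue duplicate delete calls.
    private final Map<String, Long> lastPruned = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    PruneLoopSketch(OffsetTracker tracker, RecordDeleter deleter) {
        this.tracker = tracker;
        this.deleter = deleter;
    }

    /** Schedule the prune job at a fixed interval. */
    void start(long intervalMs) {
        scheduler.scheduleAtFixedRate(this::pruneOnce, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    private void pruneOnce() {
        for (String partition : tracker.activePartitions()) {
            tracker.lastRedundantOffset(partition).ifPresent(offset -> {
                if (offset.equals(lastPruned.get(partition))) {
                    return; // Already pruned up to this offset.
                }
                deleter.deleteRecordsBefore(partition, offset)
                    .thenRun(() -> lastPruned.put(partition, offset));
            });
        }
    }
}

In the actual change, the interval comes from the internal share.coordinator.state.topic.prune.interval.ms config and the prune job reschedules itself after each run (whether it succeeded or failed) rather than using a fixed-rate executor.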

View File

@@ -52,6 +52,7 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -2468,4 +2469,23 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
Utils.closeQuietly(runtimeMetrics, "runtime metrics");
log.info("Coordinator runtime closed.");
}
/**
* Util method which returns all the topic partitions for which
* the state machine is in active state.
* <p>
* This could be useful if the caller does not have a specific
* target internal topic partition.
* @return List of {@link TopicPartition} whose coordinators are active
*/
public List<TopicPartition> activeTopicPartitions() {
if (coordinators == null || coordinators.isEmpty()) {
return Collections.emptyList();
}
return coordinators.entrySet().stream()
.filter(entry -> entry.getValue().state.equals(CoordinatorState.ACTIVE))
.map(Map.Entry::getKey)
.toList();
}
}

View File

@@ -107,4 +107,15 @@ public interface PartitionWriter {
short producerEpoch,
short apiVersion
) throws KafkaException;
/**
* Delete records from a topic partition until the specified offset
* @param tp The partition to delete records from
* @param deleteBeforeOffset Offset to delete until, starting from the beginning
* @throws KafkaException Any KafkaException caught during the operation.
*/
CompletableFuture<Void> deleteRecords(
TopicPartition tp,
long deleteBeforeOffset
) throws KafkaException;
}

View File

@@ -115,6 +115,14 @@ public class InMemoryPartitionWriter implements PartitionWriter {
}
}
@Override
public CompletableFuture<Void> deleteRecords(
TopicPartition tp,
long deleteBeforeOffset
) throws KafkaException {
throw new RuntimeException("method not implemented");
}
@Override
public CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
TopicPartition tp,

View File

@@ -165,4 +165,25 @@ class CoordinatorPartitionWriter(
// Required offset.
partitionResult.lastOffset + 1
}
override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
val responseFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
replicaManager.deleteRecords(
timeout = 30000L, // 30 seconds.
offsetPerPartition = Map(tp -> deleteBeforeOffset),
responseCallback = results => {
val result = results.get(tp)
if (result.isEmpty) {
responseFuture.completeExceptionally(new IllegalStateException(s"Delete status $result should have partition $tp."))
} else if (result.get.errorCode != Errors.NONE.code) {
responseFuture.completeExceptionally(Errors.forCode(result.get.errorCode).exception)
} else {
responseFuture.complete(null)
}
},
allowInternalTopicDeletion = true
)
responseFuture
}
}

View File

@@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig,
* Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas;
* the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset
*/
private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition, LogDeleteRecordsResult] = {
trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
offsetPerPartition.map { case (topicPartition, requestedOffset) =>
// reject delete records operation for internal topics unless allowInternalTopicDeletion is true
if (Topic.isInternal(topicPartition.topic) && !allowInternalTopicDeletion) {
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
@@ -1369,9 +1369,10 @@ class ReplicaManager(val config: KafkaConfig,
def deleteRecords(timeout: Long,
offsetPerPartition: Map[TopicPartition, Long],
responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit,
allowInternalTopicDeletion: Boolean = false): Unit = {
val timeBeforeLocalDeleteRecords = time.milliseconds
val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition, allowInternalTopicDeletion)
debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))
val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>

View File

@@ -20,13 +20,14 @@ import kafka.server.ReplicaManager
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
@@ -238,4 +239,83 @@ class CoordinatorPartitionWriterTest {
batch
))
}
@Test
def testDeleteRecordsResponseContainsError(): Unit = {
val replicaManager = mock(classOf[ReplicaManager])
val partitionRecordWriter = new CoordinatorPartitionWriter(
replicaManager
)
val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
// Response contains error.
when(replicaManager.deleteRecords(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
callbackCapture.getValue.apply(Map(
new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code
)))
}
partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L
).whenComplete { (_, exp) =>
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception, exp)
}
// Empty response
when(replicaManager.deleteRecords(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
callbackCapture.getValue.apply(Map[TopicPartition, DeleteRecordsPartitionResult]())
}
partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L
).whenComplete { (_, exp) =>
assertTrue(exp.isInstanceOf[IllegalStateException])
}
}
@Test
def testDeleteRecordsSuccess(): Unit = {
val replicaManager = mock(classOf[ReplicaManager])
val partitionRecordWriter = new CoordinatorPartitionWriter(
replicaManager
)
val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
// Response contains no error.
when(replicaManager.deleteRecords(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
callbackCapture.getValue.apply(Map(
new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
.setErrorCode(Errors.NONE.code)
))
}
partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L
).whenComplete { (_, exp) =>
assertNull(exp)
}
}
}

View File

@@ -77,7 +77,7 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
@@ -11702,7 +11702,7 @@ class KafkaApisTest extends Logging {
val response = getReadShareGroupResponse(
readRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
readStateResultData
@@ -11757,7 +11757,7 @@ class KafkaApisTest extends Logging {
val response = getReadShareGroupResponse(
readRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
readStateResultData
@@ -11812,7 +11812,7 @@ class KafkaApisTest extends Logging {
val response = getWriteShareGroupResponse(
writeRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
writeStateResultData
@@ -11867,7 +11867,7 @@ class KafkaApisTest extends Logging {
val response = getWriteShareGroupResponse(
writeRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
writeStateResultData

View File

@@ -34,7 +34,8 @@ import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartit
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.{DeleteRecordsResponseData, LeaderAndIsrRequestData}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
@@ -6660,6 +6661,61 @@ class ReplicaManagerTest {
}
}
@Test
def testDeleteRecordsInternalTopicDeleteDisallowed(): Unit = {
val localId = 1
val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
val directoryEventHandler = mock(classOf[DirectoryEventHandler])
val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
val directoryIds = rm.logManager.directoryIdsSet.toList
assertEquals(directoryIds.size, 2)
val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
None)
def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
assert(responseStatus.values.head.errorCode == Errors.INVALID_TOPIC_EXCEPTION.code)
}
// default internal topics delete disabled
rm.deleteRecords(
timeout = 0L,
Map[TopicPartition, Long](topicPartition0.topicPartition() -> 10L),
responseCallback = callback
)
}
@Test
def testDeleteRecordsInternalTopicDeleteAllowed(): Unit = {
val localId = 1
val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
val directoryEventHandler = mock(classOf[DirectoryEventHandler])
val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
val directoryIds = rm.logManager.directoryIdsSet.toList
assertEquals(directoryIds.size, 2)
val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
None)
def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
assert(responseStatus.values.head.errorCode == Errors.NONE.code)
}
// internal topics delete allowed
rm.deleteRecords(
timeout = 0L,
Map[TopicPartition, Long](topicPartition0.topicPartition() -> 0L),
responseCallback = callback,
allowInternalTopicDeletion = true
)
}
private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {

View File

@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Utils;
import java.util.Optional;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
public static final int APPEND_LINGER_MS_DEFAULT = 10;
public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";
public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms";
public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in the share-group state topic.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC)
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -81,7 +86,8 @@ public class ShareCoordinatorConfig {
.define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
.define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
.defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);
private final int stateTopicNumPartitions;
private final short stateTopicReplicationFactor;
@@ -93,6 +99,7 @@ public class ShareCoordinatorConfig {
private final int loadBufferSize;
private final CompressionType compressionType;
private final int appendLingerMs;
private final int pruneIntervalMs;
public ShareCoordinatorConfig(AbstractConfig config) {
@@ -108,6 +115,7 @@ public class ShareCoordinatorConfig {
.map(CompressionType::forId)
.orElse(null);
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
validate();
}
@@ -151,6 +159,10 @@ public class ShareCoordinatorConfig {
return compressionType;
}
public int shareCoordinatorTopicPruneIntervalMs() {
return pruneIntervalMs;
}
private void validate() {
Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && snapshotUpdateRecordsPerSnapshot <= 500,
String.format("%s must be between [0, 500]", SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));

View File

@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.share;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineLong;
import java.util.Objects;
import java.util.Optional;
/**
* Util class to track the offsets written into the internal topic
* per share partition key.
* It calculates the minimum offset globally up to which the records
* in the internal partition are redundant, i.e. they have been overridden
* by newer records.
*/
public class ShareCoordinatorOffsetsManager {
// Map to store share partition key => current partition offset
// being written.
private final TimelineHashMap<SharePartitionKey, Long> offsets;
// Minimum offset representing the smallest necessary offset
// across the internal partition (offsets below this are redundant).
// We are using timeline object here because the offsets which are passed into
// updateState might not be committed yet. In case of retry, these offsets would
// be invalidated via the snapshot registry. Hence, using timeline object
// the values would automatically revert in accordance with the last committed offset.
private final TimelineLong lastRedundantOffset;
public ShareCoordinatorOffsetsManager(SnapshotRegistry snapshotRegistry) {
Objects.requireNonNull(snapshotRegistry);
offsets = new TimelineHashMap<>(snapshotRegistry, 0);
lastRedundantOffset = new TimelineLong(snapshotRegistry);
lastRedundantOffset.set(Long.MAX_VALUE); // For easy application of Math.min.
}
/**
* Method updates internal state with the supplied offset for the provided
* share partition key. It then calculates the minimum offset, if possible,
* below which all offsets are redundant.
*
* @param key - represents {@link SharePartitionKey} whose offset needs updating
* @param offset - represents the latest partition offset for provided key
*/
public void updateState(SharePartitionKey key, long offset) {
lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset));
offsets.put(key, offset);
Optional<Long> redundantOffset = findRedundantOffset();
redundantOffset.ifPresent(lastRedundantOffset::set);
}
private Optional<Long> findRedundantOffset() {
if (offsets.isEmpty()) {
return Optional.empty();
}
long soFar = Long.MAX_VALUE;
for (long offset : offsets.values()) {
// Get min offset among latest offsets
// for all share keys in the internal partition.
soFar = Math.min(soFar, offset);
// lastRedundantOffset represents the smallest necessary offset
// and if soFar equals it, we cannot proceed. This can happen
// if a share partition key hasn't had records written for a while.
// For example,
// <p>
// key1:1
// key2:2 4 6
// key3:3 5 7
// <p>
// We can see in above that offsets 2, 4, 3, 5 are redundant,
// but we do not have a contiguous prefix starting at lastRedundantOffset
// and we cannot proceed.
if (soFar == lastRedundantOffset.get()) {
return Optional.of(soFar);
}
}
return Optional.of(soFar);
}
/**
* Most recent last redundant offset. This method is to be used
* when the caller wants to query the value of such offset.
* @return Optional of type Long representing the offset or empty for invalid offset values
*/
public Optional<Long> lastRedundantOffset() {
long value = lastRedundantOffset.get();
if (value <= 0 || value == Long.MAX_VALUE) {
return Optional.empty();
}
return Optional.of(value);
}
// visible for testing
TimelineHashMap<SharePartitionKey, Long> curState() {
return offsets;
}
}

View File

@@ -48,6 +48,7 @@ import org.apache.kafka.server.config.ShareCoordinatorConfig;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;
@@ -60,6 +61,7 @@ import java.util.Map;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
@@ -75,6 +77,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
private final ShareCoordinatorMetrics shareCoordinatorMetrics;
private volatile int numPartitions = -1; // Number of partitions for __share_group_state. Provided when component is started.
private final Time time;
private final Timer timer;
private final PartitionWriter writer;
private final Map<TopicPartition, Long> lastPrunedOffsets;
public static class Builder {
private final int nodeId;
@@ -184,7 +189,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
config,
runtime,
coordinatorMetrics,
time,
timer,
writer
);
}
}
@@ -194,12 +201,18 @@ public class ShareCoordinatorService implements ShareCoordinator {
ShareCoordinatorConfig config,
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime,
ShareCoordinatorMetrics shareCoordinatorMetrics,
Time time,
Timer timer,
PartitionWriter writer
) {
this.log = logContext.logger(ShareCoordinatorService.class);
this.config = config;
this.runtime = runtime;
this.shareCoordinatorMetrics = shareCoordinatorMetrics;
this.time = time;
this.timer = timer;
this.writer = writer;
this.lastPrunedOffsets = new ConcurrentHashMap<>();
}
@Override
@@ -240,9 +253,82 @@ public class ShareCoordinatorService implements ShareCoordinator {
log.info("Starting up.");
numPartitions = shareGroupTopicPartitionCount.getAsInt();
setupRecordPruning();
log.info("Startup complete.");
}
private void setupRecordPruning() {
log.info("Scheduling share-group state topic prune job.");
timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
@Override
public void run() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
.whenComplete((res, exp) -> {
if (exp != null) {
log.error("Received error in share-group state topic prune.", exp);
}
// Perpetual recursion, failure or not.
setupRecordPruning();
});
}
});
}
private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
// This future will always be completed normally, exception or not.
CompletableFuture<Void> fut = new CompletableFuture<>();
runtime.scheduleWriteOperation(
"write-state-record-prune",
tp,
Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
ShareCoordinatorShard::lastRedundantOffset
).whenComplete((result, exception) -> {
if (exception != null) {
log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception);
Errors error = Errors.forException(exception);
// These errors might result from partition metadata not loaded
// or shard re-election. Will cause unnecessary noise, hence not logging
if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
fut.completeExceptionally(exception);
return;
}
fut.complete(null);
return;
}
if (result.isPresent()) {
Long off = result.get();
Long lastPrunedOffset = lastPrunedOffsets.get(tp);
if (lastPrunedOffset != null && lastPrunedOffset.longValue() == off) {
log.debug("{} already pruned till offset {}", tp, off);
fut.complete(null);
return;
}
log.info("Pruning records in {} till offset {}.", tp, off);
writer.deleteRecords(tp, off)
.whenComplete((res, exp) -> {
if (exp != null) {
log.debug("Exception while deleting records in {} till offset {}.", tp, off, exp);
fut.completeExceptionally(exp);
return;
}
fut.complete(null);
// Best effort prevention of issuing duplicate delete calls.
lastPrunedOffsets.put(tp, off);
});
} else {
log.debug("No offset value for tp {} found.", tp);
fut.complete(null);
}
});
return fut;
}
@Override
public void shutdown() {
if (!isActive.compareAndSet(true, false)) {
@@ -543,8 +629,10 @@ public class ShareCoordinatorService implements ShareCoordinator {
@Override
public void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch) {
throwIfNotActive();
TopicPartition tp = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex);
lastPrunedOffsets.remove(tp);
runtime.scheduleUnloadOperation(
tp,
partitionLeaderEpoch
);
}

View File

@@ -73,6 +73,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
private final TimelineHashMap<SharePartitionKey, Integer> snapshotUpdateCount;
private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
private MetadataImage metadataImage;
private final ShareCoordinatorOffsetsManager offsetsManager;
public static final Exception NULL_TOPIC_ID = new Exception("The topic id cannot be null.");
public static final Exception NEGATIVE_PARTITION_ID = new Exception("The partition id cannot be a negative number.");
@@ -162,6 +163,17 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
CoordinatorMetrics coordinatorMetrics,
CoordinatorMetricsShard metricsShard,
SnapshotRegistry snapshotRegistry
) {
this(logContext, config, coordinatorMetrics, metricsShard, snapshotRegistry, new ShareCoordinatorOffsetsManager(snapshotRegistry));
}
ShareCoordinatorShard(
LogContext logContext,
ShareCoordinatorConfig config,
CoordinatorMetrics coordinatorMetrics,
CoordinatorMetricsShard metricsShard,
SnapshotRegistry snapshotRegistry,
ShareCoordinatorOffsetsManager offsetsManager
) {
this.log = logContext.logger(ShareCoordinatorShard.class);
this.config = config;
@@ -171,6 +183,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
this.leaderEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0);
this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
this.offsetsManager = offsetsManager;
}
@Override
@@ -195,7 +208,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
switch (key.version()) {
case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION: // ShareSnapshot
handleShareSnapshot((ShareSnapshotKey) key.message(), (ShareSnapshotValue) messageOrNull(value), offset);
break;
case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION: // ShareUpdate
handleShareUpdate((ShareUpdateKey) key.message(), (ShareUpdateValue) messageOrNull(value));
@@ -205,7 +218,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
}
private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue value, long offset) {
SharePartitionKey mapKey = SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
@@ -219,6 +232,8 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
snapshotUpdateCount.put(mapKey, 0);
}
}
offsetsManager.updateState(mapKey, offset);
}
private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value) {
@@ -378,10 +393,22 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
}
/**
* Method which returns the last known redundant offset from the partition
* led by this shard.
* @return CoordinatorResult containing empty record list and an Optional<Long> representing the offset.
*/
public CoordinatorResult<Optional<Long>, CoordinatorRecord> lastRedundantOffset() {
return new CoordinatorResult<>(
Collections.emptyList(),
this.offsetsManager.lastRedundantOffset()
);
}
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
* <p>
* If no snapshot has been created for the key => create a new ShareSnapshot record
* else if number of ShareUpdate records for key >= max allowed per snapshot per key => create a new ShareSnapshot record
* else create a new ShareUpdate record
*

View File

@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ShareCoordinatorOffsetsManagerTest {
private ShareCoordinatorOffsetsManager manager;
private static final SharePartitionKey KEY1 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 0);
private static final SharePartitionKey KEY2 = SharePartitionKey.getInstance("gs2", Uuid.randomUuid(), 0);
private static final SharePartitionKey KEY3 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 1);
private static final SharePartitionKey KEY4 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 7);
@BeforeEach
public void setUp() {
manager = new ShareCoordinatorOffsetsManager(new SnapshotRegistry(new LogContext()));
}
@Test
public void testUpdateStateAddsToInternalState() {
manager.updateState(KEY1, 0L);
assertEquals(Optional.empty(), manager.lastRedundantOffset());
manager.updateState(KEY1, 10L);
assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // [0-9] offsets are redundant.
manager.updateState(KEY2, 15L);
assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // No update to the last redundant offset after adding 15L, so still 10L.
assertEquals(10L, manager.curState().get(KEY1));
assertEquals(15L, manager.curState().get(KEY2));
}
private static class ShareOffsetTestHolder {
static class TestTuple {
final SharePartitionKey key;
final long offset;
final Optional<Long> expectedOffset;
private TestTuple(SharePartitionKey key, long offset, Optional<Long> expectedOffset) {
this.key = key;
this.offset = offset;
this.expectedOffset = expectedOffset;
}
static TestTuple instance(SharePartitionKey key, long offset, Optional<Long> expectedOffset) {
return new TestTuple(key, offset, expectedOffset);
}
}
private final String testName;
private final List<TestTuple> tuples;
private final boolean shouldRun;
ShareOffsetTestHolder(String testName, List<TestTuple> tuples) {
this(testName, tuples, true);
}
ShareOffsetTestHolder(String testName, List<TestTuple> tuples, boolean shouldRun) {
this.testName = testName;
this.tuples = tuples;
this.shouldRun = shouldRun;
}
}
static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() {
return Stream.of(
new ShareOffsetTestHolder(
"no redundant state single key",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L))
)
),
new ShareOffsetTestHolder(
"no redundant state multiple keys",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.of(10L))
)
)
);
}
static Stream<ShareOffsetTestHolder> generateRedundantStateCases() {
return Stream.of(
new ShareOffsetTestHolder(
"redundant state single key",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L, Optional.of(11L)),
ShareOffsetTestHolder.TestTuple.instance(KEY1, 15L, Optional.of(15L))
)
),
new ShareOffsetTestHolder(
"redundant state multiple keys",
// KEY1: 10 17
// KEY2: 11 16
// KEY3: 15
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.of(10L)), // KEY2 11 redundant but should not be returned
ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, Optional.of(15L))
)
)
);
}
static Stream<ShareOffsetTestHolder> generateComplexCases() {
return Stream.of(
new ShareOffsetTestHolder(
"redundant state reverse key order",
// Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1.
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L, Optional.of(18L))
)
),
new ShareOffsetTestHolder(
"redundant state infrequently written partition.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 25L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, Optional.of(27L))
)
)
);
}
@ParameterizedTest
@MethodSource("generateNoRedundantStateCases")
public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) {
if (holder.shouldRun) {
holder.tuples.forEach(tuple -> {
manager.updateState(tuple.key, tuple.offset);
assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName);
});
}
}
@ParameterizedTest
@MethodSource("generateRedundantStateCases")
public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) {
if (holder.shouldRun) {
holder.tuples.forEach(tuple -> {
manager.updateState(tuple.key, tuple.offset);
assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName);
});
}
}
@ParameterizedTest
@MethodSource("generateComplexCases")
public void testUpdateStateComplexCases(ShareOffsetTestHolder holder) {
if (holder.shouldRun) {
holder.tuples.forEach(tuple -> {
manager.updateState(tuple.key, tuple.offset);
assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName);
});
}
}
}

View File

@@ -35,9 +35,13 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.junit.jupiter.api.Test;
@@ -45,6 +49,8 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -56,8 +62,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -66,7 +74,10 @@ class ShareCoordinatorServiceTest {
@SuppressWarnings("unchecked")
private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> mockRuntime() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mock(CoordinatorRuntime.class);
when(runtime.activeTopicPartitions())
.thenReturn(Collections.singletonList(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
return runtime;
}
@Test
@@ -74,10 +85,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
new MockTimer(),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -95,10 +108,12 @@ class ShareCoordinatorServiceTest {
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L).thenReturn(150L);
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
coordinatorMetrics,
time,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -203,10 +218,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -303,10 +320,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -348,10 +367,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -393,10 +414,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -470,10 +493,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -531,10 +556,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -579,10 +606,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
); );
service.startup(() -> 1); service.startup(() -> 1);
@ -620,10 +649,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService( ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(), new LogContext(),
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime, runtime,
new ShareCoordinatorMetrics(), new ShareCoordinatorMetrics(),
Time.SYSTEM Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
); );
service.startup(() -> 1); service.startup(() -> 1);
@ -646,11 +677,13 @@ class ShareCoordinatorServiceTest {
public void testPartitionFor() { public void testPartitionFor() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService( ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(), new LogContext(),
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()), ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime, runtime,
new ShareCoordinatorMetrics(), new ShareCoordinatorMetrics(),
Time.SYSTEM Time.SYSTEM,
mock(Timer.class),
mock(PartitionWriter.class)
); );
String groupId = "group1"; String groupId = "group1";
@ -673,4 +706,422 @@ class ShareCoordinatorServiceTest {
// asCoordinatorKey does not discriminate on topic name. // asCoordinatorKey does not discriminate on topic name.
assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey()); assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey());
} }
@Test
public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
MockTime time = new MockTime();
MockTimer timer = new MockTimer(time);
PartitionWriter writer = mock(PartitionWriter.class);
when(writer.deleteRecords(
any(),
eq(10L)
)).thenReturn(
CompletableFuture.completedFuture(null)
);
when(runtime.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any()
)).thenReturn(
CompletableFuture.completedFuture(Optional.of(10L))
).thenReturn(
CompletableFuture.completedFuture(Optional.of(11L))
);
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
time,
timer,
writer
));
service.startup(() -> 1);
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // prune should be called
verify(runtime, times(1))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // prune should be called
verify(runtime, times(2))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
verify(writer, times(2))
.deleteRecords(any(), anyLong());
service.shutdown();
}
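For orientation, the cadence asserted in this test (nothing pruned at startup, one prune write per active partition each time the clock moves past the 30-second interval) comes from a periodic timer job inside ShareCoordinatorService. The sketch below is only a minimal approximation of such a self-rescheduling loop, written against a plain ScheduledExecutorService instead of the coordinator Timer; the class, field, and parameter names are assumptions for illustration, not the actual implementation.

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only: a self-rescheduling prune loop, not the real ShareCoordinatorService code.
final class RecordPruneLoopSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Supplier<List<TopicPartition>> activePartitions;                                  // e.g. runtime.activeTopicPartitions()
    private final Function<TopicPartition, CompletableFuture<Optional<Long>>> lastRedundantOffset;  // per-partition write operation
    private final Deleter deleter;                                                                  // stand-in for PartitionWriter.deleteRecords
    private final long pruneIntervalMs;

    interface Deleter {
        CompletableFuture<Void> deleteRecords(TopicPartition tp, long deleteBeforeOffset);
    }

    RecordPruneLoopSketch(Supplier<List<TopicPartition>> activePartitions,
                          Function<TopicPartition, CompletableFuture<Optional<Long>>> lastRedundantOffset,
                          Deleter deleter,
                          long pruneIntervalMs) {
        this.activePartitions = activePartitions;
        this.lastRedundantOffset = lastRedundantOffset;
        this.deleter = deleter;
        this.pruneIntervalMs = pruneIntervalMs;
    }

    void start() {
        // Nothing runs at startup; the first prune happens one interval later, matching the times(0) check above.
        scheduler.schedule(this::pruneOnce, pruneIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void pruneOnce() {
        try {
            for (TopicPartition tp : activePartitions.get()) {
                lastRedundantOffset.apply(tp)
                    .thenCompose(offset -> offset.isPresent()
                        ? deleter.deleteRecords(tp, offset.get())           // prune records below the redundant offset
                        : CompletableFuture.<Void>completedFuture(null))    // nothing redundant yet for this partition
                    .exceptionally(e -> null);                              // errors are dropped here; the next tick retries
            }
        } finally {
            // Re-arm unconditionally so a failed run never stops future pruning.
            scheduler.schedule(this::pruneOnce, pruneIntervalMs, TimeUnit.MILLISECONDS);
        }
    }
}

Whether the real service iterates like this or batches the per-partition futures is not pinned down by these tests; they only constrain the observable cadence and retry behaviour.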
@Test
public void testRecordPruningTaskPeriodicityWithSomeFailures() throws Exception {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
MockTime time = new MockTime();
MockTimer timer = new MockTimer(time);
PartitionWriter writer = mock(PartitionWriter.class);
TopicPartition tp1 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
TopicPartition tp2 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
when(runtime.activeTopicPartitions())
.thenReturn(List.of(tp1, tp2));
when(writer.deleteRecords(
any(),
eq(10L)
)).thenReturn(
CompletableFuture.completedFuture(null)
);
when(writer.deleteRecords(
any(),
eq(20L)
)).thenReturn(
CompletableFuture.failedFuture(new Exception("bad stuff"))
);
when(runtime.scheduleWriteOperation(
eq("write-state-record-prune"),
eq(tp1),
any(),
any()
)).thenReturn(
CompletableFuture.completedFuture(Optional.of(10L))
).thenReturn(
CompletableFuture.completedFuture(Optional.of(11L))
);
when(runtime.scheduleWriteOperation(
eq("write-state-record-prune"),
eq(tp2),
any(),
any()
)).thenReturn(
CompletableFuture.completedFuture(Optional.of(20L))
).thenReturn(
CompletableFuture.completedFuture(Optional.of(21L))
);
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
time,
timer,
writer
));
service.startup(() -> 2);
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // Prune should be called.
verify(runtime, times(2)) // For 2 topic partitions.
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // Prune should be called again even though the previous delete future completed exceptionally.
verify(runtime, times(4)) // Second prune with 2 topic partitions.
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
verify(writer, times(4))
.deleteRecords(any(), anyLong());
service.shutdown();
}
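A note on the counts asserted above: with two active partitions, each tick schedules one prune write per partition (hence times(2), then times(4)), and tp2's failed delete neither suppresses tp1's work nor the next tick. One way to get that isolation, shown purely as an assumed sketch rather than the actual code, is to neutralise each partition's failure before combining the futures:

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: combine per-partition prune futures so a single failure
// neither cancels the other partitions nor fails the overall prune run.
final class PruneFutureCombiner {
    static CompletableFuture<Void> combine(List<CompletableFuture<Void>> perPartitionPrunes) {
        return CompletableFuture.allOf(
            perPartitionPrunes.stream()
                .map(f -> f.exceptionally(e -> null))   // downgrade each failure to "skipped this tick"
                .toArray(CompletableFuture[]::new)
        );
    }
}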
@Test
public void testRecordPruningTaskException() throws Exception {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
MockTime time = new MockTime();
MockTimer timer = new MockTimer(time);
PartitionWriter writer = mock(PartitionWriter.class);
when(runtime.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any()
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
time,
timer,
writer
));
service.startup(() -> 1);
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // prune should be called
verify(runtime, times(1))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
verify(writer, times(0))
.deleteRecords(any(), anyLong());
service.shutdown();
}
@Test
public void testRecordPruningTaskSuccess() throws Exception {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
MockTime time = new MockTime();
MockTimer timer = new MockTimer(time);
PartitionWriter writer = mock(PartitionWriter.class);
when(runtime.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any()
)).thenReturn(CompletableFuture.completedFuture(Optional.of(20L)));
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
time,
timer,
writer
));
service.startup(() -> 1);
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // prune should be called
verify(runtime, times(1))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
verify(writer, times(1))
.deleteRecords(any(), eq(20L));
service.shutdown();
}
@Test
public void testRecordPruningTaskEmptyOffsetReturned() throws Exception {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
MockTime time = new MockTime();
MockTimer timer = new MockTimer(time);
PartitionWriter writer = mock(PartitionWriter.class);
when(runtime.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any()
)).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
time,
timer,
writer
));
service.startup(() -> 1);
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // prune should be called
verify(runtime, times(1))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
verify(writer, times(0))
.deleteRecords(any(), anyLong());
service.shutdown();
}
@Test
public void testRecordPruningTaskRepeatedSameOffsetForTopic() throws Exception {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
MockTime time = new MockTime();
MockTimer timer = new MockTimer(time);
PartitionWriter writer = mock(PartitionWriter.class);
when(writer.deleteRecords(
any(),
eq(10L)
)).thenReturn(
CompletableFuture.completedFuture(null)
);
when(runtime.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any()
)).thenReturn(
CompletableFuture.completedFuture(Optional.of(10L))
).thenReturn(
CompletableFuture.completedFuture(Optional.of(10L))
);
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
time,
timer,
writer
));
service.startup(() -> 1);
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // prune should be called
verify(runtime, times(1))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // prune should be called
verify(runtime, times(2))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
verify(writer, times(1))
.deleteRecords(any(), anyLong());
service.shutdown();
}
@Test
public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() throws Exception {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
MockTime time = new MockTime();
MockTimer timer = new MockTimer(time);
PartitionWriter writer = mock(PartitionWriter.class);
CompletableFuture<Void> fut1 = new CompletableFuture<>();
fut1.completeExceptionally(new Exception("bad stuff"));
when(writer.deleteRecords(
any(),
eq(10L)
)).thenReturn(
fut1
).thenReturn(
CompletableFuture.completedFuture(null)
);
when(runtime.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any()
)).thenReturn(
CompletableFuture.completedFuture(Optional.of(10L))
).thenReturn(
CompletableFuture.completedFuture(Optional.of(10L))
);
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
time,
timer,
writer
));
service.startup(() -> 1);
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // prune should be called
verify(runtime, times(1))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
timer.advanceClock(30005L); // prune should be called
verify(runtime, times(2))
.scheduleWriteOperation(
eq("write-state-record-prune"),
any(),
any(),
any());
verify(writer, times(2))
.deleteRecords(any(), anyLong());
service.shutdown();
}
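The two "RepeatedSameOffsetForTopic" tests above pin down a small but important detail: if the redundant offset has not advanced, deleteRecords is not reissued, yet a delete that failed last time is retried on the next tick. That behaviour is consistent with remembering, per partition, the last offset that was successfully pruned, roughly along the lines of the sketch below (assumed names, illustrative only):

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

// Sketch of per-partition deduplication of prune calls; not the actual service code.
final class PruneDedupSketch {
    interface Deleter {
        CompletableFuture<Void> deleteRecords(TopicPartition tp, long deleteBeforeOffset);
    }

    // Last offset for which deleteRecords completed successfully, per state topic partition.
    private final ConcurrentHashMap<TopicPartition, Long> lastPrunedOffset = new ConcurrentHashMap<>();

    CompletableFuture<Void> maybePrune(TopicPartition tp, Optional<Long> redundantOffset, Deleter deleter) {
        if (redundantOffset.isEmpty()) {
            return CompletableFuture.completedFuture(null);      // nothing redundant yet
        }
        long offset = redundantOffset.get();
        Long previous = lastPrunedOffset.get(tp);
        if (previous != null && previous == offset) {
            return CompletableFuture.completedFuture(null);      // already pruned up to here; skip the delete
        }
        return deleter.deleteRecords(tp, offset)
            .thenRun(() -> lastPrunedOffset.put(tp, offset));    // record only successful deletes, so failures retry
    }
}

Whether the real implementation keeps this state in the timer task, the service, or elsewhere is not something these tests constrain.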
}
@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -79,11 +80,12 @@ class ShareCoordinatorShardTest {
private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
private MetadataImage metadataImage = null;
private Map<String, String> configOverrides = new HashMap<>();
+ShareCoordinatorOffsetsManager offsetsManager = mock(ShareCoordinatorOffsetsManager.class);
ShareCoordinatorShard build() {
if (metadataImage == null) metadataImage = mock(MetadataImage.class, RETURNS_DEEP_STUBS);
if (config == null) {
-config = ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap(configOverrides));
+config = ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap(configOverrides));
}
ShareCoordinatorShard shard = new ShareCoordinatorShard(
@@ -91,7 +93,8 @@ class ShareCoordinatorShardTest {
config,
coordinatorMetrics,
metricsShard,
-snapshotRegistry
+snapshotRegistry,
+offsetsManager
);
when(metadataImage.topics().getTopic((Uuid) any())).thenReturn(mock(TopicImage.class));
when(metadataImage.topics().getPartition(any(), anyInt())).thenReturn(mock(PartitionRegistration.class));
@@ -103,6 +106,11 @@ class ShareCoordinatorShardTest {
this.configOverrides = configOverrides;
return this;
}
+public ShareCoordinatorShardBuilder setOffsetsManager(ShareCoordinatorOffsetsManager offsetsManager) {
+this.offsetsManager = offsetsManager;
+return this;
+}
}
private void writeAndReplayDefaultRecord(ShareCoordinatorShard shard) {
@@ -796,6 +804,17 @@ class ShareCoordinatorShardTest {
verify(shard.getMetricsShard(), times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}
@Test
public void testLastRedundantOffset() {
ShareCoordinatorOffsetsManager manager = mock(ShareCoordinatorOffsetsManager.class);
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder()
.setOffsetsManager(manager)
.build();
when(manager.lastRedundantOffset()).thenReturn(Optional.of(10L));
assertEquals(new CoordinatorResult<>(Collections.emptyList(), Optional.of(10L)), shard.lastRedundantOffset());
}
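For context on what this test mocks: lastRedundantOffset() on the shard returns a CoordinatorResult carrying no records and the manager's current redundant offset as the response, so it can be piped through scheduleWriteOperation. Conceptually, and this is only an assumed simplification of ShareCoordinatorOffsetsManager rather than a description of its real implementation, the manager can be pictured as tracking the last write offset per share-partition key and treating everything below the minimum as redundant:

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Simplified mental model of an offsets tracker (assumed; the real class may differ, e.g. in snapshotting).
final class OffsetsTrackerSketch {
    // Offset of the most recent state record written for each share-partition key.
    private final Map<String, Long> lastWrittenOffset = new ConcurrentHashMap<>();

    void updateState(String sharePartitionKey, long offset) {
        lastWrittenOffset.put(sharePartitionKey, offset);
    }

    // Every record strictly below this offset is superseded by a newer record for its key, so it can be deleted.
    Optional<Long> lastRedundantOffset() {
        return lastWrittenOffset.values().stream().min(Long::compareTo);
    }
}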
@Test
public void testReadStateLeaderEpochUpdateSuccess() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
@@ -28,7 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ShareCoordinatorConfigTest {
+public class ShareCoordinatorTestConfig {
private static final List<ConfigDef> CONFIG_DEF_LIST = Collections.singletonList(
ShareCoordinatorConfig.CONFIG_DEF
@@ -50,6 +50,7 @@ public class ShareCoordinatorConfigTest {
configs.put(ShareCoordinatorConfig.LOAD_BUFFER_SIZE_CONFIG, "555");
configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, String.valueOf(CompressionType.NONE.id));
+configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "30000"); // 30 seconds
return configs;
}
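The 30000 ms value added here is what the service tests above rely on when they advance the MockTimer past 30 seconds. If a test ever needed a different cadence, the existing override mechanism in ShareCoordinatorTestConfig could presumably be used for it; the helper below is purely hypothetical, assumes it lives alongside ShareCoordinatorTestConfig (same package, so no extra imports for the coordinator classes), and assumes createConfig returns a ShareCoordinatorConfig.

import java.util.HashMap;
import java.util.Map;

// Hypothetical test helper: build a share coordinator config with a custom prune interval.
final class PruneIntervalConfigHelper {
    static ShareCoordinatorConfig configWithPruneIntervalMs(String pruneIntervalMs) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, pruneIntervalMs);
        return ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap(overrides));
    }
}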
} }