From 9cb271f1e174ad7697a705ae83e159fdd07bf8ef Mon Sep 17 00:00:00 2001
From: Calvin Liu <83986057+CalvinConfluent@users.noreply.github.com>
Date: Thu, 13 Feb 2025 09:30:58 -0800
Subject: [PATCH] KAFKA-18654[2/2]: Transaction V2 retry add partitions on the server side when handling produce request. (#18810)

During the transaction commit phase, it is normal to hit CONCURRENT_TRANSACTIONS
errors before the transaction markers are fully propagated. Instead of letting
the client retry the produce request, it is better to retry on the server side.

Reviewers: Artem Livshits, Justine Olshan
---
 .../main/scala/kafka/server/KafkaConfig.scala |  4 +-
 .../scala/kafka/server/ReplicaManager.scala   | 45 +++++++++--
 .../kafka/server/ReplicaManagerTest.scala     | 81 ++++++++++++++++++-
 .../server/config/AbstractKafkaConfig.java    |  4 +-
 .../transaction/AddPartitionsToTxnConfig.java | 56 +++++++++++++
 5 files changed, 178 insertions(+), 12 deletions(-)
 create mode 100644 transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6698d9eb21f..17b8cefb1ee 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig, TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -206,8 +206,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
 
   private val _transactionLogConfig = new TransactionLogConfig(this)
   private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this)
+  private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
   def transactionLogConfig: TransactionLogConfig = _transactionLogConfig
   def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig
+  def addPartitionsToTxnConfig: AddPartitionsToTxnConfig = _addPartitionsToTxnConfig
 
   private val _quotaConfig = new QuotaConfig(this)
   def quotaConfig: QuotaConfig = _quotaConfig
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index aa4d8ae24cc..f2e030b3e1f 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -790,18 +790,49 @@ class ReplicaManager(val config: KafkaConfig,
       return
     }
 
+    // Wrap the callback to be handled on an arbitrary request handler thread
+    // when transaction verification is complete. The request local passed in
+    // is only used when the callback is executed immediately.
+    val wrappedPostVerificationCallback = KafkaRequestHandler.wrapAsyncCallback(
+      postVerificationCallback,
+      requestLocal
+    )
+
+    val retryTimeoutMs = Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
+    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs
+    val startVerificationTimeMs = time.milliseconds
+    def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
+      if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
+        // We've exceeded the retry timeout, so just call the callback with whatever results we have
+        wrappedPostVerificationCallback(results)
+      } else if (results._1.values.exists(_ == Errors.CONCURRENT_TRANSACTIONS)) {
+        // Retry the verification with backoff
+        scheduler.scheduleOnce("retry-add-partitions-to-txn", () => {
+          maybeSendPartitionsToTransactionCoordinator(
+            topicPartitionBatchInfo,
+            transactionalId,
+            transactionalProducerInfo.head._1,
+            transactionalProducerInfo.head._2,
+            maybeRetryOnConcurrentTransactions,
+            transactionSupportedOperation
+          )
+        }, addPartitionsRetryBackoffMs * 1L)
+      } else {
+        // We don't have concurrent transaction errors, so just call the callback with the results
+        wrappedPostVerificationCallback(results)
+      }
+    }
+
     maybeSendPartitionsToTransactionCoordinator(
       topicPartitionBatchInfo,
       transactionalId,
       transactionalProducerInfo.head._1,
       transactionalProducerInfo.head._2,
-      // Wrap the callback to be handled on an arbitrary request handler thread
-      // when transaction verification is complete. The request local passed in
-      // is only used when the callback is executed immediately.
-      KafkaRequestHandler.wrapAsyncCallback(
-        postVerificationCallback,
-        requestLocal
-      ),
+      // If we add partitions directly from the produce request,
+      // we should retry on concurrent transaction errors here because:
+      // - the produce backoff adds too much delay
+      // - the produce request is expensive to retry
+      if (transactionSupportedOperation.supportsEpochBump) maybeRetryOnConcurrentTransactions else wrappedPostVerificationCallback,
       transactionSupportedOperation
     )
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0292f31b7db..616850ebc3e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -65,7 +65,7 @@ import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch}
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.timer.MockTimer
-import org.apache.kafka.server.util.{MockScheduler, MockTime}
+import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog => JUnifiedLog, VerificationGuard}
@@ -2252,6 +2252,80 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(
+    value = classOf[Errors],
+    names = Array(
+      "NOT_COORDINATOR",
+      "CONCURRENT_TRANSACTIONS"
+    )
+  )
+  def testTransactionAddPartitionRetry(error: Errors): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 6
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+    val scheduler = new MockScheduler(time)
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), scheduler = scheduler)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
+        (_, _) => ())
+
+      // Append some transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      // We should add these partitions to the manager to verify.
+      val result = handleProduceAppend(replicaManager, tp0, transactionalRecords, origin = AppendOrigin.CLIENT,
+        transactionalId = transactionalId, transactionSupportedOperation = addPartition)
+      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+      verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
+        ArgumentMatchers.eq(transactionalId),
+        ArgumentMatchers.eq(producerId),
+        ArgumentMatchers.eq(producerEpoch),
+        ArgumentMatchers.eq(Seq(tp0)),
+        appendCallback.capture(),
+        any()
+      )
+      val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // Confirm we did not write to the log and instead returned an error.
+      var callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+      callback(Map(tp0 -> error).toMap)
+
+      if (error != Errors.CONCURRENT_TRANSACTIONS) {
+        // NOT_COORDINATOR is converted to NOT_ENOUGH_REPLICAS
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      } else {
+        // The append should not finish with an error; it should retry later.
+        assertFalse(result.hasFired)
+        assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+        time.sleep(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs + 1)
+        scheduler.tick()
+
+        verify(addPartitionsToTxnManager, times(2)).addOrVerifyTransaction(
+          ArgumentMatchers.eq(transactionalId),
+          ArgumentMatchers.eq(producerId),
+          ArgumentMatchers.eq(producerEpoch),
+          ArgumentMatchers.eq(Seq(tp0)),
+          appendCallback.capture(),
+          any()
+        )
+        callback = appendCallback.getValue()
+        callback(Map.empty[TopicPartition, Errors].toMap)
+        assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
+        assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId, producerEpoch))
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testTransactionVerificationBlocksOutOfOrderSequence(): Unit = {
     val tp0 = new TopicPartition(topic, 0)
@@ -3120,7 +3194,8 @@ class ReplicaManagerTest {
 
   private def setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager: AddPartitionsToTxnManager,
                                                                      transactionalTopicPartitions: List[TopicPartition],
-                                                                     config: KafkaConfig = config): ReplicaManager = {
+                                                                     config: KafkaConfig = config,
+                                                                     scheduler: Scheduler = new MockScheduler(time)): ReplicaManager = {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val metadataCache = mock(classOf[MetadataCache])
 
@@ -3128,7 +3203,7 @@ class ReplicaManagerTest {
       metrics = metrics,
       config = config,
       time = time,
-      scheduler = new MockScheduler(time),
+      scheduler = scheduler,
       logManager = mockLogMgr,
       quotaManagers = quotaManager,
       metadataCache = metadataCache,
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 7b01aa5813b..918534fce7a 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.AddPartitionsToTxnConfig;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
 import org.apache.kafka.network.SocketServerConfigs;
@@ -63,7 +64,8 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
         MetricConfigs.CONFIG_DEF,
         QuotaConfig.CONFIG_DEF,
         BrokerSecurityConfigs.CONFIG_DEF,
-        DelegationTokenManagerConfigs.CONFIG_DEF
+        DelegationTokenManagerConfigs.CONFIG_DEF,
+        AddPartitionsToTxnConfig.CONFIG_DEF
     ));
 
     public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java
new file mode 100644
index 00000000000..951bebddf06
--- /dev/null
+++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.transaction;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+public final class AddPartitionsToTxnConfig {
+    // The default config values for the server-side add partition to transaction operations.
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG = "add.partitions.to.txn.retry.backoff.max.ms";
+    public static final int ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DEFAULT = 100;
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DOC = "The maximum allowed timeout for adding " +
+        "partitions to transactions on the server side. It only applies to the actual add partition operations, " +
+        "not the verification. It will not be effective if it is larger than request.timeout.ms";
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG = "add.partitions.to.txn.retry.backoff.ms";
+    public static final int ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DEFAULT = 20;
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DOC = "The server-side retry backoff when the server attempts " +
+        "to add the partition to the transaction";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG, INT, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DEFAULT, atLeast(0), HIGH, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DOC)
+        .define(ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG, INT, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DEFAULT, atLeast(1), HIGH, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DOC);
+
+    private final int addPartitionsToTxnRetryBackoffMaxMs;
+    private final int addPartitionsToTxnRetryBackoffMs;
+
+    public AddPartitionsToTxnConfig(AbstractConfig config) {
+        addPartitionsToTxnRetryBackoffMaxMs = config.getInt(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG);
+        addPartitionsToTxnRetryBackoffMs = config.getInt(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG);
+    }
+    public int addPartitionsToTxnRetryBackoffMaxMs() {
+        return addPartitionsToTxnRetryBackoffMaxMs;
+    }
+    public int addPartitionsToTxnRetryBackoffMs() {
+        return addPartitionsToTxnRetryBackoffMs;
+    }
+}
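
Note (not part of the applied diff): a minimal sketch of how the two new broker settings can be exercised in isolation, using only the classes introduced or touched by this patch (AddPartitionsToTxnConfig, AbstractConfig). The class name and property values below are hypothetical, chosen just for illustration.

    import java.util.Map;

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.coordinator.transaction.AddPartitionsToTxnConfig;

    public class AddPartitionsToTxnConfigSketch {
        public static void main(String[] args) {
            // Hypothetical overrides; the defaults are 100 ms (max) and 20 ms (backoff).
            Map<String, Object> props = Map.of(
                AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG, 200,
                AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG, 50);

            // Parse the raw properties against CONFIG_DEF and read back the typed values.
            AddPartitionsToTxnConfig config =
                new AddPartitionsToTxnConfig(new AbstractConfig(AddPartitionsToTxnConfig.CONFIG_DEF, props));

            // Per the ReplicaManager change above, the broker retries the add-partitions call every
            // addPartitionsToTxnRetryBackoffMs(), until min(addPartitionsToTxnRetryBackoffMaxMs(),
            // request.timeout.ms) has elapsed since verification started.
            System.out.println(config.addPartitionsToTxnRetryBackoffMs());    // 50
            System.out.println(config.addPartitionsToTxnRetryBackoffMaxMs()); // 200
        }
    }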