KAFKA-18654 [2/2]: Transaction V2: retry add partitions on the server side when handling produce requests (#18810)

During the transaction commit phase, it is normal to hit a CONCURRENT_TRANSACTIONS error before the transaction markers are fully propagated. Instead of letting the client retry the produce request, it is better to retry on the server side.
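The idea, as a minimal standalone sketch (the names, the Result type, and the simulated coordinator call are illustrative assumptions, not the patch's API): on a concurrent-transaction result, reschedule the add-partitions attempt with a fixed backoff, and fall through to the normal completion callback once the attempt succeeds, fails for another reason, or a deadline passes.

    import java.util.concurrent.{Executors, TimeUnit}

    object AddPartitionsRetrySketch {
      sealed trait Result
      case object Ok extends Result
      case object ConcurrentTransactions extends Result

      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      // Retry `attempt` on a concurrent-transaction result with a fixed backoff
      // until `deadlineMs`, then hand whatever result we have to `onDone`.
      def addPartitionsWithRetry(attempt: () => Result,
                                 backoffMs: Long,
                                 deadlineMs: Long,
                                 onDone: Result => Unit): Unit = {
        val result = attempt()
        if (result == ConcurrentTransactions && System.currentTimeMillis() < deadlineMs) {
          // Markers from the previous transaction are still propagating: retry on the
          // server instead of bouncing the whole produce request back to the client.
          scheduler.schedule(new Runnable {
            override def run(): Unit = addPartitionsWithRetry(attempt, backoffMs, deadlineMs, onDone)
          }, backoffMs, TimeUnit.MILLISECONDS)
        } else {
          onDone(result) // success, non-retriable error, or deadline exceeded
        }
      }

      def main(args: Array[String]): Unit = {
        var calls = 0
        // Simulated coordinator: reports a concurrent transaction twice, then succeeds.
        val flakyCoordinator = () => { calls += 1; if (calls < 3) ConcurrentTransactions else Ok }
        addPartitionsWithRetry(flakyCoordinator, backoffMs = 20L,
          deadlineMs = System.currentTimeMillis() + 100L,
          onDone = result => { println(s"finished after $calls attempts: $result"); scheduler.shutdown() })
      }
    }

The actual change applies this pattern inside ReplicaManager, using the broker scheduler and two new configs, as shown in the diffs below.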

Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>
Calvin Liu, 2025-02-13 09:30:58 -08:00 (committed by GitHub)
parent 9fbf14d544
commit 9cb271f1e1
5 changed files with 178 additions and 12 deletions

File: KafkaConfig.scala

@@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig, TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -206,8 +206,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   private val _transactionLogConfig = new TransactionLogConfig(this)
   private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this)
+  private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)

   def transactionLogConfig: TransactionLogConfig = _transactionLogConfig
   def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig
+  def addPartitionsToTxnConfig: AddPartitionsToTxnConfig = _addPartitionsToTxnConfig

   private val _quotaConfig = new QuotaConfig(this)
   def quotaConfig: QuotaConfig = _quotaConfig

File: ReplicaManager.scala

@@ -790,18 +790,49 @@ class ReplicaManager(val config: KafkaConfig,
       return
     }

+    // Wrap the callback to be handled on an arbitrary request handler thread
+    // when transaction verification is complete. The request local passed in
+    // is only used when the callback is executed immediately.
+    val wrappedPostVerificationCallback = KafkaRequestHandler.wrapAsyncCallback(
+      postVerificationCallback,
+      requestLocal
+    )
+
+    val retryTimeoutMs = Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
+    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs
+    val startVerificationTimeMs = time.milliseconds
+    def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
+      if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
+        // We've exceeded the retry timeout, so just call the callback with whatever results we have
+        wrappedPostVerificationCallback(results)
+      } else if (results._1.values.exists(_ == Errors.CONCURRENT_TRANSACTIONS)) {
+        // Retry the verification with backoff
+        scheduler.scheduleOnce("retry-add-partitions-to-txn", () => {
+          maybeSendPartitionsToTransactionCoordinator(
+            topicPartitionBatchInfo,
+            transactionalId,
+            transactionalProducerInfo.head._1,
+            transactionalProducerInfo.head._2,
+            maybeRetryOnConcurrentTransactions,
+            transactionSupportedOperation
+          )
+        }, addPartitionsRetryBackoffMs * 1L)
+      } else {
+        // We don't have concurrent transaction errors, so just call the callback with the results
+        wrappedPostVerificationCallback(results)
+      }
+    }
+
     maybeSendPartitionsToTransactionCoordinator(
       topicPartitionBatchInfo,
       transactionalId,
       transactionalProducerInfo.head._1,
       transactionalProducerInfo.head._2,
-      // Wrap the callback to be handled on an arbitrary request handler thread
-      // when transaction verification is complete. The request local passed in
-      // is only used when the callback is executed immediately.
-      KafkaRequestHandler.wrapAsyncCallback(
-        postVerificationCallback,
-        requestLocal
-      ),
+      // If we add partition directly from produce request,
+      // we should retry on concurrent transaction error here because:
+      // - the produce backoff adds too much delay
+      // - the produce request is expensive to retry
+      if (transactionSupportedOperation.supportsEpochBump) maybeRetryOnConcurrentTransactions else wrappedPostVerificationCallback,
       transactionSupportedOperation
     )
   }
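For intuition about the knobs used above, a back-of-envelope sketch (assumes the defaults this patch introduces, a 100 ms cap and a 20 ms backoff, plus the broker's default request.timeout.ms of 30000 ms):

    // retryTimeoutMs = min(add.partitions.to.txn.retry.backoff.max.ms, request.timeout.ms)
    val retryTimeoutMs = Math.min(100, 30000) // = 100 ms with the defaults
    val maxRetries = retryTimeoutMs / 20      // 20 ms backoff => at most ~5 scheduled retries
    println(s"retry window ${retryTimeoutMs} ms, <= $maxRetries retries")

So the server gives up well before the produce request itself can time out, and returns whatever result it last saw.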

File: ReplicaManagerTest.scala

@@ -65,7 +65,7 @@ import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch}
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.timer.MockTimer
-import org.apache.kafka.server.util.{MockScheduler, MockTime}
+import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog => JUnifiedLog, VerificationGuard}
@@ -2252,6 +2252,80 @@ class ReplicaManagerTest {
     }
   }

+  @ParameterizedTest
+  @EnumSource(
+    value = classOf[Errors],
+    names = Array(
+      "NOT_COORDINATOR",
+      "CONCURRENT_TRANSACTIONS"
+    )
+  )
+  def testTransactionAddPartitionRetry(error: Errors): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 6
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+    val scheduler = new MockScheduler(time)
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), scheduler = scheduler)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
+        (_, _) => ())
+
+      // Append some transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      // We should add these partitions to the manager to verify.
+      val result = handleProduceAppend(replicaManager, tp0, transactionalRecords, origin = AppendOrigin.CLIENT,
+        transactionalId = transactionalId, transactionSupportedOperation = addPartition)
+      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+      verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
+        ArgumentMatchers.eq(transactionalId),
+        ArgumentMatchers.eq(producerId),
+        ArgumentMatchers.eq(producerEpoch),
+        ArgumentMatchers.eq(Seq(tp0)),
+        appendCallback.capture(),
+        any()
+      )
+      val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // Confirm we did not write to the log and instead returned error.
+      var callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+      callback(Map(tp0 -> error).toMap)
+
+      if (error != Errors.CONCURRENT_TRANSACTIONS) {
+        // NOT_COORDINATOR is converted to NOT_ENOUGH_REPLICAS
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      } else {
+        // The append should not finish with error, it should retry later.
+        assertFalse(result.hasFired)
+        assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+        time.sleep(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs + 1)
+        scheduler.tick()
+
+        verify(addPartitionsToTxnManager, times(2)).addOrVerifyTransaction(
+          ArgumentMatchers.eq(transactionalId),
+          ArgumentMatchers.eq(producerId),
+          ArgumentMatchers.eq(producerEpoch),
+          ArgumentMatchers.eq(Seq(tp0)),
+          appendCallback.capture(),
+          any()
+        )
+
+        callback = appendCallback.getValue()
+        callback(Map.empty[TopicPartition, Errors].toMap)
+        assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
+        assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId, producerEpoch))
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testTransactionVerificationBlocksOutOfOrderSequence(): Unit = {
     val tp0 = new TopicPartition(topic, 0)
@@ -3120,7 +3194,8 @@ class ReplicaManagerTest {
   }

   private def setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager: AddPartitionsToTxnManager,
                                                                      transactionalTopicPartitions: List[TopicPartition],
-                                                                     config: KafkaConfig = config): ReplicaManager = {
+                                                                     config: KafkaConfig = config,
+                                                                     scheduler: Scheduler = new MockScheduler(time)): ReplicaManager = {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val metadataCache = mock(classOf[MetadataCache])
@@ -3128,7 +3203,7 @@
       metrics = metrics,
       config = config,
       time = time,
-      scheduler = new MockScheduler(time),
+      scheduler = scheduler,
       logManager = mockLogMgr,
       quotaManagers = quotaManager,
       metadataCache = metadataCache,
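A side note on how the test above drives the retry deterministically (a sketch, assuming the MockTime/MockScheduler pair from org.apache.kafka.server.util behaves as the test uses it): a task scheduled through scheduleOnce only runs once the mock clock has passed its deadline and tick() is called.

    import org.apache.kafka.server.util.{MockScheduler, MockTime}

    val time = new MockTime()
    val scheduler = new MockScheduler(time)
    var retried = false
    scheduler.scheduleOnce("retry-add-partitions-to-txn", () => retried = true, 20L)
    time.sleep(21)   // move the mock clock past the 20 ms backoff
    scheduler.tick() // run tasks whose deadline has passed
    assert(retried)  // the broker-side retry has now fired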

File: AbstractKafkaConfig.java

@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.AddPartitionsToTxnConfig;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
 import org.apache.kafka.network.SocketServerConfigs;
@@ -63,7 +64,8 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
         MetricConfigs.CONFIG_DEF,
         QuotaConfig.CONFIG_DEF,
         BrokerSecurityConfigs.CONFIG_DEF,
-        DelegationTokenManagerConfigs.CONFIG_DEF
+        DelegationTokenManagerConfigs.CONFIG_DEF,
+        AddPartitionsToTxnConfig.CONFIG_DEF
     ));

     public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {

File: AddPartitionsToTxnConfig.java (new file)

@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+public final class AddPartitionsToTxnConfig {
+    // The default config values for the server-side add partition to transaction operations.
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG = "add.partitions.to.txn.retry.backoff.max.ms";
+    public static final int ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DEFAULT = 100;
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DOC = "The maximum allowed timeout for adding " +
+            "partitions to transactions on the server side. It only applies to the actual add partition operations, " +
+            "not the verification. It will not be effective if it is larger than request.timeout.ms.";
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG = "add.partitions.to.txn.retry.backoff.ms";
+    public static final int ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DEFAULT = 20;
+    public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DOC = "The server-side retry backoff when the server " +
+            "attempts to add the partition to the transaction.";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG, INT, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DEFAULT, atLeast(0), HIGH, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_DOC)
+            .define(ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG, INT, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DEFAULT, atLeast(1), HIGH, ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DOC);
+
+    private final int addPartitionsToTxnRetryBackoffMaxMs;
+    private final int addPartitionsToTxnRetryBackoffMs;
+
+    public AddPartitionsToTxnConfig(AbstractConfig config) {
+        addPartitionsToTxnRetryBackoffMaxMs = config.getInt(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MAX_MS_CONFIG);
+        addPartitionsToTxnRetryBackoffMs = config.getInt(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG);
+    }
+
+    public int addPartitionsToTxnRetryBackoffMaxMs() {
+        return addPartitionsToTxnRetryBackoffMaxMs;
+    }
+
+    public int addPartitionsToTxnRetryBackoffMs() {
+        return addPartitionsToTxnRetryBackoffMs;
+    }
+}
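A quick sketch of reading these settings back (not part of the patch; it uses the public AbstractConfig(ConfigDef, Map) constructor to stand in for a full broker config):

    import java.util.Properties
    import org.apache.kafka.common.config.AbstractConfig
    import org.apache.kafka.coordinator.transaction.AddPartitionsToTxnConfig

    val props = new Properties()
    props.put(AddPartitionsToTxnConfig.ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG, "25")
    val cfg = new AddPartitionsToTxnConfig(new AbstractConfig(AddPartitionsToTxnConfig.CONFIG_DEF, props))
    println(cfg.addPartitionsToTxnRetryBackoffMs)    // 25, the override
    println(cfg.addPartitionsToTxnRetryBackoffMaxMs) // 100, the default

In the broker itself the same object is reached through KafkaConfig.addPartitionsToTxnConfig, as wired in the KafkaConfig hunk above.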