KAFKA-15853 Move transactions configs out of core (#15670)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Omnia Ibrahim 2024-04-12 17:29:51 +01:00 committed by GitHub
parent 31171d0791
commit 61baa7ac6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 278 additions and 276 deletions

View File

@ -2181,6 +2181,7 @@ project(':streams') {
testImplementation project(':tools')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage')
testImplementation project(':transaction-coordinator')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':server')

View File

@ -413,6 +413,7 @@
<allow pkg="org.apache.kafka.tools" />
<allow pkg="org.apache.kafka.server.config" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
<allow class="org.apache.kafka.coordinator.transaction.TransactionLogConfigs" />
</subpackage>
<subpackage name="test">

View File

@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
@ -90,13 +90,13 @@ class TransactionStateManager(brokerId: Int,
@volatile private var transactionTopicPartitionCount: Int = _
/** setup metrics*/
private val partitionLoadSensor = metrics.sensor(TransactionStateManagerConfig.LOAD_TIME_SENSOR)
private val partitionLoadSensor = metrics.sensor(TransactionStateManagerConfigs.LOAD_TIME_SENSOR)
partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
TransactionStateManagerConfig.METRICS_GROUP,
TransactionStateManagerConfigs.METRICS_GROUP,
"The max time it took to load the partitions in the last 30sec"), new Max())
partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
TransactionStateManagerConfig.METRICS_GROUP,
TransactionStateManagerConfigs.METRICS_GROUP,
"The avg time it took to load the partitions in the last 30sec"), new Avg())
// visible for testing only
@ -803,15 +803,15 @@ private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
private[transaction] case class CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int,
transactionMetadata: TransactionMetadata)
private[transaction] case class TransactionConfig(transactionalIdExpirationMs: Int = TransactionStateManagerConfig.DEFAULT_TRANSACTIONAL_ID_EXPIRATION_MS,
transactionMaxTimeoutMs: Int = TransactionStateManagerConfig.DEFAULT_TRANSACTIONS_MAX_TIMEOUT_MS,
transactionLogNumPartitions: Int = TransactionLogConfig.DEFAULT_NUM_PARTITIONS,
transactionLogReplicationFactor: Short = TransactionLogConfig.DEFAULT_REPLICATION_FACTOR,
transactionLogSegmentBytes: Int = TransactionLogConfig.DEFAULT_SEGMENT_BYTES,
transactionLogLoadBufferSize: Int = TransactionLogConfig.DEFAULT_LOAD_BUFFER_SIZE,
transactionLogMinInsyncReplicas: Int = TransactionLogConfig.DEFAULT_MIN_IN_SYNC_REPLICAS,
abortTimedOutTransactionsIntervalMs: Int = TransactionStateManagerConfig.DEFAULT_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS,
removeExpiredTransactionalIdsIntervalMs: Int = TransactionStateManagerConfig.DEFAULT_REMOVE_EXPIRED_TRANSACTIONAL_IDS_INTERVAL_MS,
private[transaction] case class TransactionConfig(transactionalIdExpirationMs: Int = TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
transactionMaxTimeoutMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT,
transactionLogNumPartitions: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT,
transactionLogReplicationFactor: Short = TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
transactionLogSegmentBytes: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT,
transactionLogLoadBufferSize: Int = TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT,
transactionLogMinInsyncReplicas: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
abortTimedOutTransactionsIntervalMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
removeExpiredTransactionalIdsIntervalMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
requestTimeoutMs: Int = Defaults.REQUEST_TIMEOUT_MS)
case class TransactionalIdAndProducerIdEpoch(transactionalId: String, producerId: Long, producerEpoch: Short) {

View File

@ -35,6 +35,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
@ -84,6 +85,7 @@ import scala.jdk.CollectionConverters._
object DynamicBrokerConfig {
private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
val AllDynamicConfigs = DynamicSecurityConfigs ++
LogCleaner.ReconfigurableConfigs ++
@ -92,7 +94,7 @@ object DynamicBrokerConfig {
Set(KafkaConfig.MetricReporterClassesProp) ++
DynamicListenerConfig.ReconfigurableConfigs ++
SocketServer.ReconfigurableConfigs ++
ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala ++
DynamicProducerStateManagerConfig ++
DynamicRemoteLogConfig.ReconfigurableConfigs
private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp, KafkaConfig.MaxConnectionCreationRateProp, KafkaConfig.NumNetworkThreadsProp)
@ -1136,21 +1138,21 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (producerStateManagerConfig.producerIdExpirationMs != newConfig.producerIdExpirationMs) {
info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}")
info(s"Reconfigure ${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}")
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs)
}
if (producerStateManagerConfig.transactionVerificationEnabled != newConfig.transactionPartitionVerificationEnable) {
info(s"Reconfigure ${KafkaConfig.TransactionPartitionVerificationEnableProp} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}")
info(s"Reconfigure ${TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}")
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable)
}
}
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
if (newConfig.producerIdExpirationMs < 0)
throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}")
throw new ConfigException(s"${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}")
}
override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
override def reconfigurableConfigs: Set[String] = DynamicProducerStateManagerConfig
}

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.security.PasswordEncoderConfigs
@ -49,7 +50,7 @@ import org.apache.kafka.server.config.{Defaults, KafkaSecurityConfigs, ServerTop
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Csv
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.zookeeper.client.ZKClientConfig
@ -265,22 +266,6 @@ object KafkaConfig {
val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms"
val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMsProp = "transactional.id.expiration.ms"
val TransactionsMaxTimeoutMsProp = "transaction.max.timeout.ms"
val TransactionsTopicMinISRProp = "transaction.state.log.min.isr"
val TransactionsLoadBufferSizeProp = "transaction.state.log.load.buffer.size"
val TransactionsTopicPartitionsProp = "transaction.state.log.num.partitions"
val TransactionsTopicSegmentBytesProp = "transaction.state.log.segment.bytes"
val TransactionsTopicReplicationFactorProp = "transaction.state.log.replication.factor"
val TransactionsAbortTimedOutTransactionCleanupIntervalMsProp = "transaction.abort.timed.out.transaction.cleanup.interval.ms"
val TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp = "transaction.remove.expired.transaction.cleanup.interval.ms"
val TransactionPartitionVerificationEnableProp = "transaction.partition.verification.enable"
val ProducerIdExpirationMsProp = ProducerStateManagerConfig.PRODUCER_ID_EXPIRATION_MS
val ProducerIdExpirationCheckIntervalMsProp = "producer.id.expiration.check.interval.ms"
/** ********* Fetch Configuration **************/
val MaxIncrementalFetchSessionCacheSlots = "max.incremental.fetch.session.cache.slots"
val FetchMaxBytes = "fetch.max.bytes"
@ -637,26 +622,6 @@ object KafkaConfig {
val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
"or this timeout is reached. This is similar to the producer request timeout."
val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden."
/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMsDoc = "The time in ms that the transaction coordinator will wait without receiving any transaction status updates " +
"for the current transaction before expiring its transactional id. Transactional IDs will not expire while a the transaction is still ongoing."
val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
"If a clients requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache (soft-limit, overridden if records are too large)."
val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +
"Internal topic creation will fail until the cluster size meets this replication factor requirement."
val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)."
val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out"
val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing"
val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition"
val ProducerIdExpirationMsDoc = "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. " +
"Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value the same or higher than " +
"<code>delivery.timeout.ms</code> can help prevent expiration during retries and protect against message duplication, but the default should be reasonable for most use cases."
val ProducerIdExpirationCheckIntervalMsDoc = "The interval at which to remove producer IDs that have expired due to <code>producer.id.expiration.ms</code> passing."
/** ********* Fetch Configuration **************/
val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of incremental fetch sessions that we will maintain."
@ -940,21 +905,21 @@ object KafkaConfig {
.define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc)
/** ********* Transaction management configuration ***********/
.define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc)
.define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc)
.define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc)
.define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc)
.define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc)
.define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc)
.define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc)
.define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
.define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
.define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)
.define(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
.define(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC)
.define(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC)
.define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc)
.define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
.define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc)
.define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC)
// Configuration for testing only as default value should be sufficient for typical usage
.defineInternal(ProducerIdExpirationCheckIntervalMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS, atLeast(1), LOW, ProducerIdExpirationCheckIntervalMsDoc)
.defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC)
/** ********* Fetch Configuration **************/
.define(MaxIncrementalFetchSessionCacheSlots, INT, Defaults.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS, atLeast(0), MEDIUM, MaxIncrementalFetchSessionCacheSlotsDoc)
@ -1591,20 +1556,20 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val offsetsTopicCompressionType = Option(getInt(KafkaConfig.OffsetsTopicCompressionCodecProp)).map(value => CompressionType.forId(value)).orNull
/** ********* Transaction management configuration ***********/
val transactionalIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)
val transactionMaxTimeoutMs = getInt(KafkaConfig.TransactionsMaxTimeoutMsProp)
val transactionTopicMinISR = getInt(KafkaConfig.TransactionsTopicMinISRProp)
val transactionsLoadBufferSize = getInt(KafkaConfig.TransactionsLoadBufferSizeProp)
val transactionTopicReplicationFactor = getShort(KafkaConfig.TransactionsTopicReplicationFactorProp)
val transactionTopicPartitions = getInt(KafkaConfig.TransactionsTopicPartitionsProp)
val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG)
val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG)
val transactionTopicMinISR = getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG)
val transactionsLoadBufferSize = getInt(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG)
val transactionTopicReplicationFactor = getShort(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG)
val transactionTopicPartitions = getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG)
val transactionTopicSegmentBytes = getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG)
val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG)
val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG)
def transactionPartitionVerificationEnable = getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
def transactionPartitionVerificationEnable = getBoolean(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
val producerIdExpirationCheckIntervalMs = getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp)
def producerIdExpirationMs = getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG)
val producerIdExpirationCheckIntervalMs = getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG)
/** ********* Metric Configuration **************/
val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.resource.{Resource, ResourcePattern}
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, TRANSACTIONAL_ID}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.junit.jupiter.api.{BeforeEach, TestInfo}
@ -111,9 +112,9 @@ class AbstractAuthorizerIntegrationTest extends BaseRequestTest {
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
properties.put(KafkaConfig.UnstableApiVersionsEnableProp, "true")
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[PrincipalBuilder].getName)
}

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.junit.jupiter.api.Assertions._
@ -88,9 +89,9 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName)
}

View File

@ -19,17 +19,17 @@ package kafka.api
import java.util
import java.util.{Collections, List, Map, Properties}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
import org.apache.kafka.clients.admin.{Admin,AlterConfigOp, ConfigEntry, ProducerState}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, ProducerState}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@ -202,7 +202,7 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness {
}
private def producerIdExpirationConfig(configValue: String): Map[ConfigResource, util.Collection[AlterConfigOp]] = {
val producerIdCfg = new ConfigEntry(KafkaConfig.ProducerIdExpirationMsProp, configValue)
val producerIdCfg = new ConfigEntry(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue)
val configs = Collections.singletonList(new AlterConfigOp(producerIdCfg, AlterConfigOp.OpType.SET))
Collections.singletonMap(configResource, configs)
}
@ -228,18 +228,18 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness {
// Set a smaller value for the number of partitions for the __consumer_offsets topic
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long.
serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
serverProps.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString)
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString)
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString)
serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
serverProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
serverProps.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "5000")
serverProps.put(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, "500")
serverProps.put(KafkaConfig.ProducerIdExpirationMsProp, "10000")
serverProps.put(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp, "500")
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200")
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, "5000")
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, "500")
serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, "10000")
serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, "500")
serverProps
}
}

View File

@ -27,6 +27,7 @@ import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -41,8 +42,8 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
val brokerCount = 1
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
this.serverConfig.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
this.serverConfig.setProperty(KafkaConfig.TransactionsTopicMinISRProp, "1")
this.serverConfig.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
this.serverConfig.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val topic = "topic"

View File

@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
@ -52,8 +53,8 @@ class TransactionsBounceTest extends IntegrationTestHarness {
overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
overridingProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, 3.toString)
overridingProps.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
overridingProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 1.toString)
overridingProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 3.toString)
overridingProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString)
overridingProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 3.toString)
overridingProps.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
overridingProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")

View File

@ -18,7 +18,6 @@
package kafka.api
import java.util.{Collections, Properties}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
@ -28,6 +27,7 @@ import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@ -203,18 +203,18 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
// Set a smaller value for the number of partitions for the __consumer_offsets topic
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long.
serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
serverProps.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString)
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString)
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString)
serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
serverProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
serverProps.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "10000")
serverProps.put(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, "500")
serverProps.put(KafkaConfig.ProducerIdExpirationMsProp, "5000")
serverProps.put(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp, "500")
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200")
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, "10000")
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, "500")
serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, "5000")
serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, "500")
serverProps
}
}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerGrou
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@ -61,18 +62,18 @@ class TransactionsTest extends IntegrationTestHarness {
props.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
// Set a smaller value for the number of partitions for the __consumer_offsets topic + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
props.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
props.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
props.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
props.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString)
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString)
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString)
props.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
props.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
props.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
props.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200")
// The new group coordinator does not support verifying transactions yet.
if (isNewGroupCoordinatorEnabled()) {
props.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "false")
props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
}
props

View File

@ -24,6 +24,7 @@ import kafka.utils.TestUtils
import kafka.utils.TestUtils.consumeRecords
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@ -105,14 +106,14 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
serverProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, 1.toString)
serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 1.toString)
serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 1.toString)
serverProps.put(KafkaConfig.TransactionsTopicMinISRProp, 1.toString)
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString)
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString)
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 1.toString)
serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
serverProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
serverProps.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200")
serverProps
}

View File

@ -59,6 +59,7 @@ import org.apache.kafka.common.requests.MetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@ -1271,7 +1272,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// Dynamically turn verification off.
val configPrefix = listenerPrefix(SecureExternal)
val updatedProps = securityProps(sslProperties1, KEYSTORE_PROPS, configPrefix)
updatedProps.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "false")
updatedProps.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
alterConfigsUsingConfigCommand(updatedProps)
verifyConfiguration(false)
@ -1283,7 +1284,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifyConfiguration(false)
// Turn verification back on.
updatedProps.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "true")
updatedProps.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true")
alterConfigsUsingConfigCommand(updatedProps)
verifyConfiguration(true)
}

View File

@ -26,7 +26,7 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.FileRecords
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
@ -52,8 +52,8 @@ object StressTestLog {
scheduler = time.scheduler,
time = time,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false),
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,

View File

@ -29,7 +29,7 @@ import kafka.utils._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
import org.apache.kafka.server.util.CommandLineUtils
import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
@ -221,8 +221,8 @@ object TestLinearWriteSpeed {
brokerTopicStats = new BrokerTopicStats,
time = Time.SYSTEM,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false),
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true

View File

@ -34,6 +34,7 @@ import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.util.MockTime
@ -300,7 +301,7 @@ class PartitionLockTest extends Logging {
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
val maxTransactionTimeout = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false)
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
val producerStateManager = new ProducerStateManager(
log.topicPartition,
log.dir,

View File

@ -52,6 +52,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@ -434,7 +435,7 @@ class PartitionTest extends AbstractPartitionTest {
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, true)
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true)
val producerStateManager = new ProducerStateManager(
log.topicPartition,
log.dir,

View File

@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs
import org.apache.kafka.server.util.MockScheduler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -1004,7 +1004,7 @@ class TransactionCoordinatorTest {
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
val expectedTransition = TxnTransitMetadata(producerId, producerId, (producerEpoch + 1).toShort, RecordBatch.NO_PRODUCER_EPOCH,
txnTimeoutMs, PrepareAbort, partitions.toSet, now, now + TransactionStateManagerConfig.DEFAULT_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS)
txnTimeoutMs, PrepareAbort, partitions.toSet, now, now + TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
when(transactionManager.appendTransactionToLog(ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(coordinatorEpoch),
@ -1015,7 +1015,7 @@ class TransactionCoordinatorTest {
).thenAnswer(_ => {})
coordinator.startup(() => transactionStatePartitionCount, false)
time.sleep(TransactionStateManagerConfig.DEFAULT_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS)
time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
scheduler.tick()
verify(transactionManager).timedOutTransactions()
verify(transactionManager, times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
@ -1064,7 +1064,7 @@ class TransactionCoordinatorTest {
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, metadata))))
coordinator.startup(() => transactionStatePartitionCount, false)
time.sleep(TransactionStateManagerConfig.DEFAULT_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS)
time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
scheduler.tick()
verify(transactionManager).timedOutTransactions()
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
@ -1088,7 +1088,7 @@ class TransactionCoordinatorTest {
val bumpedEpoch = (producerEpoch + 1).toShort
val expectedTransition = TxnTransitMetadata(producerId, producerId, bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
PrepareAbort, partitions.toSet, now, now + TransactionStateManagerConfig.DEFAULT_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS)
PrepareAbort, partitions.toSet, now, now + TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
when(transactionManager.appendTransactionToLog(ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(coordinatorEpoch),
@ -1099,7 +1099,7 @@ class TransactionCoordinatorTest {
).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS))
coordinator.startup(() => transactionStatePartitionCount, false)
time.sleep(TransactionStateManagerConfig.DEFAULT_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS)
time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
scheduler.tick()
verify(transactionManager).timedOutTransactions()

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.junit.jupiter.api.{AfterEach, Tag}
@ -113,8 +113,8 @@ abstract class AbstractLogCleanerIntegrationTest {
time = time,
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false),
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true)

View File

@ -22,7 +22,7 @@ import kafka.utils._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
@ -64,8 +64,8 @@ class BrokerCompressionTest {
time = time,
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false),
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
@ -54,7 +54,7 @@ class LogCleanerManagerTest extends Logging {
val logConfig: LogConfig = new LogConfig(logProps)
val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
val offset = 999
val producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false)
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
@ -105,7 +105,7 @@ class LogCleanerManagerTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(10)
val config = createLowRetentionLogConfig(logSegmentSize, TopicConfig.CLEANUP_POLICY_COMPACT)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS
val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val segments = new LogSegments(tp)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, producerStateManagerConfig, time)
@ -814,7 +814,7 @@ class LogCleanerManagerTest extends Logging {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = producerStateManagerConfig,
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true)
@ -868,7 +868,7 @@ class LogCleanerManagerTest extends Logging {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = producerStateManagerConfig,
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
@ -61,7 +61,7 @@ class LogCleanerTest extends Logging {
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
val tombstoneRetentionMs = 86400000
val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
val producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false)
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
@AfterEach
def teardown(): Unit = {
@ -164,7 +164,7 @@ class LogCleanerTest extends Logging {
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS
val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val logSegments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, dir,
@ -2032,7 +2032,7 @@ class LogCleanerTest extends Logging {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = producerStateManagerConfig,
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true

View File

@ -24,7 +24,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
@ -152,8 +152,8 @@ class LogConcurrencyTest {
brokerTopicStats = brokerTopicStats,
time = Time.SYSTEM,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false),
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true

View File

@ -29,9 +29,9 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
@ -56,8 +56,8 @@ class LogLoaderTest {
var config: KafkaConfig = _
val brokerTopicStats = new BrokerTopicStats
val maxTransactionTimeoutMs: Int = 5 * 60 * 1000
val producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false)
val producerIdExpirationCheckIntervalMs: Int = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS
val producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
val producerIdExpirationCheckIntervalMs: Int = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
var logsToClose: Seq[UnifiedLog] = Seq()
@ -99,7 +99,7 @@ class LogLoaderTest {
val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS
val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
// Create a LogManager with some overridden methods to facilitate interception of clean shutdown
// flag and to inject an error
@ -347,7 +347,7 @@ class LogLoaderTest {
def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = {
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS
val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
// Intercept all segment read calls
@ -504,7 +504,7 @@ class LogLoaderTest {
firstAppendTimestamp, coordinatorEpoch = coordinatorEpoch)
assertEquals(firstAppendTimestamp, log.producerStateManager.lastEntry(producerId).get.lastTimestamp)
val maxProducerIdExpirationMs = Defaults.PRODUCER_ID_EXPIRATION_MS
val maxProducerIdExpirationMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT
mockTime.sleep(maxProducerIdExpirationMs)
assertEquals(Optional.empty(), log.producerStateManager.lastEntry(producerId))

View File

@ -30,10 +30,10 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrTopic
import org.apache.kafka.common.requests.{AbstractControlRequest, LeaderAndIsrRequest}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.image.{TopicImage, TopicsImage}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.config.Defaults
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
@ -822,7 +822,7 @@ class LogManagerTest {
val segmentBytes = 1024
val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0,
5 * 60 * 1000, new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false), Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS)
5 * 60 * 1000, new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT)
assertTrue(expectedSegmentsPerLog > 0)
// calculate numMessages to append to logs. It'll create "expectedSegmentsPerLog" log segments with segment.bytes=1024
@ -958,7 +958,7 @@ class LogManagerTest {
recoveryPoint = 0,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 1000, false),
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
scheduler = mockTime.scheduler,
time = mockTime,
brokerTopicStats = mockBrokerTopicStats,

View File

@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Time, Utils}
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{BatchMetadata, EpochEntry, LogConfig, LogFileUtils, LogSegment, LogSegmentOffsetOverflowException, ProducerStateEntry, ProducerStateManager, ProducerStateManagerConfig, RollParams}
@ -608,7 +608,7 @@ class LogSegmentTest {
topicPartition,
logDir,
5 * 60 * 1000,
new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false),
new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
new MockTime()
)
}

View File

@ -31,7 +31,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import java.nio.file.Files
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, FetchIsolation, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex}
@ -92,8 +92,8 @@ object LogTestUtils {
logStartOffset: Long = 0L,
recoveryPoint: Long = 0L,
maxTransactionTimeoutMs: Int = 5 * 60 * 1000,
producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false),
producerIdExpirationCheckIntervalMs: Int = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
producerIdExpirationCheckIntervalMs: Int = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
lastShutdownClean: Boolean = true,
topicId: Option[Uuid] = None,
keepPartitionMetadataFile: Boolean = true,

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.storage.internals.log.{AppendOrigin, CompletedTxn, LogFileUtils, LogOffsetMetadata, ProducerAppendInfo, ProducerStateEntry, ProducerStateManager, ProducerStateManagerConfig, TxnMetadata, VerificationStateEntry}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -47,7 +47,7 @@ class ProducerStateManagerTest {
private val partition = new TopicPartition("test", 0)
private val producerId = 1L
private val maxTransactionTimeoutMs = 5 * 60 * 1000
private val producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, true)
private val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true)
private val lateTransactionTimeoutMs = maxTransactionTimeoutMs + ProducerStateManager.LATE_TRANSACTION_BUFFER_MS
private val time = new MockTime

View File

@ -31,7 +31,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
@ -63,7 +63,7 @@ class UnifiedLogTest {
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val mockTime = new MockTime()
var logsToClose: Seq[UnifiedLog] = Seq()
val producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false)
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
def metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@BeforeEach
@ -4229,7 +4229,7 @@ class UnifiedLogTest {
time: Time = mockTime,
maxTransactionTimeoutMs: Int = 60 * 60 * 1000,
producerStateManagerConfig: ProducerStateManagerConfig = producerStateManagerConfig,
producerIdExpirationCheckIntervalMs: Int = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerIdExpirationCheckIntervalMs: Int = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
lastShutdownClean: Boolean = true,
topicId: Option[Uuid] = None,
keepPartitionMetadataFile: Boolean = true,

View File

@ -37,6 +37,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
@ -67,10 +68,10 @@ class AutoTopicCreationManagerTest {
props.setProperty(KafkaConfig.RequestTimeoutMsProp, requestTimeout.toString)
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, internalTopicPartitions.toString)
props.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, internalTopicPartitions.toString)
props.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString)
props.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, internalTopicReplicationFactor.toString)
props.setProperty(KafkaConfig.TransactionsTopicPartitionsProp, internalTopicReplicationFactor.toString)
props.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
config = KafkaConfig.fromProps(props)
val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))

View File

@ -73,6 +73,7 @@ import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.common._
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
@ -1190,8 +1191,8 @@ class KafkaApisTest extends Logging {
groupId, AuthorizationResult.ALLOWED)
Topic.GROUP_METADATA_TOPIC_NAME
case CoordinatorType.TRANSACTION =>
topicConfigOverride.put(KafkaConfig.TransactionsTopicPartitionsProp, numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.TransactionsTopicReplicationFactorProp, numBrokersNeeded.toString)
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString)
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString)
when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties)
authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TRANSACTIONAL_ID,
groupId, AuthorizationResult.ALLOWED)
@ -1300,8 +1301,8 @@ class KafkaApisTest extends Logging {
true
case Topic.TRANSACTION_STATE_TOPIC_NAME =>
topicConfigOverride.put(KafkaConfig.TransactionsTopicPartitionsProp, numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.TransactionsTopicReplicationFactorProp, numBrokersNeeded.toString)
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString)
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString)
when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties)
true
case _ =>

View File

@ -37,6 +37,7 @@ import java.util.{Collections, Properties}
import org.apache.kafka.common.Node
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
import org.apache.kafka.server.config.{KafkaSecurityConfigs, ServerTopicConfigSynonyms, ZkConfigs}
@ -901,13 +902,13 @@ class KafkaConfigTest {
case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")
case KafkaConfig.TransactionalIdExpirationMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsMaxTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicMinISRProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsLoadBufferSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")

View File

@ -73,6 +73,7 @@ import com.yammer.metrics.core.{Gauge, Meter}
import kafka.log.remote.RemoteLogManager
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.util.timer.MockTimer
@ -2460,7 +2461,7 @@ class ReplicaManagerTest {
// Dynamically enable verification.
config.dynamicConfig.initialize(None, None)
val props = new Properties()
props.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "true")
props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true")
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
TestUtils.waitUntilTrue(() => config.transactionPartitionVerificationEnable == true, "Config did not dynamically update.")
@ -2512,7 +2513,7 @@ class ReplicaManagerTest {
// Disable verification
config.dynamicConfig.initialize(None, None)
val props = new Properties()
props.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "false")
props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
TestUtils.waitUntilTrue(() => config.transactionPartitionVerificationEnable == false, "Config did not dynamically update.")

View File

@ -33,10 +33,10 @@ import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRe
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.snapshot.RecordsSnapshotWriter
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
@ -77,8 +77,8 @@ class DumpLogSegmentsTest {
time = time,
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, false),
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true

View File

@ -22,7 +22,7 @@ import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import kafka.log.{LocalLog, LogLoader, UnifiedLog}
import kafka.server.BrokerTopicStats
import kafka.utils.TestUtils.retry
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
@ -134,8 +134,8 @@ class SchedulerTest {
val logConfig = new LogConfig(new Properties())
val brokerTopicStats = new BrokerTopicStats
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = Defaults.PRODUCER_ID_EXPIRATION_MS
val producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS
val maxProducerIdExpirationMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT
val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val segments = new LogSegments(topicPartition)

View File

@ -70,11 +70,12 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.Utils.formatAddress
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.metadata.properties.MetaProperties
import org.apache.kafka.server.{ClientMetricsManager, ControllerRequestCompletionHandler}
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.config.{Defaults, ZkConfigs}
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.server.util.timer.SystemTimer
@ -1515,8 +1516,8 @@ object TestUtils extends Logging {
flushStartOffsetCheckpointMs = 10000L,
retentionCheckMs = 1000L,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(Defaults.PRODUCER_ID_EXPIRATION_MS, transactionVerificationEnabled),
producerIdExpirationCheckIntervalMs = Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, transactionVerificationEnabled),
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
scheduler = time.scheduler,
time = time,
brokerTopicStats = new BrokerTopicStats,

View File

@ -25,8 +25,6 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.coordinator.group.OffsetConfig;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.security.PasswordEncoderConfigs;
import org.apache.kafka.server.common.MetadataVersion;
@ -159,20 +157,6 @@ public class Defaults {
public static final int OFFSET_COMMIT_TIMEOUT_MS = OffsetConfig.DEFAULT_OFFSET_COMMIT_TIMEOUT_MS;
public static final short OFFSET_COMMIT_REQUIRED_ACKS = OffsetConfig.DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS;
/** ********* Transaction management configuration *********/
public static final int TRANSACTIONAL_ID_EXPIRATION_MS = TransactionStateManagerConfig.DEFAULT_TRANSACTIONAL_ID_EXPIRATION_MS;
public static final int TRANSACTIONS_MAX_TIMEOUT_MS = TransactionStateManagerConfig.DEFAULT_TRANSACTIONS_MAX_TIMEOUT_MS;
public static final int TRANSACTIONS_TOPIC_MIN_ISR = TransactionLogConfig.DEFAULT_MIN_IN_SYNC_REPLICAS;
public static final int TRANSACTIONS_LOAD_BUFFER_SIZE = TransactionLogConfig.DEFAULT_LOAD_BUFFER_SIZE;
public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR = TransactionLogConfig.DEFAULT_REPLICATION_FACTOR;
public static final int TRANSACTIONS_TOPIC_PARTITIONS = TransactionLogConfig.DEFAULT_NUM_PARTITIONS;
public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES = TransactionLogConfig.DEFAULT_SEGMENT_BYTES;
public static final int TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS = TransactionStateManagerConfig.DEFAULT_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS;
public static final int TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS = TransactionStateManagerConfig.DEFAULT_REMOVE_EXPIRED_TRANSACTIONAL_IDS_INTERVAL_MS;
public static final boolean TRANSACTION_PARTITION_VERIFICATION_ENABLE = true;
public static final int PRODUCER_ID_EXPIRATION_MS = 86400000;
public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS = 600000;
/** ********* Fetch Configuration *********/
public static final int MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS = 1000;
public static final int FETCH_MAX_BYTES = 55 * 1024 * 1024;

View File

@ -16,15 +16,7 @@
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.utils.Utils;
import java.util.Set;
public class ProducerStateManagerConfig {
public static final String PRODUCER_ID_EXPIRATION_MS = "producer.id.expiration.ms";
public static final String TRANSACTION_VERIFICATION_ENABLED = "transaction.partition.verification.enable";
public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(PRODUCER_ID_EXPIRATION_MS, TRANSACTION_VERIFICATION_ENABLED);
private volatile int producerIdExpirationMs;
private volatile boolean transactionVerificationEnabled;

View File

@ -22,6 +22,7 @@ import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
import org.apache.kafka.server.config.ConfigType;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.server.util.MockTime;
@ -118,7 +119,7 @@ public class EmbeddedKafkaCluster {
putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) 1);
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicPartitionsProp(), 5);
putIfAbsent(brokerConfig, KafkaConfig.TransactionsTopicPartitionsProp(), 5);
putIfAbsent(brokerConfig, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5);
putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), true);
for (int i = 0; i < brokers.length; i++) {

View File

@ -1,26 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.transaction;
public class TransactionLogConfig {
// Log-level config default values
public static final int DEFAULT_NUM_PARTITIONS = 50;
public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
public static final short DEFAULT_REPLICATION_FACTOR = 3;
public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.transaction;
public final class TransactionLogConfigs {
// Log-level config and default values
public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " +
"Internal topic creation will fail until the cluster size meets this replication factor requirement.";
public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "The minimum number of replicas that must acknowledge a write to transaction topic in order to be considered successful.";
public static final String TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG = "transaction.state.log.load.buffer.size";
public static final int TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
public static final String TRANSACTIONS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache (soft-limit, overridden if records are too large).";
public static final String TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG = "transaction.partition.verification.enable";
public static final boolean TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT = true;
public static final String TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition";
public static final String PRODUCER_ID_EXPIRATION_MS_CONFIG = "producer.id.expiration.ms";
public static final int PRODUCER_ID_EXPIRATION_MS_DEFAULT = 86400000;
public static final String PRODUCER_ID_EXPIRATION_MS_DOC = "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. " +
"Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value the same or higher than " +
"<code>delivery.timeout.ms</code> can help prevent expiration during retries and protect against message duplication, but the default should be reasonable for most use cases.";
public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG = "producer.id.expiration.check.interval.ms";
public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT = 600000;
public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC = "The interval at which to remove producer IDs that have expired due to <code>producer.id.expiration.ms</code> passing.";
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.transaction;
import java.util.concurrent.TimeUnit;
public class TransactionStateManagerConfig {
// Default transaction management config values
public static final int DEFAULT_TRANSACTIONS_MAX_TIMEOUT_MS = (int) TimeUnit.MINUTES.toMillis(15);
public static final int DEFAULT_TRANSACTIONAL_ID_EXPIRATION_MS = (int) TimeUnit.DAYS.toMillis(7);
public static final int DEFAULT_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS = (int) TimeUnit.SECONDS.toMillis(10);
public static final int DEFAULT_REMOVE_EXPIRED_TRANSACTIONAL_IDS_INTERVAL_MS = (int) TimeUnit.HOURS.toMillis(1);
public static final String METRICS_GROUP = "transaction-coordinator-metrics";
public static final String LOAD_TIME_SENSOR = "TransactionsPartitionLoadTime";
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.transaction;
import java.util.concurrent.TimeUnit;
public final class TransactionStateManagerConfigs {
// Transaction management configs and default values
public static final String TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG = "transaction.max.timeout.ms";
public static final int TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT = (int) TimeUnit.MINUTES.toMillis(15);
public static final String TRANSACTIONS_MAX_TIMEOUT_MS_DOC = "The maximum allowed timeout for transactions. " +
"If a clients requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. " +
"This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.";
public static final String TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG = "transactional.id.expiration.ms";
public static final int TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT = (int) TimeUnit.DAYS.toMillis(7);
public static final String TRANSACTIONAL_ID_EXPIRATION_MS_DOC = "The time in ms that the transaction coordinator will wait without receiving any transaction status updates " +
"for the current transaction before expiring its transactional id. Transactional IDs will not expire while a the transaction is still ongoing.";
public static final String TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG = "transaction.abort.timed.out.transaction.cleanup.interval.ms";
public static final int TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT = (int) TimeUnit.SECONDS.toMillis(10);
public static final String TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC = "The interval at which to rollback transactions that have timed out";
public static final String TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG = "transaction.remove.expired.transaction.cleanup.interval.ms";
public static final int TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT = (int) TimeUnit.HOURS.toMillis(1);
public static final String TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing";
public static final String METRICS_GROUP = "transaction-coordinator-metrics";
public static final String LOAD_TIME_SENSOR = "TransactionsPartitionLoadTime";
}