MINOR: Move BrokerReconfigurable to the sever-common module (#19383)

This patch moves `BrokerReconfigurable` to the `server-common module`
and decouples the `TransactionLogConfig` and `KafkaConfig` to unblock
KAFKA-14485.

Reviewers: PoAn Yang <payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-04-07 07:39:01 +08:00 committed by GitHub
parent e69a311068
commit 6d68f8a82c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 48 additions and 35 deletions

View File

@ -38,7 +38,7 @@
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />

View File

@ -59,6 +59,7 @@
<allow pkg="org.apache.kafka.metadata" />
<!-- utilities and reusable classes from server-common -->
<allow pkg="org.apache.kafka.config"/>
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.security" />
<allow pkg="org.apache.kafka.server.common" />

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionLogConfig}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.util.Scheduler
@ -46,13 +46,14 @@ object TransactionCoordinator {
metadataCache: MetadataCache,
time: Time): TransactionCoordinator = {
val transactionLogConfig = new TransactionLogConfig(config)
val txnConfig = TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
config.transactionLogConfig.transactionTopicPartitions,
config.transactionLogConfig.transactionTopicReplicationFactor,
config.transactionLogConfig.transactionTopicSegmentBytes,
config.transactionLogConfig.transactionLoadBufferSize,
config.transactionLogConfig.transactionTopicMinISR,
transactionLogConfig.transactionTopicPartitions,
transactionLogConfig.transactionTopicReplicationFactor,
transactionLogConfig.transactionTopicSegmentBytes,
transactionLogConfig.transactionLoadBufferSize,
transactionLogConfig.transactionTopicMinISR,
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
config.transactionStateManagerConfig.transaction2PCEnabled,

View File

@ -29,6 +29,7 @@ import kafka.utils.{CoreUtils, Logging, Pool}
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import scala.jdk.CollectionConverters._
import scala.collection._
@ -1548,6 +1549,7 @@ object LogManager {
val defaultLogConfig = new LogConfig(defaultProps)
val cleanerConfig = LogCleaner.cleanerConfig(config)
val transactionLogConfig = new TransactionLogConfig(config)
new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
@ -1560,8 +1562,8 @@ object LogManager {
flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
maxTransactionTimeoutMs = config.transactionStateManagerConfig.transactionMaxTimeoutMs,
producerStateManagerConfig = new ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs, config.transactionLogConfig.transactionPartitionVerificationEnable),
producerIdExpirationCheckIntervalMs = config.transactionLogConfig.producerIdExpirationCheckIntervalMs,
producerStateManagerConfig = new ProducerStateManagerConfig(transactionLogConfig.producerIdExpirationMs, transactionLogConfig.transactionPartitionVerificationEnable),
producerIdExpirationCheckIntervalMs = transactionLogConfig.producerIdExpirationCheckIntervalMs,
scheduler = kafkaScheduler,
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel,

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import scala.collection.{Map, Seq, Set, mutable}
@ -189,10 +190,11 @@ class DefaultAutoTopicCreationManager(
.setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
case TRANSACTION_STATE_TOPIC_NAME =>
val transactionLogConfig = new TransactionLogConfig(config)
new CreatableTopic()
.setName(topic)
.setNumPartitions(config.transactionLogConfig.transactionTopicPartitions)
.setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor)
.setNumPartitions(transactionLogConfig.transactionTopicPartitions)
.setReplicationFactor(transactionLogConfig.transactionTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(
txnCoordinator.transactionTopicConfigs))
case SHARE_GROUP_STATE_TOPIC_NAME =>

View File

@ -35,10 +35,11 @@ import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{BufferSupplier, ConfigUtils, Utils}
import org.apache.kafka.config
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.{ProcessRole, DynamicThreadPool}
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@ -323,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
reconfigurables.add(reconfigurable)
}
def addBrokerReconfigurable(reconfigurable: org.apache.kafka.server.config.BrokerReconfigurable): Unit = {
def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable): Unit = {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
brokerReconfigurables.add(new BrokerReconfigurable {
override def reconfigurableConfigs: Set[String] = reconfigurable.reconfigurableConfigs().asScala
@ -617,7 +618,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
/**
* Implement [[org.apache.kafka.server.config.BrokerReconfigurable]] instead.
* Implement [[config.BrokerReconfigurable]] instead.
*/
trait BrokerReconfigurable {

View File

@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
@ -204,7 +204,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
override val transactionLogConfig = new TransactionLogConfig(this)
private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this)
private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig

View File

@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
@ -1038,10 +1039,13 @@ class ReplicaManager(val config: KafkaConfig,
callback: ((Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])) => Unit,
transactionSupportedOperation: TransactionSupportedOperation
): Unit = {
def transactionPartitionVerificationEnable = {
new TransactionLogConfig(config).transactionPartitionVerificationEnable
}
// Skip verification if the request is not transactional or transaction verification is disabled.
if (transactionalId == null ||
(!config.transactionLogConfig.transactionPartitionVerificationEnable && !transactionSupportedOperation.supportsEpochBump)
if (transactionalId == null
|| addPartitionsToTxnManager.isEmpty
|| (!transactionSupportedOperation.supportsEpochBump && !transactionPartitionVerificationEnable)
) {
callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard]))
return

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
@ -331,9 +332,10 @@ class BrokerMetadataPublisher(
case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
}
try {
val transactionLogConfig = new TransactionLogConfig(config)
// Start the transaction coordinator.
txnCoordinator.startup(() => metadataCache.numPartitions(
Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(config.transactionLogConfig.transactionTopicPartitions))
Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(transactionLogConfig.transactionTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting TransactionCoordinator", t)
}

View File

@ -2523,7 +2523,8 @@ class ReplicaManagerTest {
val props = new Properties()
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true")
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
TestUtils.waitUntilTrue(() => config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
val transactionLogConfig = new TransactionLogConfig(config)
TestUtils.waitUntilTrue(() => transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
// Try to append more records. We don't need to send a request since the transaction is already ongoing.
val moreTransactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 1,
@ -2575,7 +2576,8 @@ class ReplicaManagerTest {
val props = new Properties()
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
TestUtils.waitUntilTrue(() => !config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
val transactionLogConfig = new TransactionLogConfig(config)
TestUtils.waitUntilTrue(() => !transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
// Confirm we did not write to the log and instead returned error.
val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue

View File

@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.config;
package org.apache.kafka.config;
import org.apache.kafka.common.config.AbstractConfig;
import java.util.Set;
@ -27,13 +29,10 @@ import java.util.Set;
* The reconfiguration process follows three steps:
* <ol>
* <li>Determining which configurations can be dynamically updated via {@link #reconfigurableConfigs()}</li>
* <li>Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractKafkaConfig)}</li>
* <li>Applying the new configuration via {@link #reconfigure(AbstractKafkaConfig, AbstractKafkaConfig)}</li>
* <li>Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractConfig)}</li>
* <li>Applying the new configuration via {@link #reconfigure(AbstractConfig, AbstractConfig)}</li>
* </ol>
* <strong>Note: Since Kafka is eliminating Scala, developers should implement this interface instead of {@link kafka.server.BrokerReconfigurable}</strong>
*
*
* @see AbstractKafkaConfig
*/
public interface BrokerReconfigurable {
/**
@ -55,7 +54,7 @@ public interface BrokerReconfigurable {
*
* @param newConfig the new configuration to validate
*/
void validateReconfiguration(AbstractKafkaConfig newConfig);
void validateReconfiguration(AbstractConfig newConfig);
/**
* Applies the new configuration.
@ -65,5 +64,5 @@ public interface BrokerReconfigurable {
* @param oldConfig the previous configuration
* @param newConfig the new configuration to apply
*/
void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig newConfig);
void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig);
}

View File

@ -83,6 +83,4 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
public int backgroundThreads() {
return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
}
public abstract TransactionLogConfig transactionLogConfig();
}

View File

@ -16,7 +16,9 @@
*/
package org.apache.kafka.server.config;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.config.BrokerReconfigurable;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
@ -39,16 +41,16 @@ public class DynamicProducerStateManagerConfig implements BrokerReconfigurable {
}
@Override
public void validateReconfiguration(AbstractKafkaConfig newConfig) {
TransactionLogConfig transactionLogConfig = newConfig.transactionLogConfig();
public void validateReconfiguration(AbstractConfig newConfig) {
TransactionLogConfig transactionLogConfig = new TransactionLogConfig(newConfig);
if (transactionLogConfig.producerIdExpirationMs() < 0)
throw new ConfigException(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG + "cannot be less than 0, current value is " +
producerStateManagerConfig.producerIdExpirationMs() + ", and new value is " + transactionLogConfig.producerIdExpirationMs());
}
@Override
public void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig newConfig) {
TransactionLogConfig transactionLogConfig = newConfig.transactionLogConfig();
public void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig) {
TransactionLogConfig transactionLogConfig = new TransactionLogConfig(newConfig);
if (producerStateManagerConfig.producerIdExpirationMs() != transactionLogConfig.producerIdExpirationMs()) {
log.info("Reconfigure {} from {} to {}",
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,