KAFKA-18509 Move StateChangeLogger to server-common module (#20637)

We can rewrite this class from scala to java and move to server-common
module.  To maintain backward compatibility, we should keep the logger
name `state.change.logger`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-10-06 22:56:57 +08:00 committed by GitHub
parent 24cad50840
commit f68a149a18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 62 additions and 53 deletions

View File

@ -20,7 +20,6 @@ import java.lang.{Long => JLong}
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.Optional
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, CopyOnWriteArrayList}
import kafka.controller.StateChangeLogger
import kafka.log._
import kafka.server._
import kafka.server.share.DelayedShareFetch
@ -37,6 +36,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, PartitionRegistration}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.log.remote.TopicPartitionLog
@ -322,7 +322,7 @@ class Partition(val topicPartition: TopicPartition,
def topic: String = topicPartition.topic
def partitionId: Int = topicPartition.partition
private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
private val stateChangeLogger = new StateChangeLogger(localBrokerId)
private val remoteReplicasMap = new ConcurrentHashMap[Int, Replica]
// The read lock is only required when multiple reads are executed and needs to be in a consistent manner
private val leaderIsrUpdateLock = new ReentrantReadWriteLock

View File

@ -1,45 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import com.typesafe.scalalogging.Logger
import kafka.utils.Logging
object StateChangeLogger {
private val logger = Logger("state.change.logger")
}
/**
* Simple class that sets `logIdent` appropriately depending on whether the state change logger is being used in the
* context of the KafkaController or not (e.g. ReplicaManager and MetadataCache log to the state change logger
* irrespective of whether the broker is the Controller).
*/
class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerEpoch: Option[Int]) extends Logging {
if (controllerEpoch.isDefined && !inControllerContext)
throw new IllegalArgumentException("Controller epoch should only be defined if inControllerContext is true")
override lazy val logger: Logger = StateChangeLogger.logger
locally {
val prefix = if (inControllerContext) "Controller" else "Broker"
val epochEntry = controllerEpoch.fold("")(epoch => s" epoch=$epoch")
logIdent = s"[$prefix id=$brokerId$epochEntry] "
}
}

View File

@ -18,7 +18,6 @@ package kafka.server
import com.yammer.metrics.core.Meter
import kafka.cluster.{Partition, PartitionListener}
import kafka.controller.StateChangeLogger
import kafka.log.LogManager
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
@ -48,6 +47,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition}
@ -272,7 +272,7 @@ class ReplicaManager(val config: KafkaConfig,
@volatile private var isInControlledShutdown = false
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
protected val stateChangeLogger = new StateChangeLogger(localBrokerId)
private var logDirFailureHandler: LogDirFailureHandler = _
@ -789,9 +789,9 @@ class ReplicaManager(val config: KafkaConfig,
hasCustomErrorMessage = customException.isDefined
)
}
// In non-transaction paths, errorResults is typically empty, so we can
// In non-transaction paths, errorResults is typically empty, so we can
// directly use entriesPerPartition instead of creating a new filtered collection
val entriesWithoutErrorsPerPartition =
val entriesWithoutErrorsPerPartition =
if (errorResults.nonEmpty) entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) }
else entriesPerPartition
@ -1637,13 +1637,13 @@ class ReplicaManager(val config: KafkaConfig,
remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
val (task, result) = processRemoteFetch(remoteFetchInfo)
remoteFetchTasks.put(topicIdPartition, task)
remoteFetchResults.put(topicIdPartition, result)
}
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple class that sets logIdent appropriately depending on whether the state change logger is being used in the
* context of the broker (e.g. ReplicaManager and Partition).
*/
public class StateChangeLogger {
private static final Logger LOGGER = LoggerFactory.getLogger("state.change.logger");
private final String logIdent;
public StateChangeLogger(int brokerId) {
this.logIdent = String.format("[Broker id=%d] ", brokerId);
}
public void trace(String message) {
LOGGER.info("{}{}", logIdent, message);
}
public void info(String message) {
LOGGER.info("{}{}", logIdent, message);
}
public void warn(String message) {
LOGGER.warn("{}{}", logIdent, message);
}
public void error(String message) {
LOGGER.error("{}{}", logIdent, message);
}
public void error(String message, Throwable e) {
LOGGER.error("{}{}", logIdent, message, e);
}
}